Strumień Akka stale zużywa websocket

0

Pytanie

Ja jak nowy w strumieniu Scala i Akka, i staram się, aby ciąg wiadomości JSON z sklepu internetowego i przenieść je w temat Kafki.

W tej chwili pracuję tylko nad częścią "odbierać wiadomości od ws".

Wiadomości pochodzące z witryny, wyglądają następująco :

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

Chcę dzielić tę wiadomość JSON na kilka wiadomości :

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

A następnie przesuń każdy z tych wiadomości na temat kafki.

Oto co osiągnąłem do tej pory :

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

To działa, dostaję oczekiwane wyjście komunikat Json, ale zastanawiałem się, czy mogę napisać tego producenta w bardziej "konta-owski" stylu, na przykład, za pomocą GraphDSL. Dlatego mam kilka pytań :

  • Czy możliwe jest w sposób ciągły spożywać strony internetowej za pomocą GraphDSL ? Jeśli tak, nie możesz mi pokazać przykład, proszę ?
  • Dobra czy pomysł wykorzystania WS z pomocą GraphDSL ?
  • Czy muszę rozłożyć otrzymaną wiadomość Json, jak ja to robię, przed wysłaniem go "procesie" kafki ? Czy lepiej wysłać go tak jak jest w celu zmniejszenia opóźnienia ?
  • Po utworzeniu wiadomości "procesie" kafki mam zamiar używać go za pomocą Apache Storm, to dobry pomysł ? Czy powinienem trzymać Акки ?

Dziękuję, że czytają mnie, pozdrawiam, Ares

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

Najlepsza odpowiedź

1

Ten kod jest bardzo podobny do Konta-owski: scaladsl to tak samo Konta, jak i GraphDSL czy realizacja użytkownika GraphStage. Jedynym powodem, IMO/E, iść w GraphDSL jest, jeśli rzeczywista forma grafika nie jest łatwo wyraża się w scaladsl.

Ja bym osobiście udał się w drodze ustalenia CoinPrice klasa, aby zrobić model jawnej

case class CoinPrice(coin: String, price: BigDecimal)

A następnie wypić Flow[Message, CoinPrice, NotUsed] który analizuje 1 przychodząca wiadomość na zero lub więcej CoinPrices. Coś (używając tutaj Play JSON), np.:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

Można, w zależności od wielkości JSON w komunikacie, rozbicie go na poszczególne etapy przepływu, aby zapewnić asynchroniczne granicę między analizą JSON i wyjęciem w CoinPrices. Na przykład,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

W powyższym przykładzie etapy w obie strony od async granica będzie wykonywane poszczególnymi uczestnikami i, w ten sposób, być może, w tym samym czasie (jeśli dostępna wystarczająca ilość rdzeni procesora, itp.), z Powodu dodatkowych kosztów dla uczestników w celu koordynacji i wymiany wiadomości. Te dodatkowe koszty koordynacji/komunikacja (patrz Uniwersalne prawo skalowalności Güntera) będą uzasadnione tylko w przypadku, gdy obiekty JSON są dość duże i postępują na tyle szybko (konsekwentnie postępują do tego, jak poprzedni zakończy przetwarzanie).

Jeśli zamierzasz korzystać z websocket, dopóki program nie zatrzyma się, może łatwiej będzie po prostu użyć Source.never[Message].

2021-11-21 12:42:30

Dziękuję za odpowiedź, wszystko jest bardzo jasne, mam tylko jedno pytanie. Jak mogę podzielić swoją odpowiedź na różne etapy przepływu ? Nie można po prostu pokazać mi mały przykład, proszę ? Lub skierować mnie na odpowiednią część dokumentacji ?
Arès

W innych językach

Ta strona jest w innych językach

Русский
..................................................................................................................
Italiano
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................