Ja jak nowy w strumieniu Scala i Akka, i staram się, aby ciąg wiadomości JSON z sklepu internetowego i przenieść je w temat Kafki.
W tej chwili pracuję tylko nad częścią "odbierać wiadomości od ws".
Wiadomości pochodzące z witryny, wyglądają następująco :
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
Chcę dzielić tę wiadomość JSON na kilka wiadomości :
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
A następnie przesuń każdy z tych wiadomości na temat kafki.
Oto co osiągnąłem do tej pory :
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
To działa, dostaję oczekiwane wyjście komunikat Json, ale zastanawiałem się, czy mogę napisać tego producenta w bardziej "konta-owski" stylu, na przykład, za pomocą GraphDSL. Dlatego mam kilka pytań :
- Czy możliwe jest w sposób ciągły spożywać strony internetowej za pomocą GraphDSL ? Jeśli tak, nie możesz mi pokazać przykład, proszę ?
- Dobra czy pomysł wykorzystania WS z pomocą GraphDSL ?
- Czy muszę rozłożyć otrzymaną wiadomość Json, jak ja to robię, przed wysłaniem go "procesie" kafki ? Czy lepiej wysłać go tak jak jest w celu zmniejszenia opóźnienia ?
- Po utworzeniu wiadomości "procesie" kafki mam zamiar używać go za pomocą Apache Storm, to dobry pomysł ? Czy powinienem trzymać Акки ?
Dziękuję, że czytają mnie, pozdrawiam, Ares