Połączenie dwóch wątków w strumieniu Konta

0

Pytanie

Staram się połączyć dwa wątki i nie mogę wyjaśnić wynik mojej realizacji.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

Spodziewam się kolejnego wyjścia z tego kodu.

2
3
4
.
.
.
11
10
20
.
.
.
100

Zamiast tego widzę, że jest drukowany tylko "2". Nie mógłbyś, proszę, wyjaśnić, co jest nie tak w mojej domniemanie i jak muszę zmienić program, aby uzyskać pożądany wynik.

akka akka-stream scala
2021-10-21 17:29:00
2

Najlepsza odpowiedź

3

Z dokumentów API Akka Stream:

Concat:

Wystawia się, gdy w bieżącym wątku jest dostępny element; jeśli aktualny enter zakończony, stara się wykonać następujący

Broadcast:

Promieniuje, gdy wszystkie wyjścia przestają inaczej ciśnienie i jest dostępny wejściowy element

Dwa operatora nie będą działać wspólnie, tak jak istnieje konflikt w tym, jak one działają -- Concat próbuje wyciągnąć wszystkie elementy z jednego z Broadcastwyjścia przed przełączeniem na inny, podczas gdy Broadcast nie będzie emitować, jeśli nie będzie popytu na jego produkty.

Do tego, co trzeba, można połączyć za pomocąconcat jak sugerują komentatorzy:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

lub, co jest równoważne, należy użyćSource.combine jak pokazano poniżej:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

Z pomocą GraphDSL, który jest uproszczoną wersją realizacji Source.combine:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

W innych językach

Ta strona jest w innych językach

Русский
..................................................................................................................
Italiano
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................