Wejście przepływu Konta (`Input") jako wyjście (`Wyjście")

0

Pytanie

Próbuję napisać fragment kodu, który wykonuje następujące czynności:-

  1. Odczytuje ogromny csv-plik ze zdalnego źródła, takiego jak s3.
  2. Przetwarzaj plik rekord za rekordem.
  3. Wyślij powiadomienie do użytkownika
  4. Zapisz dane wyjściowe w zdalnej lokalizacji

Zapis próbki, w wejściu formacie csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Mój wejściowy klasa case, który jest nagrywanie w wejściowym pliku csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Przykład zapisu w wyjściowym formacie csv (który należy zapisać):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Mój wyjściowy klasy wariantów, który jest nagrywanie w wejściowym pliku csv:

case class OutputRecord(recordId: String, name: String, designation: String)

Czytanie wpisów za pomocą pliku csv akka stream (wykorzystuje Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Teraz mam funkcja do obsługi rekordów:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Funkcja nagrywania wyjście nagrywania w formacie csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Funkcja wysyłania powiadomień e-mail:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Połącz to wszystko razem

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

W wierszu 15 i 16 dostaję błąd, mogę dodać wiersz 15 lub wiersz 16, ale nie obie, tak jak obie notify & writeOutput potrzeby outputRecord. Jak tylko powiadomienie spowodowane tracę outputRecord.

Czy sposób, w jaki mogę dodać oba notify i writeOutput do tego grafikę?

Nie szukam równoległe wykonanie, tak jak chcę zadzwonić notify i to tylko writeOutput. Tak że to nie pomoże: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

Opcja korzystania wydaje mi się bardzo proste, ale nie mogę znaleźć czyste rozwiązanie.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

Najlepsza odpowiedź

1

Wyjście znotify to PushResultale wkład writeOutput jest ByteString. Jak tylko zmienisz to, z którym zostanie skompilowany. Na wypadek, gdyby trzeba ByteStringdostaniesz to samo OutputRecord.

Przy okazji, w podanym przez ciebie przykładzie kodu podobny błąd występuje w readCSV i process.

2021-11-24 03:36:16

W innych językach

Ta strona jest w innych językach

Русский
..................................................................................................................
Italiano
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................