Próbuję napisać fragment kodu, który wykonuje następujące czynności:-
- Odczytuje ogromny csv-plik ze zdalnego źródła, takiego jak s3.
- Przetwarzaj plik rekord za rekordem.
- Wyślij powiadomienie do użytkownika
- Zapisz dane wyjściowe w zdalnej lokalizacji
Zapis próbki, w wejściu formacie csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Mój wejściowy klasa case, który jest nagrywanie w wejściowym pliku csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Przykład zapisu w wyjściowym formacie csv (który należy zapisać):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Mój wyjściowy klasy wariantów, który jest nagrywanie w wejściowym pliku csv:
case class OutputRecord(recordId: String, name: String, designation: String)
Czytanie wpisów za pomocą pliku csv akka stream (wykorzystuje Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Teraz mam funkcja do obsługi rekordów:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Funkcja nagrywania wyjście nagrywania w formacie csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Funkcja wysyłania powiadomień e-mail:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Połącz to wszystko razem
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
W wierszu 15 i 16 dostaję błąd, mogę dodać wiersz 15 lub wiersz 16, ale nie obie, tak jak obie notify
& writeOutput
potrzeby outputRecord
. Jak tylko powiadomienie spowodowane tracę outputRecord
.
Czy sposób, w jaki mogę dodać oba notify
i writeOutput
do tego grafikę?
Nie szukam równoległe wykonanie, tak jak chcę zadzwonić notify
i to tylko writeOutput
. Tak że to nie pomoże: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
Opcja korzystania wydaje mi się bardzo proste, ale nie mogę znaleźć czyste rozwiązanie.