W jego wierszu dokumentu, elasticsearch.helpers.async_bulk
opisuje siebie jako
Asystent :metamfetamina:
~elasticsearch.AsyncElasticsearch.bulk
api, który zapewnia bardziej przyjazny dla człowieka interfejs - wykorzystuje iterator działań i wysyła ich w elasticsearch w części. Źródło
Kontekst
Używam AsyncElasticsearch.bulk()
z powodzeniem wysłać ramki danych pandas w jakikolwiek egzemplarz ES
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Pytanie
Jednak, jeśli chodzi o async_bulk
dostaję index is missing
błąd.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Próbowałem dostroić _rec_to_actions()
kilka sposobów bez większego efektu.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Myślę, że główny problem jest w tym, że nie jestem całkiem pewien, że wiem, że to jest działanie w kontekście elasticsearch. To pojęcie jest obecny wszędzie w dokumentacji, ale w kodzie źródłowym tej biblioteki nie ma wyraźnej struktury danych (w każdym razie, ja nie mogłem jej znaleźć).
Co to jest działanie i jak mam ustawić swój własny generator do wysyłania danych w df self.index
?
Środowisko
- python = "3.9.5"
- elasticsearch = "7.14.1"