Wysyłanie/tworzenie json-wiadomości poprzez kafka

0

Pytanie

To jest mój pierwszy raz, kiedy używam Kafka, i mam zamiar używać kafka z .net

Chciałem wiedzieć, czy mogę wysyłać JSON jako wiadomości podczas tworzenia wydarzenia

Śledzę podręcznika: https://developer.confluent.io/get-started/dotnet/#build-producer

Ponadto, czy istnieje sposób, aby porównać tę wartość z modelem, aby struktura value/json zawsze była przywiązana do tego modelu

Tak więc, na przykład: jeśli chcę, aby moje wartość json było

{
  "customerName":"anything",
  "eventType":"one-of-three-enums",
  "columnsChanged": "string value or something"
}

Większość przykładów, które mogę znaleźć, jak to:

using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;

class Producer {
    static void Main(string[] args)
    {
        if (args.Length != 1) {
            Console.WriteLine("Please provide the configuration file path as a command line argument");
        }

        IConfiguration configuration = new ConfigurationBuilder()
            .AddIniFile(args[0])
            .Build();

        const string topic = "purchases";

        string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
        string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };

        using (var producer = new ProducerBuilder<string, string>(
            configuration.AsEnumerable()).Build())
        {
            var numProduced = 0;
            const int numMessages = 10;
            for (int i = 0; i < numMessages; ++i)
            {
                Random rnd = new Random();
                var user = users[rnd.Next(users.Length)];
                var item = items[rnd.Next(items.Length)];

                producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else {
                            Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
                            numProduced += 1;
                        }
                    });
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
        }
    }
}

Chciałbym, aby element był klasą w strukturze json.

.net apache-kafka asp.net-core
2021-11-23 21:53:21
1

Najlepsza odpowiedź

0

chciałem wiedzieć, czy mogę wysyłać JSON jako wiadomości podczas tworzenia wydarzenia

Tak. Kafka bajtów przechowuje i przetwarza bajty za pomocą сериализаторов. Przy tworzeniu firmy masz możliwość zadzwonić SetValueSerializer.

Niektóre z wbudowanych сериализаторов można znaleźć pod adresem- https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Кафка/Сериализаторы.cs

Trzeba będzie napisać własny, aby w ogóle obsługiwać wszystkie typy modeli JSON.

Podczas korzystania z serializacji UTF8 dla wierszy konieczna wstępnie serializacji obiektu z klasy modelu, a następnie wysłać go jako wartości. W twoim przykładzie można by zastąpiły var item z pewnym сериализованным obiektem.

Jak przekształcić obiekt C# w ciąg znaków JSON .NET?

Podczas korzystania z klas modeli twoje dane, jak zwykle, będą surowo symbolizują do momentu, aż zacznie się ręcznie pisać JSON lub użyć typy słowników. Jeśli potrzebujesz kontrola zewnętrznych, wiadomości, rejestr planów fuzji jest jednym z przykładów, który obsługuje JSONSchema i JsonSerializer Od confluent-dotnet-kafka projekt obsługuje to.

2021-11-23 22:27:28

Tylko kolejne pytanie. Czy wiesz, czy mogę ograniczyć rozmiar wiadomości i czy istnieje sposób, aby producenta sprawdzić, jaki rozmiar zdarzeń przed wysłaniem, i nie zezwalaj na wysyłanie wiadomości, jeśli wielkość przekracza limit?
Learn AspNet

U Kafki jest domyślny limit w 1 MB pakietów wiadomości. Jeśli dostaniesz rozmiar сериализованного tablicy bajtów, to musi być bliskie zbliżenie się do wielkości pojedynczego rekordu (istnieją jednak dodatkowe koszty, takie jak nagłówki wpisów i znacznika czasu).
OneCricketeer

Dziękuję. Nie można odpowiedzieć, proszę: stackoverflow.com/questions/70097676/...
Learn AspNet

W innych językach

Ta strona jest w innych językach

Русский
..................................................................................................................
Italiano
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................