
I want to join a KStream with a KTable. Both have different keys but are co-partitioned using a custom partitioner. However, the join does not produce any results.

The KStream has the following structure:
- key: House - Group
- value: User

The KTable has the following structure:
- key: User - Group
- value: Address

To make sure inserts into both topics are processed in insertion order, I'm using a custom partitioner that partitions both topics by the Group part of each key.
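The co-partitioning idea can be sketched as follows. This is a simplified illustration using `String.hashCode` (a real Kafka partitioner implements `org.apache.kafka.clients.producer.Partitioner` and typically hashes the serialized key bytes with murmur2); the class and group names are hypothetical:

```java
// Sketch: both topics are partitioned by the Group part of the key only,
// so a HouseGroup key and a UserGroup key that share the same group land
// in the same partition, which is what makes the join possible.
public class GroupPartitionSketch {
    static int partitionForGroup(String group, int numPartitions) {
        // Mask the sign bit so the result is always non-negative.
        return (group.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 12;
        // A HouseGroup("h1", "group-42") key and a UserGroup("u1", "group-42")
        // key both partition on "group-42" only...
        int p1 = partitionForGroup("group-42", partitions);
        int p2 = partitionForGroup("group-42", partitions);
        // ...so they are guaranteed to land in the same partition.
        System.out.println(p1 == p2); // true
    }
}
```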

I want to end up with a stream of the following structure:
- key: House - Group
- value: User - Address

For this I'm doing the following:

val streamsBuilder = streamBuilderHolder.streamsBuilder
val houseToUser = streamsBuilder.stream<HouseGroup, User>("houseToUser")
val userToAddress = streamsBuilder.table<UserGroup, Address>("userToAddress")
val result: KStream<HouseGroup, UserWithAddress> = houseToUser
        .map { k: HouseGroup, v: User ->
            val newKey = UserGroup(v, k.group)
            val newVal = UserHouse(v, k.house)
            KeyValue(newKey, newVal)
        }
        .join(userToAddress) { v1: UserHouse, v2: Address ->
            UserHouseWithAddress(v1, v2)
        }
        .map { k: UserGroup, v: UserHouseWithAddress ->
            val newKey = HouseGroup(v.house, k.group)
            val newVal = UserWithAddress(k.user, v.address)
            KeyValue(newKey, newVal)
        }

I expected this to produce matching joins, but it did not.

I guess the obvious solution is to join with a global table and let go of the custom partitioner. However, I still don't understand why the above would not work.

2 Answers


I think the lack of matches is caused by different partitioners being used.

For your input topics your CustomPartitioner is used. Kafka Streams by default uses org.apache.kafka.clients.producer.internals.DefaultPartitioner.

In your code, just before KStream::join you call KStream::map. KStream::map enforces repartitioning before KStream::join. During repartitioning, messages are flushed to Kafka ($AppName-KSTREAM-MAP-000000000X-repartition topic). To spread messages across partitions, Kafka Streams uses the configured partitioner (property: ProducerConfig.PARTITIONER_CLASS_CONFIG). Summarizing: messages with the same key might end up in different partitions in the repartition topic and in the KTable topic.
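The mismatch can be sketched in isolation. The two hash functions below are simplified stand-ins (the real default partitioner hashes the serialized key bytes with murmur2); user and group values are hypothetical:

```java
// Sketch of why the join keys stop lining up: the repartition topic is
// written with the producer's default partitioning (a hash of the whole
// serialized key), while the KTable topic was written with the custom
// group-only partitioner.
public class PartitionMismatchSketch {
    // Default-style: hash of the entire key (user + group).
    static int byWholeKey(String user, String group, int n) {
        return ((user + "|" + group).hashCode() & 0x7fffffff) % n;
    }

    // Custom-style: hash of the group part only.
    static int byGroupOnly(String group, int n) {
        return (group.hashCode() & 0x7fffffff) % n;
    }

    public static void main(String[] args) {
        int n = 12;
        int repartition = byWholeKey("alice", "group-42", n); // KSTREAM-MAP repartition topic
        int tableTopic  = byGroupOnly("group-42", n);         // userToAddress topic
        // The two partition numbers generally differ, so the join task
        // reading one partition never sees the matching record in the other.
        System.out.println(repartition + " vs " + tableTopic);
    }
}
```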

The solution in your case is to set your custom partitioner in the properties of your Kafka Streams application (props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner")).
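As a config fragment, the fix looks roughly like this (application id and bootstrap servers are placeholder values; com.example.CustomPartitioner is the question's placeholder class name):

```java
// Assumption: CustomPartitioner is the same class already used by the
// upstream producers. Setting it here makes Kafka Streams' internal
// producers (including the repartition-topic writer) use it as well.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "house-user-join");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner");
```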

For debugging you can inspect the repartition topic ($AppName-KSTREAM-MAP-000000000X-repartition). Messages with the same keys as in the input topic might sit in different partitions (different partition numbers).

Documentation about Join co-partitioning requirements


2 Comments

Hi @wardziniak, would you consider it good practice to use a custom partitioner inside kafka streams?
@JanBols, it depends on how good your partitioner is, i.e. whether the distribution of messages is even across the partitions. Since you already use a custom partitioner (messages in the input topics are distributed with it), I think it is fine to set it in your use case. If you wanted to use the default one, you would have to repartition both input topics, which would not be efficient.

Try this; it works for me.

static async System.Threading.Tasks.Task Main(string[] args)
        {
            int count = 0;
            string line = null;

            var appConfig = getAppConfig(Enviroment.Dev);
            var schemaRegistryConfig = getSchemmaRegistryConfig(appConfig);
            var registry = new CachedSchemaRegistryClient(schemaRegistryConfig);
            var serializer = new AvroSerializer<YourAvroSchemaClass>(registry);

            var adminClient = new AdminClientBuilder(new AdminClientConfig(getClientConfig(appConfig))).Build();
            var topics = new List<TopicSpecification> { new TopicSpecification { Name = appConfig.OutputTopic, NumPartitions = 11 } };

            await adminClient.CreateTopicsAsync(topics);

            var producerConfig = getProducerConfig(appConfig);

            // Custom partitioner: route each message by its numeric key modulo the partition count.
            var producer = new ProducerBuilder<string, byte[]>(producerConfig)
                .SetPartitioner(appConfig.OutputTopic, (string topicName, int partitionCount, ReadOnlySpan<byte> keyData, bool keyIsNull) =>
                {
                    var keyValueInInt = Convert.ToInt32(System.Text.UTF8Encoding.UTF8.GetString(keyData.ToArray()));
                    return (Partition)(keyValueInInt % partitionCount);
                }).Build();

            using (producer)
            {
                Console.WriteLine($"Start to load data from: {appConfig.DataFileName}: {DateTime.Now}");
                var watch = new Stopwatch();
                watch.Start();
                try
                {
                    using var stream = new StreamReader(appConfig.DataFileName);
                    while ((line = stream.ReadLine()) != null)
                    {
                        var message = parseLine(line);
                        var data = await serializer.SerializeAsync(message.Value, new SerializationContext(MessageComponentType.Value, appConfig.OutputTopic));
                        producer.Produce(appConfig.OutputTopic, new Message<string, byte[]> { Key = message.Key, Value = data });

                        if (count++ % 1000 == 0)
                        {
                            producer.Flush();
                            Console.WriteLine($"Write ... {count} in {watch.Elapsed.TotalSeconds} seconds");
                        }
                    }
                }
                catch (ProduceException<string, byte[]> e)
                {
                    Console.WriteLine($"Line: {line}");
                    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                    System.Environment.Exit(101);
                }
                finally
                {
                    producer.Flush();
                }
            }
        }

