I want to join a KStream with a KTable. The two have different keys but are co-partitioned using a custom partitioner. However, the join does not produce any results.
The KStream has the following structure:
- key: House - Group
- value: User
The KTable has the following structure:
- key: User - Group
- value: Address
To make sure that inserts into both topics are processed in insertion order, I'm using a custom Partitioner that partitions both topics by the Group part of each key.
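The partitioning idea can be sketched roughly as follows. This is a simplified illustration rather than the actual partitioner: the `String` field types, the `partitionForGroup` helper, the partition count, and the use of `hashCode` are all assumptions made for the example. It only shows that two keys of different types that share a group map to the same partition number.

```kotlin
// Hypothetical key types; the String field types are assumptions.
data class HouseGroup(val house: String, val group: String)
data class UserGroup(val user: String, val group: String)

// Hypothetical helper: derives the partition from the group only.
// Both topics need the same function and the same partition count
// for records of one group to share a partition number.
fun partitionForGroup(group: String, numPartitions: Int): Int =
    Math.floorMod(group.hashCode(), numPartitions)

fun main() {
    val numPartitions = 6 // assumed partition count for both topics

    val streamKey = HouseGroup("house-1", "group-a")
    val tableKey = UserGroup("user-1", "group-a")

    val streamPartition = partitionForGroup(streamKey.group, numPartitions)
    val tablePartition = partitionForGroup(tableKey.group, numPartitions)

    // Same group, so both keys map to the same partition number.
    println(streamPartition == tablePartition) // prints "true"
}
```

Note that the house and user parts of the keys play no role in the partition choice here; only the group does.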
I want to end up with a stream of the following structure:
- key: House - Group
- value: User - Address
For this I'm doing the following:
    val streamsBuilder = streamBuilderHolder.streamsBuilder
    val houseToUser = streamsBuilder.stream<HouseGroup, User>("houseToUser")
    val userToAddress = streamsBuilder.table<UserGroup, Address>("userToAddress")
    val result: KStream<HouseGroup, UserWithAddress> = houseToUser
        .map { k: HouseGroup, v: User ->
            val newKey = UserGroup(v, k.group)
            val newVal = UserHouse(v, k.house)
            KeyValue(newKey, newVal)
        }
        .join(userToAddress) { v1: UserHouse, v2: Address ->
            UserHouseWithAddress(v1, v2)
        }
        .map { k: UserGroup, v: UserHouseWithAddress ->
            val newKey = HouseGroup(v.house, k.group)
            val newVal = UserWithAddress(k.user, v.address)
            KeyValue(newKey, newVal)
        }
I expected the join to produce matching records, but it does not produce any.
I guess the obvious solution is to join with a GlobalKTable and drop the custom partitioner. However, I still don't understand why the approach above does not work.