An extremely helpful video! thanks a million 🤩
That was an amazing demo, literally the same ideas can be applied using the Python client and I love the way you described the problem, pretty language agnostic. The blog post is also neat as supplementary material. Thanks for your time.
Glad it helps!
full kafka course soon? 😇
You'd like that?
@Rock the JVM your courses and code are so close to real-life problems, love it. A Kafka course - I'll take it.
My naive solution for this optimization (I haven't run it to see if it works):
- Producer: KafkaProducer is thread safe, so I will create a coroutine for each message. Each coroutine takes the same KafkaProducer, uses it to send a message, waits until the Future returned from KafkaProducer.send resolves, then finishes. So with 10M messages we'll have 10M coroutines. If 10M coroutines are too many for the Kotlin coroutine library, I hope there's a coroutine pool executor service (like the thread pool executor service in Java) for us to use. (Rough sketch in the first code block below.)
- Consumer: of course we need a number of Kafka consumers equal to the number of partitions. To make each Kafka consumer consume messages at the highest speed, I won't use the JsonDeserializer (decoding a byte array into a JSON string takes time) but the ByteArrayDeserializer (i.e. no decoding is needed). The Kafka consumer will send the byte array to a blocking queue / ring buffer for other coroutines to consume and process. The disadvantage of this method is that if our message processing is slow, there'll be a lot of messages piling up in the blocking queue. (Rough sketch in the second code block below.)
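Roughly what I have in mind for the producer side - an untested Kotlin sketch (the broker address, topic name and key/value format are placeholders I made up):

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

// Bridge the producer's async callback into a suspend function:
// the coroutine suspends until the broker acknowledges the record.
suspend fun <K, V> KafkaProducer<K, V>.sendSuspending(record: ProducerRecord<K, V>): RecordMetadata =
    suspendCoroutine { cont ->
        send(record) { metadata, exception ->
            if (exception != null) cont.resumeWithException(exception)
            else cont.resume(metadata)
        }
    }

fun main() = runBlocking {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092")            // placeholder broker
        put("key.serializer", StringSerializer::class.java)
        put("value.serializer", StringSerializer::class.java)
    }
    // A single KafkaProducer shared by all coroutines: the client is thread safe.
    KafkaProducer<String, String>(props).use { producer ->
        coroutineScope {
            // One lightweight coroutine per message. This works, but 10M suspended
            // coroutines still cost a lot of heap, so bounding concurrency
            // (a Semaphore, or sending in chunks) is probably wiser.
            repeat(10_000_000) { i ->
                launch {
                    producer.sendSuspending(ProducerRecord("demo-topic", "key-$i", "value-$i"))
                }
            }
        }
    }
}
```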
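And a similarly untested sketch of the consumer side, with a bounded Kotlin Channel standing in for the blocking queue / ring buffer (again, broker, group id and topic are placeholders, and only one consumer instance is shown - you'd run one per partition):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import java.time.Duration
import java.util.Properties

fun main() = runBlocking {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092")                // placeholder broker
        put("group.id", "fast-consumers")                         // placeholder group id
        put("key.deserializer", ByteArrayDeserializer::class.java)
        put("value.deserializer", ByteArrayDeserializer::class.java)
    }

    // Bounded channel = backpressure: if processing is slow, the poller suspends
    // instead of letting the queue grow without limit.
    val queue = Channel<ByteArray>(capacity = 10_000)

    // Poller: a single coroutine owns the KafkaConsumer (it is NOT thread safe)
    // and only hands raw byte arrays downstream - no JSON decoding here.
    val poller = launch(Dispatchers.IO) {
        KafkaConsumer<ByteArray, ByteArray>(props).use { consumer ->
            consumer.subscribe(listOf("demo-topic"))               // placeholder topic
            while (isActive) {
                val records = consumer.poll(Duration.ofMillis(100))
                for (record in records) queue.send(record.value())
            }
        }
    }

    // Workers: decode and process in parallel, off the polling coroutine.
    val workers = List(4) {
        launch(Dispatchers.Default) {
            for (bytes in queue) {
                val json = String(bytes, Charsets.UTF_8)  // decode only when actually needed
                // ... parse and process the JSON here ...
            }
        }
    }

    delay(60_000)             // stand-in for a real shutdown signal
    poller.cancelAndJoin()
    queue.close()
    workers.joinAll()
}
```

The bounded capacity is the point of this shape: when processing lags, the poller suspends instead of the queue filling up the heap, which softens the disadvantage I mentioned above.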
What do you think of this solution?
Talk is cheap, code is king - try it out!
@rockthejvm 😊 fair enough