Issues scaling kafka consumers with Dask
I have run two simple pipelines (non-Dask and Dask) to read a stream of messages from Kafka using the Streamz FromKafkaBatched method. I seem to get similar throughput for both pipelines.
Non-Dask pipeline:
from distributed import Client
from streamz import Stream
import time
import json


def preprocess(messages):
    start_time = int(round(time.time()))
    no_of_rows = len(messages)
    if no_of_rows > 0:
        size = no_of_rows * len(messages[0])
        kafka_timestamp = json.loads(messages[0].decode('utf-8'))['timestamp']
    else:
        size = 0
        kafka_timestamp = start_time
    end_time = int(round(time.time()))
    return "{},{},{},{},{}".format(no_of_rows, kafka_timestamp, start_time, end_time, size)


topic = "topic-1"
bootstrap_servers = 'localhost:9092'
consumer_conf = {'bootstrap.servers': bootstrap_servers,
                 'group.id': 'group_1', 'session.timeout.ms': 60000}

stream = Stream.from_kafka_batched(topic, consumer_conf, poll_interval='10s',
                                   npartitions=5, asynchronous=True, dask=False)
kafka_out = stream.map(preprocess).to_kafka('test-out', consumer_conf)
kafka_out.flush()
The pipeline gives a maximum throughput of ~43 MBps (topic partitions = 5).
Throughput computation = (Σ size) / (max(end_time) - min(start_time))
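As a minimal sketch, the computation over the "no_of_rows,kafka_timestamp,start_time,end_time,size" records emitted by preprocess looks roughly like this (the helper name and the MB = 10^6 bytes convention are my own):

def throughput_mbps(records):
    # records are the comma-separated strings produced by preprocess above
    rows = [r.split(',') for r in records]
    sizes = [int(r[4]) for r in rows]
    start_times = [int(r[2]) for r in rows]
    end_times = [int(r[3]) for r in rows]
    elapsed = max(end_times) - min(start_times)
    return sum(sizes) / elapsed / 1e6  # bytes/s -> MBps, assuming MB = 10**6 bytes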
Setting dask=True runs the above pipeline on a local Dask cluster.
Starting Dask Cluster:
# dask imports
from distributed import Client, LocalCluster

# dask client
cluster = LocalCluster(ip="0.0.0.0", scheduler_port=8786, diagnostics_port=8787,
                       processes=False, threads_per_worker=10, n_workers=1)
client = Client(cluster)
client.get_versions(check=True)
The pipeline with Dask gives a maximum throughput of ~44.8 MBps when the number of partitions in the topic is 1. Increasing the number of partitions in the topic decreases performance.
My understanding from these experiments is that confluent-kafka performs as expected with a single Dask thread (when the number of partitions is 1) but performs poorly with multiple Dask threads or multiple partitions. This may be due to the underlying Tornado async library. It would be great if experts could help me understand this better, as I am fairly new to async programming.
Hardware and software specs: 10 CPU cores, 30 GB RAM, Ubuntu 16.04, Streamz (built from the latest code base), Kafka 2.2.0, confluent-kafka-python 1.0.0.
I would be interested to see how this changes if you run with multiple worker processes.
I tried varying the number of processes (5 and 10) and the number of partitions (1 and 5). I get the best performance with 5 processes and 1 partition (44.2 MBps). With 5 processes and 5 partitions I get around 38 MBps.
How are you setting up the topic, and how are you putting data into Kafka? I notice that you are sinking back to Kafka again, but this is not a batched method; I would suggest not doing anything with the messages, in order to measure the read throughput alone.
I am creating a continuous stream of data in the following two steps:
- Writing to a base topic from a Python script at 10 msgs/sec.
- Setting up a Spark job that reads from the base topic and explodes the data (using an exploding constant) into a different topic.
Sinking to Kafka is not batched because we log only one record for each batch.
I decided to try a test outside of Dask and Streamz to see what numbers could be achieved. I inserted 10,000,000 1 KB messages into a topic with 20 partitions on a Kafka broker running in a container on my local machine. Note that this topic has a bit more than 10 million messages, as I ran the insert script more than once. This machine is a bit old: Intel(R) Core(TM) i7-4770K CPU @ 3.50GHz, 4 cores (8 hyper-threaded), 32 GB of RAM.
I'm using the Consumer.consume() method in the Kafka consumer to read messages in batches. This code is simple. It creates one thread per partition. Each thread gets its own consumer as a consumer can only be assigned one set of partitions. Each thread iterates over the messages in its given partition. Unless I'm misunderstanding something, the threads here should map to tasks in Dask.
I'm consistently getting read rates of over 600 MB/s unless this script is wrong somewhere.
$ python main.py
read: 10141980 msgs 10141980000 b in: 15.36656379699707 s rate: 629.4280212077073 mb/s
$ python main.py
read: 10141980 msgs 10141980000 b in: 14.956122636795044 s rate: 646.70142645865 mb/s
$ python main.py
read: 10141980 msgs 10141980000 b in: 14.976755619049072 s rate: 645.8104872328803 mb/s
from concurrent.futures.thread import ThreadPoolExecutor
from time import time

import confluent_kafka as ck


def main():
    topic = 'metrics2'
    batch_size = 1000
    conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-group'
    }
    start = time()
    master_consumer = ck.Consumer(conf)
    parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions
    total_offset = [0]

    def r(part_no):
        c = ck.Consumer(conf)
        tp = ck.TopicPartition(topic, part_no)
        low, high = c.get_watermark_offsets(tp)
        tp = ck.TopicPartition(topic, part_no, low)
        last_offset = low
        c.assign([tp])
        while True:
            for m in c.consume(min(batch_size, high - last_offset)):
                last_offset = m.offset()
            if last_offset >= high - batch_size:
                total_offset[0] += last_offset
                break

    with ThreadPoolExecutor(max_workers=len(parts_metadata)) as e:
        for p in range(len(parts_metadata)):
            e.submit(r, p)
    took = time() - start
    rate = (total_offset[0] * 1000) / 1024 / 1024 / took
    print('read:', total_offset[0], 'msgs', 1000 * total_offset[0], 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')


if __name__ == '__main__':
    main()
One other thing I should note is that .consume(...) will block if the batch is not full. Therefore, I exit when last_offset >= high - batch_size.
In the original code, I still think that to_kafka can be a problem, because it forces the upstream to wait until the message is sent and the receipt acknowledged. You should at least have a buffer stage before it.
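As a rough sketch of what I mean (the capacity of 1000 is arbitrary), something like:

# insert an asynchronous buffer between the map stage and the Kafka sink so
# that slow send/acknowledge round trips do not stall the upstream reads
kafka_out = (stream
             .map(preprocess)
             .buffer(1000)   # arbitrary capacity, tune as needed
             .to_kafka('test-out', consumer_conf))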
I would love to have a go at accelerating things up towards the "raw" example, but I still need a script to fill the message queue, ideally one that can be run in the context of Kafka alone (e.g., the spotify/kafka Docker container). I would start by looking at the distributed dashboard, to see how many tasks are really running in parallel and what is taking the majority of the time in the workers.
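In case it helps, here is an untested sketch of the kind of filler script I have in mind (the topic name, message count, and message size are placeholders; it only needs confluent-kafka and a running broker):

import confluent_kafka as ck


def fill(topic='metrics2', n_messages=10_000_000, msg_size=1000,
         bootstrap='localhost:9092'):
    """Write n_messages fixed-size payloads into the given topic."""
    producer = ck.Producer({'bootstrap.servers': bootstrap})
    payload = b'x' * msg_size
    for _ in range(n_messages):
        while True:
            try:
                producer.produce(topic, payload)
                break
            except BufferError:
                # local queue is full; let librdkafka drain it before retrying
                producer.poll(0.1)
    producer.flush()


if __name__ == '__main__':
    fill()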
I think we can sink to a list instead of writing to Kafka (see the sketch below). This list will only have a few values appended (equal to the number of partitions) every few seconds (poll_interval).
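Something like this (reusing the stream and preprocess definitions from the first pipeline; I have not benchmarked this exact form):

# same pipeline as before, but sinking into a local list instead of producing
# back to Kafka, so only the read path plus preprocess is timed
stream = Stream.from_kafka_batched(topic, consumer_conf, poll_interval='10s',
                                   npartitions=5, asynchronous=True, dask=True)
results = stream.map(preprocess).gather().sink_to_list()
# each poll_interval appends roughly one record per partition to `results`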
Also, for the sake of this experiment, can we try to read from a standalone topic (with some pre-written data) and create fixed-size batches if the number of offsets is very high?
Something like in FromKafkaBatched:
current_position = self.positions[partition]
lowest = max(current_position, low)
max_batch_size = 100000
while lowest < high:
    curr_high = min(lowest + max_batch_size, high)
    out.append((self.consumer_params, self.topic, partition,
                lowest, curr_high - 1))
    lowest = curr_high
It seems like calling .value() has a significant impact on performance. I know there's no way around this; we need the value of the message. But I'm just going to post this here for info.
With call to .value():
(venv) [jsmaupin@yard79 from-kafka-tests]$ python main.py
read: 9999999 msgs 9999999000 b in: 24.51851987838745 s rate: 388.96076344292777 mb/s
Without call to .value():
(venv) [jsmaupin@yard79 from-kafka-tests]$ python main.py
read: 9999999 msgs 9999999000 b in: 10.61636209487915 s rate: 898.3060416701757 mb/s
Maybe the message is not actually fetched until .value() is called; I don't know how this is done in the C-level library. Indeed, I don't think it makes any sense to avoid it, but it is interesting nevertheless.
My guess is that the message is fetched by the C extension, but not handed to the Python runtime until .value() is called.
(note that this repo just moved to its own org, don't be surprised by the redirect)
(24.51851987838745 - 10.61636209487915) / 9999999 = 1.39 µs per message to transform a few bytes, which I suppose is possible.
Note that your code uses .consume(), but the streamz implementation in get_message_batch uses .poll(0). The latter always gets one message at a time, while the former gets a number of messages as a list.
Would anyone like to try with the following:
def get_message_batch(kafka_params, topic, partition, low, high, timeout=None):
    """Fetch a batch of kafka messages in given topic/partition

    This will block until messages are available, or timeout is reached.
    """
    import confluent_kafka as ck
    t0 = time.time()
    consumer = ck.Consumer(kafka_params)
    tp = ck.TopicPartition(topic, partition, low)
    consumer.assign([tp])
    out = []
    try:
        while True:
            msg = consumer.poll(0.01)
            if msg:
                v = msg.value()
                if v and msg.error() is None:
                    off = msg.offset()
                    if high >= off:
                        out.append(v)
                    if high <= off:
                        break
            if timeout is not None and time.time() - t0 > timeout:
                break
    finally:
        pass
    return out
Note that this does not close the consumer, which is actually what takes by far the most time. It does use .poll(), though, which maybe should be .consume(), since the latter can handle the number of messages and the timeout in one place. This leads back to the suggestion of a consumer factory (see #233).
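For comparison, an untested sketch of what a .consume()-based version might look like (keeping the no-close behaviour; the function name and the 1 s inner timeout are arbitrary):

import time

import confluent_kafka as ck


def get_message_batch_consume(kafka_params, topic, partition, low, high, timeout=None):
    """Sketch: fetch messages low..high of one partition via consume()."""
    consumer = ck.Consumer(kafka_params)
    consumer.assign([ck.TopicPartition(topic, partition, low)])
    out = []
    t0 = time.time()
    while len(out) < high - low + 1:
        remaining = (high - low + 1) - len(out)
        # consume() takes the batch size and a timeout in a single call
        msgs = consumer.consume(num_messages=min(remaining, 10000), timeout=1.0)
        for msg in msgs:
            if msg.error() is None and msg.offset() <= high:
                out.append(msg.value())
        if timeout is not None and time.time() - t0 > timeout:
            break
    # deliberately not calling consumer.close(), as discussed above
    return out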
I've written another version of my script above that uses Dask. I'm waiting for a new pre-populating script I wrote to finish running. I can try yours right after.
@martindurant I am getting much better results with multiple workers. I got throughput close to 200 MBps reading from a standalone topic (with 7 GB of data). I can try removing .close() to see if it works better. Also, poll performs almost as well as consume, since Kafka internally uses a buffer; in fact, poll gives better results in some cases.
It seems that nearly the same script I had before, converted to Dask, takes significantly longer; it does not even finish. I am new to Dask, so maybe someone can help me out here. One thing I have noticed is that the tasks do not run in sequential order with respect to the offset.
from concurrent.futures.thread import ThreadPoolExecutor
from random import shuffle
from time import time

import confluent_kafka as ck
from distributed import Client, LocalCluster


def main():
    topic = 'metrics4'
    batch_size = 1000
    conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-group'
    }
    cluster = LocalCluster(n_workers=20)
    client = Client(cluster)
    start = time()
    master_consumer = ck.Consumer(conf)
    parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions

    def r(v):
        print(v)
        part_no, offset = v
        c = ck.Consumer(conf)
        tp = ck.TopicPartition(topic, part_no, offset)
        c.assign([tp])
        for m in c.consume(batch_size):
            m.offset()

    parts = []
    for part_no in range(len(parts_metadata)):
        low, high = master_consumer.get_watermark_offsets(ck.TopicPartition(topic, part_no))
        for offset in range(low, high, batch_size):
            parts.append((part_no, offset))

    fut = client.scatter(parts, broadcast=True)
    fut_res = [client.submit(r, f) for f in fut]
    [f.result() for f in fut_res]
    took = time() - start
    count = (len(parts) * 1000)
    rate = count / 1024 / 1024 / took
    print('read:', count, 'msgs', 1024 * count, 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')


if __name__ == '__main__':
    main()
Sample output of (partition number, offset):
(1, 375000)
(1, 202000)
(1, 164000)
(1, 77000)
(1, 83000)
(1, 34000)
(1, 405000)
(1, 38000)
(1, 89000)
(1, 156000)
(1, 71000)
(1, 86000)
(1, 53000)
(1, 408000)
(1, 355000)
(1, 182000)
(1, 92000)
(1, 67000)
(1, 120000)
(1, 110000)
(1, 57000)
(1, 61000)
(1, 63000)
(1, 385000)
(1, 136000)
(1, 66000)
(1, 43000)
(1, 394000)
(1, 33000)
(1, 398000)
(1, 335000)
(1, 144000)
(1, 69000)
(1, 47000)
(1, 51000)
(1, 37000)
(1, 41000)
(1, 365000)
(1, 388000)
OK, I've changed it so that there is 1 partition per task. I'm getting almost 700 MB/s.
from time import time

import confluent_kafka as ck
from distributed import Client, LocalCluster


def main():
    topic = 'metrics4'
    batch_size = 10000
    conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-group'
    }
    cluster = LocalCluster(n_workers=1, threads_per_worker=20)
    client = Client(cluster)
    start = time()
    master_consumer = ck.Consumer(conf)
    parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions

    def r(v):
        print(v)
        part_no, low, high = v
        c = ck.Consumer(conf)
        tp = ck.TopicPartition(topic, part_no, low)
        last_offset = low
        c.assign([tp])
        while True:
            for m in c.consume(min(batch_size, high - last_offset)):
                last_offset = m.offset()
                m.value()
            if last_offset >= high - batch_size:
                break

    print('Creating partition sets')
    parts = []
    for part_no in range(len(parts_metadata)):
        low, high = master_consumer.get_watermark_offsets(ck.TopicPartition(topic, part_no))
        parts.append((part_no, low, high))
    print('Partition set count:', len(parts))

    print('Scattering')
    fut = client.scatter(parts, broadcast=True)
    print('Submitting')
    fut_res = [client.submit(r, f) for f in fut]
    print('Gathering results')
    [f.result() for f in fut_res]
    took = time() - start
    count = sum(x[2] for x in parts)
    rate = (count / 1024) / took
    print('read:', count, 'msgs', 1024 * count, 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')


if __name__ == '__main__':
    main()
$ python dask_main.py
Creating partition sets
Partition set count: 20
Scattering
Submitting
Gathering results
(11, 0, 500696)
(10, 0, 499223)
(9, 0, 500215)
(8, 0, 500114)
(7, 0, 499904)
(6, 0, 500022)
(5, 0, 498319)
(4, 0, 501291)
(3, 0, 499556)
(2, 0, 499730)
(1, 0, 498943)
(0, 0, 499825)
(19, 0, 500231)
(18, 0, 501669)
(17, 0, 499241)
(16, 0, 499884)
(15, 0, 501094)
(14, 0, 499997)
(13, 0, 499913)
(12, 0, 500133)
read: 10000000 msgs 10240000000 b in: 14.280932426452637 s rate: 683.8226460557357 mb/s
After changing the above to use 20 workers with only 1 task each, I'm seeing rates of over 1 GB/s.
read: 10000000 msgs 10240000000 b in: 9.476426839828491 s rate: 1030.517637613793 mb/s
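For reference, the only change relative to the script above is the cluster configuration; threads_per_worker=1 is my reading of "1 task" per worker:

# 20 single-threaded worker processes, so each partition/task gets its own process
# (threads_per_worker=1 is assumed here)
cluster = LocalCluster(n_workers=20, threads_per_worker=1)
client = Client(cluster)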
I think the idea here is that the normal usage pattern for Kafka is one thread per partition; the unit of parallelism in Kafka is the partition. We may be running into issues by having several workers shuffling around on different partitions.
@skmatti, is this improved now? Can you post your results?