Issues scaling kafka consumers with Dask

Open skmatti opened this issue 6 years ago • 20 comments

I ran two simple pipelines (non-Dask and Dask) that read a stream of messages from Kafka using the Streamz from_kafka_batched method. Both pipelines give similar throughput.

Non-Dask pipeline:

from distributed import Client
from streamz import Stream
import time
import json

def preprocess(messages):
    start_time = int(round(time.time()))
    no_of_rows = len(messages)
    if no_of_rows > 0:
        size = no_of_rows*len(messages[0])
        kafka_timestamp = json.loads(messages[0].decode('utf-8'))['timestamp']
    else:
        size = 0 
        kafka_timestamp = start_time
    end_time = int(round(time.time()))
    return "{},{},{},{},{}".format(no_of_rows, kafka_timestamp, start_time, end_time, size)


topic = "topic-1"
bootstrap_servers = 'localhost:9092'
consumer_conf = {'bootstrap.servers': bootstrap_servers,
        'group.id': 'group_1', 'session.timeout.ms': 60000}


stream = Stream.from_kafka_batched(topic, consumer_conf, poll_interval='10s',
                                   npartitions=5, asynchronous=True, dask=False)

kafka_out = stream.map(preprocess).to_kafka('test-out', consumer_conf)
kafka_out.flush()

The pipeline gives maximum throughput = ~43 MBps (topic partitions = 5). Throughput computation = (Σ size) / (max(end_time) - min(start_time))
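
For reference, the throughput formula above can be sketched as follows. This is only an illustration: the helper name `throughput_mb_per_s` and the sample records are made up, the record layout is the comma-separated string returned by `preprocess`, and 1 MB is assumed to mean 10^6 bytes.

```python
def throughput_mb_per_s(records):
    """Aggregate throughput from 'rows,kafka_ts,start,end,size' records."""
    starts, ends, total_bytes = [], [], 0
    for rec in records:
        _rows, _kafka_ts, start, end, size = rec.split(",")
        starts.append(int(start))
        ends.append(int(end))
        total_bytes += int(size)
    elapsed = max(ends) - min(starts)  # whole seconds, as logged by preprocess
    if elapsed <= 0:
        raise ValueError("records span less than one second; no rate available")
    return total_bytes / elapsed / 1e6  # assumption: 1 MB = 10**6 bytes


# Made-up records, purely for illustration
sample = [
    "1000,1554837000,100,101,50000000",
    "1000,1554837000,100,102,50000000",
]
print(throughput_mb_per_s(sample))  # 50.0
```

Because `preprocess` rounds its timestamps to whole seconds, a rate computed this way is coarse for short runs.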

Setting dask=True runs the above pipeline on a local Dask cluster.

Starting Dask Cluster:

# dask imports 
from distributed import Client, LocalCluster

# dask client 
cluster = LocalCluster(ip="0.0.0.0", scheduler_port=8786, diagnostics_port = 8787, processes=False, threads_per_worker=10, n_workers=1)
client = Client(cluster)
client.get_versions(check=True)

The Dask pipeline reaches a maximum throughput of ~44.8 MBps when the topic has 1 partition. Increasing the number of partitions in the topic decreases throughput.

My understanding from these experiments is that confluent-kafka performs as expected with a single Dask thread (when the number of partitions is 1) but performs poorly with multiple Dask threads or multiple partitions. This may be due to the underlying Tornado async library. It would be great if the experts could help me understand this better, as I am fairly new to async programming.

Hardware and software specs: 10 CPU cores, 30 GB RAM, Ubuntu 16.04, Streamz (built from the latest code base), Kafka 2.2.0, confluent-kafka-python 1.0.0

skmatti avatar Apr 09 '19 19:04 skmatti

I would be interested to see how this changes if you run with multiple worker processes

martindurant avatar Apr 09 '19 19:04 martindurant

I tried varying the number of processes (5 and 10) and the number of partitions (1 and 5). I get the best performance with 5 processes and 1 partition (44.2 MBps). With 5 processes and 5 partitions I get around 38 MBps.

skmatti avatar Apr 09 '19 22:04 skmatti

How are you setting up the topic and putting data into Kafka? I notice that you are sinking back to Kafka again, but this is not a batched method; I would suggest not doing anything with the messages, so that you measure the read throughput alone.

martindurant avatar Apr 09 '19 22:04 martindurant

I am creating a continuous stream of data in the following two steps:

  1. Writing to a base topic from a Python script at 10 msgs/sec.
  2. Running a Spark job that reads from the base topic and explodes the data (using an exploding constant) into a different topic.

Sinking to Kafka is not batched because we log only one record for each batch.

skmatti avatar Apr 09 '19 22:04 skmatti

I decided to try a test outside of Dask and Streamz to see what numbers could be achieved. I inserted 10,000,000 1 kB messages into a topic with 20 partitions, in a Kafka instance running in a container on my local machine. Note that this topic has a bit more than 10 million messages because I started the insert script more than once. This machine is a bit old: Intel(R) Core(TM) i7-4770K CPU @ 3.50GHz, 4 cores (8 hyper-threaded), 32 GB of RAM.

I'm using the Consumer.consume() method in the Kafka consumer to read messages in batches. This code is simple. It creates one thread per partition. Each thread gets its own consumer as a consumer can only be assigned one set of partitions. Each thread iterates over the messages in its given partition. Unless I'm misunderstanding something, the threads here should map to tasks in Dask.

I'm consistently getting read rates of over 600 MB/s unless this script is wrong somewhere.

$ python main.py
read: 10141980 msgs 10141980000 b in: 15.36656379699707 s rate: 629.4280212077073 mb/s
$ python main.py
read: 10141980 msgs 10141980000 b in: 14.956122636795044 s rate: 646.70142645865 mb/s
$ python main.py
read: 10141980 msgs 10141980000 b in: 14.976755619049072 s rate: 645.8104872328803 mb/s

from concurrent.futures.thread import ThreadPoolExecutor
from time import time

import confluent_kafka as ck


def main():
    topic = 'metrics2'
    batch_size = 1000

    conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-group'
    }

    start = time()
    master_consumer = ck.Consumer(conf)
    parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions

    total_offset = [0]

    def r(part_no):
        c = ck.Consumer(conf)
        tp = ck.TopicPartition(topic, part_no)
        low, high = c.get_watermark_offsets(tp)
        tp = ck.TopicPartition(topic, part_no, low)

        last_offset = low
        c.assign([tp])
        while True:
            for m in c.consume(min(batch_size, high - last_offset)):
                last_offset = m.offset()

            if last_offset >= high - batch_size:
                total_offset[0] += last_offset
                break

    with ThreadPoolExecutor(max_workers=len(parts_metadata)) as e:
        for p in range(len(parts_metadata)):
            e.submit(r, p)

    took = time() - start
    rate = (total_offset[0] * 1000) / 1024 / 1024 / took
    print('read:', total_offset[0], 'msgs', 1000 * total_offset[0], 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')


if __name__ == '__main__':
    main()

jsmaupin avatar Apr 09 '19 22:04 jsmaupin

One other thing I should note is that .consume(...) will block if the batch is not full. Therefore, I exit when last_offset >= high - batch_size.
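
A possible way around the blocking is to pass a timeout to `.consume()`, which then returns whatever arrived (possibly an empty list) once the timeout expires. The sketch below assumes a consumer that has already been assigned to a single partition; the helper name `drain_partition` and its defaults are made up for illustration.

```python
def drain_partition(consumer, high, batch_size=1000, timeout=1.0):
    """Read values from an already-assigned consumer up to offset high - 1.

    `high` is the high watermark (one past the last offset to read).
    """
    values = []
    last_offset = -1
    while last_offset < high - 1:
        # With a timeout, a short final batch no longer blocks forever.
        batch = consumer.consume(batch_size, timeout=timeout)
        if not batch:
            break  # nothing arrived within the timeout
        for msg in batch:
            if msg.error() is not None:
                continue  # skip error events in this sketch
            last_offset = msg.offset()
            if last_offset < high:
                values.append(msg.value())
    return values
```

With a real client this would be called as `drain_partition(c, high)` right after `c.assign([tp])`; the loop then ends exactly at the high watermark instead of stopping one batch early.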

jsmaupin avatar Apr 09 '19 22:04 jsmaupin

In the original code, I still think that to_kafka can be a problem, because it forces upstream to wait until the message is sent and receipt acknowledged. You should at least have a buffer stage before it.

I would love to have a go at speeding things up towards the "raw" example, but I still need a script to fill the message queue, ideally one that can be run in the context of Kafka alone (e.g., the spotify/kafka docker container). I would start by looking at the distributed dashboard to see how many tasks are really running in parallel and what is taking the majority of the time in the workers.

martindurant avatar Apr 10 '19 14:04 martindurant

I think we can sink to a list instead of writing to Kafka. Only a few values (one per partition) would be appended to this list every few seconds (every poll_interval).

Also, for the sake of this experiment, can we try to read from a standalone topic (with some pre-written data) and create fixed-size batches if the number of offsets is very high?

Something like this in FromKafkaBatched:

current_position = self.positions[partition]
lowest = max(current_position, low)
max_batch_size = 100000
while lowest < high:
    curr_high = min(lowest + max_batch_size, high)
    out.append((self.consumer_params, self.topic, partition,
                lowest, curr_high-1))
    lowest = curr_high
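
The same loop as a standalone function, in case anyone wants to try the splitting outside of Streamz. This is just a sketch: the name `split_offsets` is made up, and it assumes `high` is the high watermark (exclusive), which is why the end of each range is `stop - 1` as in the snippet above.

```python
def split_offsets(low, high, max_batch_size):
    """Split [low, high) into inclusive (start, end) ranges of bounded size."""
    if max_batch_size <= 0:
        raise ValueError("max_batch_size must be positive")
    ranges = []
    start = low
    while start < high:
        stop = min(start + max_batch_size, high)
        ranges.append((start, stop - 1))  # inclusive end offset
        start = stop
    return ranges


print(split_offsets(0, 250_000, 100_000))
# [(0, 99999), (100000, 199999), (200000, 249999)]
```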

skmatti avatar Apr 10 '19 17:04 skmatti

It seems like calling .value() has a significant impact on performance. I know there's no way around this. We need the value of the message. But, I'm just going to post this here for info.

With call to .value():

read: 9999999 msgs 9999999000 b in: 24.51851987838745 s rate: 388.96076344292777 mb/s
(venv) [jsmaupin@yard79 from-kafka-tests]$ python main.py

Without call to .value():

read: 9999999 msgs 9999999000 b in: 10.61636209487915 s rate: 898.3060416701757 mb/s
(venv) [jsmaupin@yard79 from-kafka-tests]$ python main.py

jsmaupin avatar Apr 10 '19 19:04 jsmaupin

Maybe the message is not actually fetched until .value() is called; I don't know how this is done in the C-level library. Indeed, I don't think it makes any sense to avoid it, but it is interesting nevertheless.

martindurant avatar Apr 10 '19 19:04 martindurant

My guess is that the message is fetched by the c-extensions, but not handed to the Python runtime until .value() is called.

jsmaupin avatar Apr 10 '19 19:04 jsmaupin

(note that this repo just moved to its own org, don't be surprised by the redirect)

martindurant avatar Apr 10 '19 19:04 martindurant

(24.51851987838745 - 10.61636209487915) / 9999999 = 1.39 µs per message to transform a few bytes, which I suppose is possible.
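
The arithmetic, spelled out with the two timings posted above (the variable names are only for illustration):

```python
with_value = 24.51851987838745     # seconds, run that calls .value()
without_value = 10.61636209487915  # seconds, run that skips .value()
n_messages = 9_999_999

# Extra time attributable to .value(), spread over all messages
per_message_us = (with_value - without_value) / n_messages * 1e6
print(f"{per_message_us:.2f} us per .value() call")  # 1.39 us per .value() call
```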

Note that your code uses .consume(), but the streamz implementation in get_message_batch uses .poll(0). The latter always gets one message at a time, the former gets a number of messages as a list.
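
To make the difference in call pattern concrete, here is a small sketch that counts Python-level calls. `CountingConsumer` is a hypothetical in-memory stand-in, not the real client; it only mimics the shape of `.poll()` (one message or None) and `.consume()` (a list of messages). It says nothing about real throughput, which also depends on the client's internal buffering.

```python
from collections import deque


class CountingConsumer:
    """Hypothetical in-memory stand-in for a Kafka consumer; counts calls."""

    def __init__(self, n_messages):
        self.pending = deque(range(n_messages))
        self.calls = 0

    def poll(self, timeout=None):
        self.calls += 1
        return self.pending.popleft() if self.pending else None

    def consume(self, num_messages=1, timeout=-1):
        self.calls += 1
        take = min(num_messages, len(self.pending))
        return [self.pending.popleft() for _ in range(take)]


def read_with_poll(consumer):
    out = []
    while True:
        msg = consumer.poll(0)  # one message per call
        if msg is None:
            break
        out.append(msg)
    return out


def read_with_consume(consumer, batch_size):
    out = []
    while True:
        batch = consumer.consume(batch_size, timeout=0)  # a list per call
        if not batch:
            break
        out.extend(batch)
    return out


a, b = CountingConsumer(10_000), CountingConsumer(10_000)
read_with_poll(a)
read_with_consume(b, 1000)
print(a.calls, b.calls)  # 10001 11
```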

martindurant avatar Apr 10 '19 19:04 martindurant

Would anyone like to try with the following:

def get_message_batch(kafka_params, topic, partition, low, high, timeout=None):
    """Fetch a batch of kafka messages in given topic/partition

    This will block until messages are available, or timeout is reached.
    """
    import time
    import confluent_kafka as ck
    t0 = time.time()
    consumer = ck.Consumer(kafka_params)
    tp = ck.TopicPartition(topic, partition, low)
    consumer.assign([tp])
    out = []
    try:
        while True:
            msg = consumer.poll(0.01)
            if msg:
                v = msg.value()
                if v and msg.error() is None:
                    off = msg.offset()
                    if high >= off:
                        out.append(v)
                    if high <= off:
                        break
            if timeout is not None and time.time() - t0 > timeout:
                break
    finally:
        pass
    return out

Note that this does not close the consumer, which is actually what takes by far most of the time. It does use .poll(), though, which maybe should be .consume(), since that can handle the number of messages and the timeout in one place. This leads back to the suggestion of the consumer factory (see #233).
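
One way consumer reuse could look is sketched below: a per-thread cache, so that each worker thread creates a consumer once and later batches reuse it. This is only one possible reading of the idea and not necessarily what #233 proposes. The helper `get_consumer` and its `factory` argument are hypothetical, and the sketch assumes the config values are hashable (strings and numbers).

```python
import threading

_local = threading.local()


def get_consumer(kafka_params, factory):
    """Return a consumer cached per thread and per config (hypothetical helper).

    `factory` is any callable that builds a consumer from the config dict,
    for example confluent_kafka.Consumer.
    """
    cache = getattr(_local, "consumers", None)
    if cache is None:
        cache = _local.consumers = {}
    key = tuple(sorted(kafka_params.items()))  # assumes hashable values
    if key not in cache:
        cache[key] = factory(kafka_params)
    return cache[key]
```

Inside `get_message_batch` this would replace `ck.Consumer(kafka_params)` with `get_consumer(kafka_params, ck.Consumer)`, followed by `consumer.assign([tp])` as before. Whether repeated re-assignment of one consumer behaves well in every case is something I have not verified.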

martindurant avatar Apr 10 '19 20:04 martindurant

I've written another version of my script above that uses Dask. I'm waiting for a new pre-populating script I wrote to finish running. I can try yours right after.

jsmaupin avatar Apr 10 '19 20:04 jsmaupin

@martindurant I am getting much better results with multiple workers. I got throughput close to 200 MBps reading from a standalone topic (with 7 GB of data). I can try removing .close() to see if it works better. Also, poll performs almost as well as consume, since Kafka internally uses a buffer. In some cases poll even gives better results.

skmatti avatar Apr 10 '19 21:04 skmatti

It seems that nearly the same script I had before, converted to Dask, takes significantly longer. It does not even finish. I am new to Dask, so maybe someone can help me out here. One thing I have noticed is that the tasks do not run in sequential order with respect to the offset.

from time import time

import confluent_kafka as ck
from distributed import Client, LocalCluster


def main():
   topic = 'metrics4'
   batch_size = 1000

   conf = {
       'bootstrap.servers': 'localhost:9092',
       'group.id': 'my-group'
   }

   cluster = LocalCluster(n_workers=20)
   client = Client(cluster)

   start = time()
   master_consumer = ck.Consumer(conf)
   parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions

   def r(v):
       print(v)
       part_no, offset = v
       c = ck.Consumer(conf)
       tp = ck.TopicPartition(topic, part_no, offset)

       c.assign([tp])
       for m in c.consume(batch_size):
           m.offset()

   parts = []
   for part_no in range(len(parts_metadata)):
       low, high = master_consumer.get_watermark_offsets(ck.TopicPartition(topic, part_no))
       for offset in range(low, high, batch_size):
           parts.append((part_no, offset))

   fut = client.scatter(parts, broadcast=True)
   fut_res = [client.submit(r, f) for f in fut]
   [f.result() for f in fut_res]

   took = time() - start
   count = (len(parts) * 1000)
   rate = count / 1024 / 1024 / took
   print('read:', count, 'msgs', 1024 * count, 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')


if __name__ == '__main__':
   main()

Sample output of (partition number, offset):

(1, 375000)
(1, 202000)
(1, 164000)
(1, 77000)
(1, 83000)
(1, 34000)
(1, 405000)
(1, 38000)
(1, 89000)
(1, 156000)
(1, 71000)
(1, 86000)
(1, 53000)
(1, 408000)
(1, 355000)
(1, 182000)
(1, 92000)
(1, 67000)
(1, 120000)
(1, 110000)
(1, 57000)
(1, 61000)
(1, 63000)
(1, 385000)
(1, 136000)
(1, 66000)
(1, 43000)
(1, 394000)
(1, 33000)
(1, 398000)
(1, 335000)
(1, 144000)
(1, 69000)
(1, 47000)
(1, 51000)
(1, 37000)
(1, 41000)
(1, 365000)
(1, 388000)

jsmaupin avatar Apr 10 '19 21:04 jsmaupin

OK, I've changed it so that it is 1 partition per task. I'm getting almost 700 MB/s.

from time import time

import confluent_kafka as ck
from distributed import Client, LocalCluster


def main():
    topic = 'metrics4'
    batch_size = 10000

    conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-group'
    }

    cluster = LocalCluster(n_workers=1, threads_per_worker=20)
    client = Client(cluster)

    start = time()
    master_consumer = ck.Consumer(conf)
    parts_metadata = master_consumer.list_topics(topic).topics[topic].partitions

    def r(v):
        print(v)
        part_no, low, high = v
        c = ck.Consumer(conf)
        tp = ck.TopicPartition(topic, part_no, low)

        last_offset = low
        c.assign([tp])
        while True:
            for m in c.consume(min(batch_size, high - last_offset)):
                last_offset = m.offset()
                m.value()

            if last_offset >= high - batch_size:
                break

    print('Creating partition sets')
    parts = []
    for part_no in range(len(parts_metadata)):
        low, high = master_consumer.get_watermark_offsets(ck.TopicPartition(topic, part_no))
        parts.append((part_no, low, high))
    print('Partition set count:', len(parts))

    print('Scattering')
    fut = client.scatter(parts, broadcast=True)

    print('Submitting')
    fut_res = [client.submit(r, f) for f in fut]

    print('Gathering results')
    [f.result() for f in fut_res]

    took = time() - start
    count = sum(x[2] for x in parts)
    rate = (count / 1024) / took
    print('read:', count, 'msgs', 1024 * count, 'b', 'in:', took, 's', 'rate:', rate, 'mb/s')


if __name__ == '__main__':
    main()

$ python dask_main.py
Creating partition sets
Partition set count: 20
Scattering
Submitting
Gathering results
(11, 0, 500696)
(10, 0, 499223)
(9, 0, 500215)
(8, 0, 500114)
(7, 0, 499904)
(6, 0, 500022)
(5, 0, 498319)
(4, 0, 501291)
(3, 0, 499556)
(2, 0, 499730)
(1, 0, 498943)
(0, 0, 499825)
(19, 0, 500231)
(18, 0, 501669)
(17, 0, 499241)
(16, 0, 499884)
(15, 0, 501094)
(14, 0, 499997)
(13, 0, 499913)
(12, 0, 500133)
read: 10000000 msgs 10240000000 b in: 14.280932426452637 s rate: 683.8226460557357 mb/s

jsmaupin avatar Apr 10 '19 22:04 jsmaupin

After changing the above to use 20 workers and only 1 task, I'm seeing rates of over 1 GB/s.

read: 10000000 msgs 10240000000 b in: 9.476426839828491 s rate: 1030.517637613793 mb/s

I think the idea here is that the normal usage pattern of Kafka is one thread per partition. The unit of parallelism in Kafka is the partition. We may be running into issues by having several workers shuffle around on different partitions.
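
A rough way to see the difference between the two Dask scripts above is to count the work items each one creates, since every task builds its own consumer. The sketch below uses a made-up helper `plan_tasks` and illustrative watermarks (20 partitions of 500,000 messages each, roughly the sizes printed above).

```python
def plan_tasks(watermarks, batch_size=None):
    """Build (partition, low, high) work items from {partition: (low, high)}.

    With batch_size=None there is one task per partition; otherwise each
    partition is cut into tasks of at most batch_size offsets.
    """
    tasks = []
    for part_no, (low, high) in sorted(watermarks.items()):
        if batch_size is None:
            tasks.append((part_no, low, high))
        else:
            for start in range(low, high, batch_size):
                tasks.append((part_no, start, min(start + batch_size, high)))
    return tasks


# Illustrative watermarks only
watermarks = {p: (0, 500_000) for p in range(20)}
print(len(plan_tasks(watermarks, batch_size=1000)))  # 10000
print(len(plan_tasks(watermarks)))                   # 20
```

With 1000-message batches that is 10,000 consumer setups instead of 20, which may explain a good part of the gap, although I have not measured the setup cost in isolation.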

jsmaupin avatar Apr 10 '19 22:04 jsmaupin

@skmatti, is this improved now? Can you post your results?

jsmaupin avatar Apr 18 '19 17:04 jsmaupin