From 38dfc0d6c5f54d1e72748b72f4af08141ddfe76c Mon Sep 17 00:00:00 2001 From: Stephen Dawson-Haggerty Date: Fri, 17 May 2019 09:23:03 -0700 Subject: [PATCH 1/8] I'd like to get some feedback on this -- the basics seem pretty straightforward and I'd like see if there's anything I missed. We need to figure out what to do with the throttle time, and see if this breaks the consumer. --- afkak/common.py | 10 +++++----- afkak/kafkacodec.py | 28 +++++++++++++++++----------- afkak/producer.py | 3 +++ 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/afkak/common.py b/afkak/common.py index 9398d368..fa559aec 100644 --- a/afkak/common.py +++ b/afkak/common.py @@ -63,7 +63,7 @@ # Response payloads ProduceResponse = namedtuple("ProduceResponse", - ["topic", "partition", "error", "offset"]) + ["topic", "partition", "error", "offset", "throttle_time"]) FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"]) @@ -100,11 +100,11 @@ "SourcedMessage", TopicAndPartition._fields + OffsetAndMessage._fields) -class Message(namedtuple("Message", ["magic", "attributes", "key", "value"])): +class Message(namedtuple("Message", ["magic", "attributes", "key", "value", "timestamp"])): """ - A Kafka `message`_ in format 0. + A Kafka `message`_ in format 1. - :ivar int magic: Message format version, always 0. + :ivar int magic: Message format version, always 1. :ivar int attributes: Compression flags. :ivar bytes key: Message key, or ``None`` when the message lacks a key. @@ -117,7 +117,7 @@ class Message(namedtuple("Message", ["magic", "attributes", "key", "value"])): __slots__ = () def __repr__(self): - bits = [' Crc MagicByte Attributes Key Value Crc => int32 @@ -160,8 +160,8 @@ def _encode_message(cls, message): Key => bytes Value => bytes """ - if message.magic == 0: - msg = struct.pack('>BB', message.magic, message.attributes) + if message.magic == 1: + msg = struct.pack('>BBq', message.magic, message.attributes, message.timestamp) msg += write_int_string(message.key) msg += write_int_string(message.value) crc = zlib.crc32(msg) & 0xffffffff # Ensure unsigned @@ -284,7 +284,7 @@ def encode_produce_request(cls, client_id, correlation_id, grouped_payloads = group_by_topic_and_partition(payloads) message = cls._encode_message_header(client_id, correlation_id, - KafkaCodec.PRODUCE_KEY) + KafkaCodec.PRODUCE_KEY, api_version=1) message += struct.pack('>hii', acks, timeout, len(grouped_payloads)) @@ -313,9 +313,9 @@ def decode_produce_response(cls, data): topic, cur = read_short_ascii(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) for _i in range(num_partitions): - ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur) + ((partition, error, offset, throttle_time), cur) = relative_unpack('>ihql', data, cur) - yield ProduceResponse(topic, partition, error, offset) + yield ProduceResponse(topic, partition, error, offset, throttle_time) @classmethod def encode_fetch_request(cls, client_id, correlation_id, payloads=None, @@ -630,7 +630,7 @@ def decode_offset_fetch_response(cls, data): metadata, error) -def create_message(payload, key=None): +def create_message(m, key=None): """ Construct a :class:`Message` @@ -640,9 +640,15 @@ def create_message(payload, key=None): determine message identity on a compacted topic. :type key: :class:`bytes` or ``None`` """ - assert payload is None or isinstance(payload, bytes), 'payload={!r} should be bytes or None'.format(payload) + assert (m is None or isinstance(m, bytes) or isinstance(m, tuple), + 'payload={!r} should be bytes, a (timestamp, bytes) tuple, or None'.format(m)) assert key is None or isinstance(key, bytes), 'key={!r} should be bytes or None'.format(key) - return Message(0, 0, key, payload) + if isinstance(m, tuple): + ts, payload = m + else: + payload = m + + return Message(1, 0, key, payload, ts) def create_gzip_message(message_set): @@ -657,7 +663,7 @@ def create_gzip_message(message_set): encoded_message_set = KafkaCodec._encode_message_set(message_set) gzipped = gzip_encode(encoded_message_set) - return Message(0, CODEC_GZIP, None, gzipped) + return Message(1, CODEC_GZIP, None, gzipped, -1) def create_snappy_message(message_set): @@ -671,7 +677,7 @@ def create_snappy_message(message_set): """ encoded_message_set = KafkaCodec._encode_message_set(message_set) snapped = snappy_encode(encoded_message_set) - return Message(0, CODEC_SNAPPY, None, snapped) + return Message(1, CODEC_SNAPPY, None, snapped, -1) def create_message_set(requests, codec=CODEC_NONE): diff --git a/afkak/producer.py b/afkak/producer.py index fddca7e2..9b373814 100644 --- a/afkak/producer.py +++ b/afkak/producer.py @@ -212,6 +212,9 @@ def send_messages(self, topic, key=None, msgs=()): if m is None: continue + if isinstance(m, tuple): + ts, m = m + if not isinstance(m, bytes): raise TypeError('Message {} to topic {} ({!r:.100}) has type {}, but must have type {}'.format( index, topic, m, type(m).__name__, type(bytes).__name__)) From d38e71c0b01eac53b2f9a03806667d5632b9de0f Mon Sep 17 00:00:00 2001 From: Stephen Dawson-Haggerty Date: Fri, 17 May 2019 09:26:27 -0700 Subject: [PATCH 2/8] Missing timestamp --- afkak/kafkacodec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/afkak/kafkacodec.py b/afkak/kafkacodec.py index 0c604993..5ad9ecc2 100644 --- a/afkak/kafkacodec.py +++ b/afkak/kafkacodec.py @@ -646,7 +646,7 @@ def create_message(m, key=None): if isinstance(m, tuple): ts, payload = m else: - payload = m + ts, payload = -1, m return Message(1, 0, key, payload, ts) From c1481b09605258f35fc0036298db246c433ce4b3 Mon Sep 17 00:00:00 2001 From: Stephen Dawson-Haggerty Date: Fri, 17 May 2019 09:42:55 -0700 Subject: [PATCH 3/8] Set producer timestamp if not provided --- afkak/kafkacodec.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/afkak/kafkacodec.py b/afkak/kafkacodec.py index 5ad9ecc2..a2c96422 100644 --- a/afkak/kafkacodec.py +++ b/afkak/kafkacodec.py @@ -4,6 +4,7 @@ from __future__ import absolute_import +import time import logging import struct import zlib @@ -646,7 +647,7 @@ def create_message(m, key=None): if isinstance(m, tuple): ts, payload = m else: - ts, payload = -1, m + ts, payload = int(time.time() * 1000), m return Message(1, 0, key, payload, ts) From 0e15343a66ed4ca63a02005700ed81f45ed2c2fc Mon Sep 17 00:00:00 2001 From: Stephen Dawson-Haggerty Date: Fri, 17 May 2019 12:59:01 -0700 Subject: [PATCH 4/8] Fix some tests --- afkak/common.py | 6 ++++-- afkak/kafkacodec.py | 4 ++-- afkak/test/test_kafkacodec.py | 28 ++++++++++++++-------------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/afkak/common.py b/afkak/common.py index fa559aec..3b8701d0 100644 --- a/afkak/common.py +++ b/afkak/common.py @@ -64,6 +64,7 @@ # Response payloads ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset", "throttle_time"]) +ProduceResponse.__new__.func_defaults = (None, None, None, None, -1) FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"]) @@ -102,7 +103,7 @@ class Message(namedtuple("Message", ["magic", "attributes", "key", "value", "timestamp"])): """ - A Kafka `message`_ in format 1. + A Kafka `message`_ in format 0 or 1. :ivar int magic: Message format version, always 1. :ivar int attributes: Compression flags. @@ -117,7 +118,7 @@ class Message(namedtuple("Message", ["magic", "attributes", "key", "value", "tim __slots__ = () def __repr__(self): - bits = ['') return ''.join(bits) +Message.__new__.func_defaults = (None, None, None, None, -1) ################# diff --git a/afkak/kafkacodec.py b/afkak/kafkacodec.py index a2c96422..104db1f3 100644 --- a/afkak/kafkacodec.py +++ b/afkak/kafkacodec.py @@ -631,7 +631,7 @@ def decode_offset_fetch_response(cls, data): metadata, error) -def create_message(m, key=None): +def create_message(m, key=None, magic=1): """ Construct a :class:`Message` @@ -649,7 +649,7 @@ def create_message(m, key=None): else: ts, payload = int(time.time() * 1000), m - return Message(1, 0, key, payload, ts) + return Message(magic, 0, key, payload, ts) def create_gzip_message(message_set): diff --git a/afkak/test/test_kafkacodec.py b/afkak/test/test_kafkacodec.py index f64a931e..bf32077b 100644 --- a/afkak/test/test_kafkacodec.py +++ b/afkak/test/test_kafkacodec.py @@ -181,11 +181,11 @@ def test_encode_message_header(self): self.assertEqual(encoded, expect) def test_encode_message(self): - message = create_message(b"test", b"key") + message = create_message((47, b"test"), b"key", magic=1) encoded = KafkaCodec._encode_message(message) expect = b"".join([ - struct.pack(">i", -1427009701), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", -1232077792), # CRC + struct.pack(">bbq", 1, 0, 47), # Magic, flags, timestamp struct.pack(">i", 3), # Length of key b"key", # key struct.pack(">i", 4), # Length of value @@ -209,7 +209,7 @@ def test_decode_message(self): KafkaCodec._decode_message(encoded, offset))[0] self.assertEqual(returned_offset, offset) - self.assertEqual(decoded_message, create_message(b"test", b"key")) + self.assertEqual(decoded_message, create_message((-1, b"test"), b"key", magic=0)) def test_encode_message_failure(self): self.assertRaises(ProtocolError, @@ -314,11 +314,11 @@ def test_decode_message_snappy(self): returned_offset1, decoded_message1 = msg1 self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1")) + self.assertEqual(decoded_message1, create_message((-1, b"v1"), magic=0)) returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset2, 0) - self.assertEqual(decoded_message2, create_message(b"v2")) + self.assertEqual(decoded_message2, create_message((-1, b"v2"), magic=0)) def test_decode_message_checksum_error(self): invalid_encoded_message = b"This is not a valid encoded message" @@ -361,10 +361,10 @@ def test_decode_message_set_stop_iteration(self): returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1", b"k1")) + self.assertEqual(decoded_message1, create_message((-1, b"v1"), b"k1", magic=0)) self.assertEqual(returned_offset2, 1) - self.assertEqual(decoded_message2, create_message(b"v2", b"k2")) + self.assertEqual(decoded_message2, create_message((-1, b"v2"), b"k2", magic=0)) def test_get_response_correlation_id(self): t1 = b"topic1" @@ -435,14 +435,14 @@ def test_encode_produce_request(self): def test_decode_produce_response(self): t1 = "topic1" t2 = u"topic2" - encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)), - 2, 2, len(t1), t1.encode(), 2, 0, 0, 10, 1, 1, 20, - len(t2), t2.encode(), 1, 0, 0, 30) + encoded = struct.pack('>iih%dsiihqlihqlh%dsiihql' % (len(t1), len(t2)), + 2, 2, len(t1), t1.encode(), 2, 0, 0, 10, 4, 1, 1, 20, 5, + len(t2), t2.encode(), 1, 0, 0, 30, 6) responses = list(KafkaCodec.decode_produce_response(encoded)) self.assertEqual(responses, - [ProduceResponse(t1, 0, 0, 10), - ProduceResponse(t1, 1, 1, 20), - ProduceResponse(t2, 0, 0, 30)]) + [ProduceResponse(t1, 0, 0, 10, 4), + ProduceResponse(t1, 1, 1, 20, 5), + ProduceResponse(t2, 0, 0, 30, 6)]) def test_encode_fetch_request(self): requests = [ From 7f0941c49e3bb867697e461d88910baab705d554 Mon Sep 17 00:00:00 2001 From: Stephen Dawson-Haggerty Date: Fri, 17 May 2019 15:24:10 -0700 Subject: [PATCH 5/8] Fix some encoding tests --- afkak/test/test_kafkacodec.py | 90 ++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/afkak/test/test_kafkacodec.py b/afkak/test/test_kafkacodec.py index bf32077b..87f5dcf1 100644 --- a/afkak/test/test_kafkacodec.py +++ b/afkak/test/test_kafkacodec.py @@ -104,63 +104,64 @@ def test_create_message(self): payload = b"test" key = b"key" msg = create_message(payload, key) - self.assertEqual(msg.magic, 0) + self.assertEqual(msg.magic, 1) self.assertEqual(msg.attributes, 0) self.assertEqual(msg.key, key) self.assertEqual(msg.value, payload) def test_create_gzip(self): - message_list = [create_message(b"v1", None), - create_message(b"v2", key=b'42')] + message_list = [create_message((-1, b"v1"), None), + create_message((-1, b"v2"), key=b'42')] + msg = create_gzip_message(message_list) - self.assertEqual(msg.magic, 0) + self.assertEqual(msg.magic, 1) self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP) self.assertEqual(msg.key, None) # Need to decode to check since gzipped payload is non-deterministic decoded = gzip_decode(msg.value) expect = b"".join([ struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 16), # MsgSet size - struct.pack(">i", 1285512130), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 24), # MsgSet size + struct.pack(">i", -346518548), # CRC + struct.pack(">bbq", 1, 0, -1), # Magic, flags, timestamp struct.pack(">i", -1), # -1 indicates a null key struct.pack(">i", 2), # Msg length (bytes) b"v1", # Message contents struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 18), # MsgSet size - struct.pack(">i", 1929437987), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 26), # MsgSet size + struct.pack(">i", -19620068), # CRC + struct.pack(">bbq", 1, 0, -1), # Magic, flags, timestamp struct.pack(">i2s", 2, b'42'), # Key is 2 bytes long, 42 struct.pack(">i", 2), # Msg length (bytes) b"v2", # Message contents - ]) - + ]) self.assertEqual(decoded, expect) def test_create_snappy(self): if not has_snappy(): raise SkipTest("Snappy not available") # pragma: no cover - message_list = [create_message(b"v3", key=b'84'), - create_message(b"v4", None)] + message_list = [create_message((47, b"v3"), key=b'84'), + create_message((48, b"v4"), None)] + msg = create_snappy_message(message_list) - self.assertEqual(msg.magic, 0) + self.assertEqual(msg.magic, 1) self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY) self.assertEqual(msg.key, None) decoded = snappy_decode(msg.value) expect = b"".join([ struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 18), # MsgSet size - struct.pack(">i", 813233088), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 26), # MsgSet size + struct.pack(">i", -892550681), # CRC + struct.pack(">bbq", 1, 0, 47), # Magic, flags struct.pack(">i2s", 2, b'84'), # Key is 2 bytes long, '84' struct.pack(">i", 2), # Msg length (bytes) b"v3", # Message contents struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 16), # MsgSet size - struct.pack(">i", 1022734157), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 24), # MsgSet size + struct.pack(">i", 100172533), # CRC + struct.pack(">bbq", 1, 0, 48), # Magic, flags struct.pack(">i", -1), # -1 indicates a null key struct.pack(">i", 2), # Msg length (bytes) b"v4", # Message contents @@ -214,29 +215,29 @@ def test_decode_message(self): def test_encode_message_failure(self): self.assertRaises(ProtocolError, KafkaCodec._encode_message, - Message(1, 0, "key", "test")) + Message(0, 0, "key", "test")) def test_encode_message_set(self): message_set = [ - create_message(b"v1", b"k1"), - create_message(b"v2", b"k2"), + create_message((-1, b"v1"), b"k1"), + create_message((-1, b"v2"), b"k2"), ] - + encoded = KafkaCodec._encode_message_set(message_set) expect = b"".join([ struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", 1474775406), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 26), # Msg Size + struct.pack(">i", -634178223), # CRC + struct.pack(">bbq", 1, 0, -1), # Magic, flags, timestamp struct.pack(">i", 2), # Length of key b"k1", # Key struct.pack(">i", 2), # Length of value b"v1", # Value struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", -16383415), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 26), # Msg Size + struct.pack(">i", 1926397558), # CRC + struct.pack(">bbq", 1, 0, -1), # Magic, flags, timestamp struct.pack(">i", 2), # Length of key b"k2", # Key struct.pack(">i", 2), # Length of value @@ -274,10 +275,10 @@ def test_decode_message_set(self): returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1", b"k1")) + self.assertEqual(decoded_message1, create_message((-1, b"v1"), b"k1", magic=0)) self.assertEqual(returned_offset2, 1) - self.assertEqual(decoded_message2, create_message(b"v2", b"k2")) + self.assertEqual(decoded_message2, create_message((-1, b"v2"), b"k2", magic=0)) def test_decode_message_gzip(self): gzip_encoded = (b'\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' @@ -293,11 +294,11 @@ def test_decode_message_gzip(self): returned_offset1, decoded_message1 = msg1 self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1")) + self.assertEqual(decoded_message1, create_message((-1, b"v1"), magic=0)) returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset2, 0) - self.assertEqual(decoded_message2, create_message(b"v2")) + self.assertEqual(decoded_message2, create_message((-1, b"v2"), magic=0)) def test_decode_message_snappy(self): if not has_snappy(): @@ -379,21 +380,21 @@ def test_get_response_correlation_id(self): def test_encode_produce_request(self): requests = [ ProduceRequest("topic1", 0, [ - create_message(b"a"), - create_message(b"b"), + create_message((-1, b"a")), + create_message((-1, b"b")), ]), ProduceRequest(u"topic2", 1, [ - create_message(b"c"), + create_message((-1, b"c")), ]), ] - msg_a_binary = KafkaCodec._encode_message(create_message(b"a")) - msg_b_binary = KafkaCodec._encode_message(create_message(b"b")) - msg_c_binary = KafkaCodec._encode_message(create_message(b"c")) + msg_a_binary = KafkaCodec._encode_message(create_message((-1, b"a"))) + msg_b_binary = KafkaCodec._encode_message(create_message((-1, b"b"))) + msg_c_binary = KafkaCodec._encode_message(create_message((-1, b"c"))) header = b"".join([ struct.pack('>h', 0), # Msg Header, Message type = Produce - struct.pack('>h', 0), # Msg Header, API version + struct.pack('>h', 1), # Msg Header, API version struct.pack('>i', 2), # Msg Header, Correlation ID struct.pack('>h7s', 7, b"client1"), # Msg Header, The client ID struct.pack('>h', 2), # Num acks required @@ -430,6 +431,7 @@ def test_encode_produce_request(self): encoded = KafkaCodec.encode_produce_request( b"client1", 2, requests, 2, 100) + self.assertIn(encoded, [expected1, expected2]) def test_decode_produce_response(self): @@ -494,8 +496,8 @@ def test_decode_fetch_response(self): encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' % (len(t1), len(ms1), len(ms2), len(t2), len(ms3)), - 4, 2, len(t1), t1.encode(), 2, 0, 0, 10, len(ms1), ms1, 1, - 1, 20, len(ms2), ms2, len(t2), t2.encode(), 1, 0, 0, 30, + 4, 2, len(t1), t1.encode(), 2, 0, 1, 10, len(ms1), ms1, 1, + 1, 20, len(ms2), ms2, len(t2), t2.encode(), 1, 0, 1, 30, len(ms3), ms3) responses = list(KafkaCodec.decode_fetch_response(encoded)) From 11c5b93dad9194e1dc47b50fc231416ea81c29c9 Mon Sep 17 00:00:00 2001 From: Stephen Dawson-Haggerty Date: Sat, 25 May 2019 15:19:28 -0700 Subject: [PATCH 6/8] Fix throttle time parsing... misread the docs --- afkak/common.py | 2 +- afkak/kafkacodec.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/afkak/common.py b/afkak/common.py index 3b8701d0..ec1d3ed9 100644 --- a/afkak/common.py +++ b/afkak/common.py @@ -63,7 +63,7 @@ # Response payloads ProduceResponse = namedtuple("ProduceResponse", - ["topic", "partition", "error", "offset", "throttle_time"]) + ["topic", "partition", "error", "offset"]) ProduceResponse.__new__.func_defaults = (None, None, None, None, -1) FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", diff --git a/afkak/kafkacodec.py b/afkak/kafkacodec.py index 104db1f3..8be15d1b 100644 --- a/afkak/kafkacodec.py +++ b/afkak/kafkacodec.py @@ -154,10 +154,11 @@ def _encode_message(cls, message): The magic number of a message is a format version number. The only supported magic number right now is one. Format:: - Message => Crc MagicByte Attributes Key Value + Message => Crc MagicByte Attributes Timestamp Key Value Crc => int32 MagicByte => int8 Attributes => int8 + Timestamp => int64 Key => bytes Value => bytes """ @@ -286,7 +287,6 @@ def encode_produce_request(cls, client_id, correlation_id, message = cls._encode_message_header(client_id, correlation_id, KafkaCodec.PRODUCE_KEY, api_version=1) - message += struct.pack('>hii', acks, timeout, len(grouped_payloads)) for topic, topic_payloads in grouped_payloads.items(): @@ -314,9 +314,10 @@ def decode_produce_response(cls, data): topic, cur = read_short_ascii(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) for _i in range(num_partitions): - ((partition, error, offset, throttle_time), cur) = relative_unpack('>ihql', data, cur) + ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur) + yield ProduceResponse(topic, partition, error, offset) - yield ProduceResponse(topic, partition, error, offset, throttle_time) + ((throttle_time,), cur) = relative_unpack('>l', data, cur) @classmethod def encode_fetch_request(cls, client_id, correlation_id, payloads=None, From ca91fccd3f92993a38f77cf798d177c94113026a Mon Sep 17 00:00:00 2001 From: Stephen Dawson-Haggerty Date: Mon, 27 May 2019 09:32:27 -0700 Subject: [PATCH 7/8] Change outstanding to dict to scale better with large numbers of outstanding requests --- afkak/producer.py | 8 ++++---- setup.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/afkak/producer.py b/afkak/producer.py index 9b373814..e8b716ab 100644 --- a/afkak/producer.py +++ b/afkak/producer.py @@ -146,7 +146,7 @@ def __init__(self, client, self._batch_reqs = [] # Current batch (possibly of 1 for unbatched) self._waitingMsgCount = 0 self._waitingByteCount = 0 - self._outstanding = [] # All currently outstanding requests + self._outstanding = {} # All currently outstanding requests self._batch_send_d = None # Outstanding client request to send msgs # Are we compressing messages, or just sending 'raw'? @@ -229,7 +229,7 @@ def send_messages(self, topic, key=None, msgs=()): self._waitingByteCount += byte_cnt # Add request to list of outstanding reqs' callback to remove - self._outstanding.append(d) + self._outstanding[id(d)] = d d.addBoth(self._remove_from_outstanding, d) # See if we have enough messages in the batch to do a send. self._check_send_batch() @@ -657,11 +657,11 @@ def _check_retry_payloads(failed_payloads_with_errs): def _remove_from_outstanding(self, result, d): """ Remove 'd' from the list of outstanding requests""" - self._outstanding.remove(d) + self._outstanding.pop(id(d)) return result def _cancel_outstanding(self): """Cancel all of our outstanding requests""" - for d in list(self._outstanding): + for d in self._outstanding.values(): d.addErrback(lambda _: None) # Eat any uncaught errors d.cancel() diff --git a/setup.py b/setup.py index b890fb5d..a555ba16 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ # NB: This version is extracted by the Makefile using awk; don't change the # formatting here! -version = "3.0.0" +version = "3.0.0+comfy5" with open('README.md', 'r') as fin: readme_lines = fin.readlines() From 01e4ce92f644cda0e3e21d1bfc7d9b17ca774afc Mon Sep 17 00:00:00 2001 From: Stephen Dawson-Haggerty Date: Tue, 4 Jun 2019 16:10:31 -0700 Subject: [PATCH 8/8] Change to set --- afkak/producer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/afkak/producer.py b/afkak/producer.py index e8b716ab..ce1c039d 100644 --- a/afkak/producer.py +++ b/afkak/producer.py @@ -146,7 +146,7 @@ def __init__(self, client, self._batch_reqs = [] # Current batch (possibly of 1 for unbatched) self._waitingMsgCount = 0 self._waitingByteCount = 0 - self._outstanding = {} # All currently outstanding requests + self._outstanding = set([]) # All currently outstanding requests self._batch_send_d = None # Outstanding client request to send msgs # Are we compressing messages, or just sending 'raw'? @@ -229,7 +229,7 @@ def send_messages(self, topic, key=None, msgs=()): self._waitingByteCount += byte_cnt # Add request to list of outstanding reqs' callback to remove - self._outstanding[id(d)] = d + self._outstanding.add(d) d.addBoth(self._remove_from_outstanding, d) # See if we have enough messages in the batch to do a send. self._check_send_batch() @@ -657,11 +657,11 @@ def _check_retry_payloads(failed_payloads_with_errs): def _remove_from_outstanding(self, result, d): """ Remove 'd' from the list of outstanding requests""" - self._outstanding.pop(id(d)) + self._outstanding.remove(d) return result def _cancel_outstanding(self): """Cancel all of our outstanding requests""" - for d in self._outstanding.values(): + for d in list(self._outstanding): d.addErrback(lambda _: None) # Eat any uncaught errors d.cancel()