diff --git a/afkak/common.py b/afkak/common.py index 9398d368..ec1d3ed9 100644 --- a/afkak/common.py +++ b/afkak/common.py @@ -64,6 +64,7 @@ # Response payloads ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) +ProduceResponse.__new__.func_defaults = (None, None, None, None, -1) FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"]) @@ -100,11 +101,11 @@ "SourcedMessage", TopicAndPartition._fields + OffsetAndMessage._fields) -class Message(namedtuple("Message", ["magic", "attributes", "key", "value"])): +class Message(namedtuple("Message", ["magic", "attributes", "key", "value", "timestamp"])): """ - A Kafka `message`_ in format 0. + A Kafka `message`_ in format 0 or 1. - :ivar int magic: Message format version, always 0. + :ivar int magic: Message format version, always 1. :ivar int attributes: Compression flags. :ivar bytes key: Message key, or ``None`` when the message lacks a key. @@ -117,7 +118,7 @@ class Message(namedtuple("Message", ["magic", "attributes", "key", "value"])): __slots__ = () def __repr__(self): - bits = ['') return ''.join(bits) +Message.__new__.func_defaults = (None, None, None, None, -1) ################# diff --git a/afkak/kafkacodec.py b/afkak/kafkacodec.py index 274a9606..8be15d1b 100644 --- a/afkak/kafkacodec.py +++ b/afkak/kafkacodec.py @@ -4,6 +4,7 @@ from __future__ import absolute_import +import time import logging import struct import zlib @@ -151,17 +152,18 @@ def _encode_message(cls, message): Encode a single message. The magic number of a message is a format version number. The only - supported magic number right now is zero. Format:: + supported magic number right now is one. Format:: - Message => Crc MagicByte Attributes Key Value + Message => Crc MagicByte Attributes Timestamp Key Value Crc => int32 MagicByte => int8 Attributes => int8 + Timestamp => int64 Key => bytes Value => bytes """ - if message.magic == 0: - msg = struct.pack('>BB', message.magic, message.attributes) + if message.magic == 1: + msg = struct.pack('>BBq', message.magic, message.attributes, message.timestamp) msg += write_int_string(message.key) msg += write_int_string(message.value) crc = zlib.crc32(msg) & 0xffffffff # Ensure unsigned @@ -284,8 +286,7 @@ def encode_produce_request(cls, client_id, correlation_id, grouped_payloads = group_by_topic_and_partition(payloads) message = cls._encode_message_header(client_id, correlation_id, - KafkaCodec.PRODUCE_KEY) - + KafkaCodec.PRODUCE_KEY, api_version=1) message += struct.pack('>hii', acks, timeout, len(grouped_payloads)) for topic, topic_payloads in grouped_payloads.items(): @@ -314,9 +315,10 @@ def decode_produce_response(cls, data): ((num_partitions,), cur) = relative_unpack('>i', data, cur) for _i in range(num_partitions): ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur) - yield ProduceResponse(topic, partition, error, offset) + ((throttle_time,), cur) = relative_unpack('>l', data, cur) + @classmethod def encode_fetch_request(cls, client_id, correlation_id, payloads=None, max_wait_time=100, min_bytes=4096): @@ -630,7 +632,7 @@ def decode_offset_fetch_response(cls, data): metadata, error) -def create_message(payload, key=None): +def create_message(m, key=None, magic=1): """ Construct a :class:`Message` @@ -640,9 +642,15 @@ def create_message(payload, key=None): determine message identity on a compacted topic. :type key: :class:`bytes` or ``None`` """ - assert payload is None or isinstance(payload, bytes), 'payload={!r} should be bytes or None'.format(payload) + assert (m is None or isinstance(m, bytes) or isinstance(m, tuple), + 'payload={!r} should be bytes, a (timestamp, bytes) tuple, or None'.format(m)) assert key is None or isinstance(key, bytes), 'key={!r} should be bytes or None'.format(key) - return Message(0, 0, key, payload) + if isinstance(m, tuple): + ts, payload = m + else: + ts, payload = int(time.time() * 1000), m + + return Message(magic, 0, key, payload, ts) def create_gzip_message(message_set): @@ -657,7 +665,7 @@ def create_gzip_message(message_set): encoded_message_set = KafkaCodec._encode_message_set(message_set) gzipped = gzip_encode(encoded_message_set) - return Message(0, CODEC_GZIP, None, gzipped) + return Message(1, CODEC_GZIP, None, gzipped, -1) def create_snappy_message(message_set): @@ -671,7 +679,7 @@ def create_snappy_message(message_set): """ encoded_message_set = KafkaCodec._encode_message_set(message_set) snapped = snappy_encode(encoded_message_set) - return Message(0, CODEC_SNAPPY, None, snapped) + return Message(1, CODEC_SNAPPY, None, snapped, -1) def create_message_set(requests, codec=CODEC_NONE): diff --git a/afkak/producer.py b/afkak/producer.py index fddca7e2..ce1c039d 100644 --- a/afkak/producer.py +++ b/afkak/producer.py @@ -146,7 +146,7 @@ def __init__(self, client, self._batch_reqs = [] # Current batch (possibly of 1 for unbatched) self._waitingMsgCount = 0 self._waitingByteCount = 0 - self._outstanding = [] # All currently outstanding requests + self._outstanding = set([]) # All currently outstanding requests self._batch_send_d = None # Outstanding client request to send msgs # Are we compressing messages, or just sending 'raw'? @@ -212,6 +212,9 @@ def send_messages(self, topic, key=None, msgs=()): if m is None: continue + if isinstance(m, tuple): + ts, m = m + if not isinstance(m, bytes): raise TypeError('Message {} to topic {} ({!r:.100}) has type {}, but must have type {}'.format( index, topic, m, type(m).__name__, type(bytes).__name__)) @@ -226,7 +229,7 @@ def send_messages(self, topic, key=None, msgs=()): self._waitingByteCount += byte_cnt # Add request to list of outstanding reqs' callback to remove - self._outstanding.append(d) + self._outstanding.add(d) d.addBoth(self._remove_from_outstanding, d) # See if we have enough messages in the batch to do a send. self._check_send_batch() diff --git a/afkak/test/test_kafkacodec.py b/afkak/test/test_kafkacodec.py index f64a931e..87f5dcf1 100644 --- a/afkak/test/test_kafkacodec.py +++ b/afkak/test/test_kafkacodec.py @@ -104,63 +104,64 @@ def test_create_message(self): payload = b"test" key = b"key" msg = create_message(payload, key) - self.assertEqual(msg.magic, 0) + self.assertEqual(msg.magic, 1) self.assertEqual(msg.attributes, 0) self.assertEqual(msg.key, key) self.assertEqual(msg.value, payload) def test_create_gzip(self): - message_list = [create_message(b"v1", None), - create_message(b"v2", key=b'42')] + message_list = [create_message((-1, b"v1"), None), + create_message((-1, b"v2"), key=b'42')] + msg = create_gzip_message(message_list) - self.assertEqual(msg.magic, 0) + self.assertEqual(msg.magic, 1) self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP) self.assertEqual(msg.key, None) # Need to decode to check since gzipped payload is non-deterministic decoded = gzip_decode(msg.value) expect = b"".join([ struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 16), # MsgSet size - struct.pack(">i", 1285512130), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 24), # MsgSet size + struct.pack(">i", -346518548), # CRC + struct.pack(">bbq", 1, 0, -1), # Magic, flags, timestamp struct.pack(">i", -1), # -1 indicates a null key struct.pack(">i", 2), # Msg length (bytes) b"v1", # Message contents struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 18), # MsgSet size - struct.pack(">i", 1929437987), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 26), # MsgSet size + struct.pack(">i", -19620068), # CRC + struct.pack(">bbq", 1, 0, -1), # Magic, flags, timestamp struct.pack(">i2s", 2, b'42'), # Key is 2 bytes long, 42 struct.pack(">i", 2), # Msg length (bytes) b"v2", # Message contents - ]) - + ]) self.assertEqual(decoded, expect) def test_create_snappy(self): if not has_snappy(): raise SkipTest("Snappy not available") # pragma: no cover - message_list = [create_message(b"v3", key=b'84'), - create_message(b"v4", None)] + message_list = [create_message((47, b"v3"), key=b'84'), + create_message((48, b"v4"), None)] + msg = create_snappy_message(message_list) - self.assertEqual(msg.magic, 0) + self.assertEqual(msg.magic, 1) self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY) self.assertEqual(msg.key, None) decoded = snappy_decode(msg.value) expect = b"".join([ struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 18), # MsgSet size - struct.pack(">i", 813233088), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 26), # MsgSet size + struct.pack(">i", -892550681), # CRC + struct.pack(">bbq", 1, 0, 47), # Magic, flags struct.pack(">i2s", 2, b'84'), # Key is 2 bytes long, '84' struct.pack(">i", 2), # Msg length (bytes) b"v3", # Message contents struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 16), # MsgSet size - struct.pack(">i", 1022734157), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 24), # MsgSet size + struct.pack(">i", 100172533), # CRC + struct.pack(">bbq", 1, 0, 48), # Magic, flags struct.pack(">i", -1), # -1 indicates a null key struct.pack(">i", 2), # Msg length (bytes) b"v4", # Message contents @@ -181,11 +182,11 @@ def test_encode_message_header(self): self.assertEqual(encoded, expect) def test_encode_message(self): - message = create_message(b"test", b"key") + message = create_message((47, b"test"), b"key", magic=1) encoded = KafkaCodec._encode_message(message) expect = b"".join([ - struct.pack(">i", -1427009701), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", -1232077792), # CRC + struct.pack(">bbq", 1, 0, 47), # Magic, flags, timestamp struct.pack(">i", 3), # Length of key b"key", # key struct.pack(">i", 4), # Length of value @@ -209,34 +210,34 @@ def test_decode_message(self): KafkaCodec._decode_message(encoded, offset))[0] self.assertEqual(returned_offset, offset) - self.assertEqual(decoded_message, create_message(b"test", b"key")) + self.assertEqual(decoded_message, create_message((-1, b"test"), b"key", magic=0)) def test_encode_message_failure(self): self.assertRaises(ProtocolError, KafkaCodec._encode_message, - Message(1, 0, "key", "test")) + Message(0, 0, "key", "test")) def test_encode_message_set(self): message_set = [ - create_message(b"v1", b"k1"), - create_message(b"v2", b"k2"), + create_message((-1, b"v1"), b"k1"), + create_message((-1, b"v2"), b"k2"), ] - + encoded = KafkaCodec._encode_message_set(message_set) expect = b"".join([ struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", 1474775406), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 26), # Msg Size + struct.pack(">i", -634178223), # CRC + struct.pack(">bbq", 1, 0, -1), # Magic, flags, timestamp struct.pack(">i", 2), # Length of key b"k1", # Key struct.pack(">i", 2), # Length of value b"v1", # Value struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", -16383415), # CRC - struct.pack(">bb", 0, 0), # Magic, flags + struct.pack(">i", 26), # Msg Size + struct.pack(">i", 1926397558), # CRC + struct.pack(">bbq", 1, 0, -1), # Magic, flags, timestamp struct.pack(">i", 2), # Length of key b"k2", # Key struct.pack(">i", 2), # Length of value @@ -274,10 +275,10 @@ def test_decode_message_set(self): returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1", b"k1")) + self.assertEqual(decoded_message1, create_message((-1, b"v1"), b"k1", magic=0)) self.assertEqual(returned_offset2, 1) - self.assertEqual(decoded_message2, create_message(b"v2", b"k2")) + self.assertEqual(decoded_message2, create_message((-1, b"v2"), b"k2", magic=0)) def test_decode_message_gzip(self): gzip_encoded = (b'\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' @@ -293,11 +294,11 @@ def test_decode_message_gzip(self): returned_offset1, decoded_message1 = msg1 self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1")) + self.assertEqual(decoded_message1, create_message((-1, b"v1"), magic=0)) returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset2, 0) - self.assertEqual(decoded_message2, create_message(b"v2")) + self.assertEqual(decoded_message2, create_message((-1, b"v2"), magic=0)) def test_decode_message_snappy(self): if not has_snappy(): @@ -314,11 +315,11 @@ def test_decode_message_snappy(self): returned_offset1, decoded_message1 = msg1 self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1")) + self.assertEqual(decoded_message1, create_message((-1, b"v1"), magic=0)) returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset2, 0) - self.assertEqual(decoded_message2, create_message(b"v2")) + self.assertEqual(decoded_message2, create_message((-1, b"v2"), magic=0)) def test_decode_message_checksum_error(self): invalid_encoded_message = b"This is not a valid encoded message" @@ -361,10 +362,10 @@ def test_decode_message_set_stop_iteration(self): returned_offset2, decoded_message2 = msg2 self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1", b"k1")) + self.assertEqual(decoded_message1, create_message((-1, b"v1"), b"k1", magic=0)) self.assertEqual(returned_offset2, 1) - self.assertEqual(decoded_message2, create_message(b"v2", b"k2")) + self.assertEqual(decoded_message2, create_message((-1, b"v2"), b"k2", magic=0)) def test_get_response_correlation_id(self): t1 = b"topic1" @@ -379,21 +380,21 @@ def test_get_response_correlation_id(self): def test_encode_produce_request(self): requests = [ ProduceRequest("topic1", 0, [ - create_message(b"a"), - create_message(b"b"), + create_message((-1, b"a")), + create_message((-1, b"b")), ]), ProduceRequest(u"topic2", 1, [ - create_message(b"c"), + create_message((-1, b"c")), ]), ] - msg_a_binary = KafkaCodec._encode_message(create_message(b"a")) - msg_b_binary = KafkaCodec._encode_message(create_message(b"b")) - msg_c_binary = KafkaCodec._encode_message(create_message(b"c")) + msg_a_binary = KafkaCodec._encode_message(create_message((-1, b"a"))) + msg_b_binary = KafkaCodec._encode_message(create_message((-1, b"b"))) + msg_c_binary = KafkaCodec._encode_message(create_message((-1, b"c"))) header = b"".join([ struct.pack('>h', 0), # Msg Header, Message type = Produce - struct.pack('>h', 0), # Msg Header, API version + struct.pack('>h', 1), # Msg Header, API version struct.pack('>i', 2), # Msg Header, Correlation ID struct.pack('>h7s', 7, b"client1"), # Msg Header, The client ID struct.pack('>h', 2), # Num acks required @@ -430,19 +431,20 @@ def test_encode_produce_request(self): encoded = KafkaCodec.encode_produce_request( b"client1", 2, requests, 2, 100) + self.assertIn(encoded, [expected1, expected2]) def test_decode_produce_response(self): t1 = "topic1" t2 = u"topic2" - encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)), - 2, 2, len(t1), t1.encode(), 2, 0, 0, 10, 1, 1, 20, - len(t2), t2.encode(), 1, 0, 0, 30) + encoded = struct.pack('>iih%dsiihqlihqlh%dsiihql' % (len(t1), len(t2)), + 2, 2, len(t1), t1.encode(), 2, 0, 0, 10, 4, 1, 1, 20, 5, + len(t2), t2.encode(), 1, 0, 0, 30, 6) responses = list(KafkaCodec.decode_produce_response(encoded)) self.assertEqual(responses, - [ProduceResponse(t1, 0, 0, 10), - ProduceResponse(t1, 1, 1, 20), - ProduceResponse(t2, 0, 0, 30)]) + [ProduceResponse(t1, 0, 0, 10, 4), + ProduceResponse(t1, 1, 1, 20, 5), + ProduceResponse(t2, 0, 0, 30, 6)]) def test_encode_fetch_request(self): requests = [ @@ -494,8 +496,8 @@ def test_decode_fetch_response(self): encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' % (len(t1), len(ms1), len(ms2), len(t2), len(ms3)), - 4, 2, len(t1), t1.encode(), 2, 0, 0, 10, len(ms1), ms1, 1, - 1, 20, len(ms2), ms2, len(t2), t2.encode(), 1, 0, 0, 30, + 4, 2, len(t1), t1.encode(), 2, 0, 1, 10, len(ms1), ms1, 1, + 1, 20, len(ms2), ms2, len(t2), t2.encode(), 1, 0, 1, 30, len(ms3), ms3) responses = list(KafkaCodec.decode_fetch_response(encoded)) diff --git a/setup.py b/setup.py index b890fb5d..a555ba16 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ # NB: This version is extracted by the Makefile using awk; don't change the # formatting here! -version = "3.0.0" +version = "3.0.0+comfy5" with open('README.md', 'r') as fin: readme_lines = fin.readlines()