Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions afkak/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
# Response payloads
ProduceResponse = namedtuple("ProduceResponse",
["topic", "partition", "error", "offset"])
ProduceResponse.__new__.func_defaults = (None, None, None, None, -1)

FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error",
"highwaterMark", "messages"])
Expand Down Expand Up @@ -100,11 +101,11 @@
"SourcedMessage", TopicAndPartition._fields + OffsetAndMessage._fields)


class Message(namedtuple("Message", ["magic", "attributes", "key", "value"])):
class Message(namedtuple("Message", ["magic", "attributes", "key", "value", "timestamp"])):
"""
A Kafka `message`_ in format 0.
A Kafka `message`_ in format 0 or 1.

:ivar int magic: Message format version, always 0.
:ivar int magic: Message format version, always 1.
:ivar int attributes: Compression flags.
:ivar bytes key:
Message key, or ``None`` when the message lacks a key.
Expand All @@ -117,7 +118,7 @@ class Message(namedtuple("Message", ["magic", "attributes", "key", "value"])):
__slots__ = ()

def __repr__(self):
bits = ['<Message v0']
bits = ['<Message v%i' % self.magic]

if self.attributes != 0:
if self.attributes == CODEC_GZIP:
Expand All @@ -140,6 +141,7 @@ def __repr__(self):

bits.append('>')
return ''.join(bits)
Message.__new__.func_defaults = (None, None, None, None, -1)


#################
Expand Down
32 changes: 20 additions & 12 deletions afkak/kafkacodec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import absolute_import

import time
import logging
import struct
import zlib
Expand Down Expand Up @@ -151,17 +152,18 @@ def _encode_message(cls, message):
Encode a single message.

The magic number of a message is a format version number. The only
supported magic number right now is zero. Format::
supported magic number right now is one. Format::

Message => Crc MagicByte Attributes Key Value
Message => Crc MagicByte Attributes Timestamp Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Timestamp => int64
Key => bytes
Value => bytes
"""
if message.magic == 0:
msg = struct.pack('>BB', message.magic, message.attributes)
if message.magic == 1:
msg = struct.pack('>BBq', message.magic, message.attributes, message.timestamp)
msg += write_int_string(message.key)
msg += write_int_string(message.value)
crc = zlib.crc32(msg) & 0xffffffff # Ensure unsigned
Expand Down Expand Up @@ -284,8 +286,7 @@ def encode_produce_request(cls, client_id, correlation_id,
grouped_payloads = group_by_topic_and_partition(payloads)

message = cls._encode_message_header(client_id, correlation_id,
KafkaCodec.PRODUCE_KEY)

KafkaCodec.PRODUCE_KEY, api_version=1)
message += struct.pack('>hii', acks, timeout, len(grouped_payloads))

for topic, topic_payloads in grouped_payloads.items():
Expand Down Expand Up @@ -314,9 +315,10 @@ def decode_produce_response(cls, data):
((num_partitions,), cur) = relative_unpack('>i', data, cur)
for _i in range(num_partitions):
((partition, error, offset), cur) = relative_unpack('>ihq', data, cur)

yield ProduceResponse(topic, partition, error, offset)

((throttle_time,), cur) = relative_unpack('>l', data, cur)

@classmethod
def encode_fetch_request(cls, client_id, correlation_id, payloads=None,
max_wait_time=100, min_bytes=4096):
Expand Down Expand Up @@ -630,7 +632,7 @@ def decode_offset_fetch_response(cls, data):
metadata, error)


def create_message(payload, key=None):
def create_message(m, key=None, magic=1):
"""
Construct a :class:`Message`

Expand All @@ -640,9 +642,15 @@ def create_message(payload, key=None):
determine message identity on a compacted topic.
:type key: :class:`bytes` or ``None``
"""
assert payload is None or isinstance(payload, bytes), 'payload={!r} should be bytes or None'.format(payload)
assert (m is None or isinstance(m, bytes) or isinstance(m, tuple),
'payload={!r} should be bytes, a (timestamp, bytes) tuple, or None'.format(m))
assert key is None or isinstance(key, bytes), 'key={!r} should be bytes or None'.format(key)
return Message(0, 0, key, payload)
if isinstance(m, tuple):
ts, payload = m
else:
ts, payload = int(time.time() * 1000), m

return Message(magic, 0, key, payload, ts)


def create_gzip_message(message_set):
Expand All @@ -657,7 +665,7 @@ def create_gzip_message(message_set):
encoded_message_set = KafkaCodec._encode_message_set(message_set)

gzipped = gzip_encode(encoded_message_set)
return Message(0, CODEC_GZIP, None, gzipped)
return Message(1, CODEC_GZIP, None, gzipped, -1)


def create_snappy_message(message_set):
Expand All @@ -671,7 +679,7 @@ def create_snappy_message(message_set):
"""
encoded_message_set = KafkaCodec._encode_message_set(message_set)
snapped = snappy_encode(encoded_message_set)
return Message(0, CODEC_SNAPPY, None, snapped)
return Message(1, CODEC_SNAPPY, None, snapped, -1)


def create_message_set(requests, codec=CODEC_NONE):
Expand Down
7 changes: 5 additions & 2 deletions afkak/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def __init__(self, client,
self._batch_reqs = [] # Current batch (possibly of 1 for unbatched)
self._waitingMsgCount = 0
self._waitingByteCount = 0
self._outstanding = [] # All currently outstanding requests
self._outstanding = set([]) # All currently outstanding requests
self._batch_send_d = None # Outstanding client request to send msgs

# Are we compressing messages, or just sending 'raw'?
Expand Down Expand Up @@ -212,6 +212,9 @@ def send_messages(self, topic, key=None, msgs=()):
if m is None:
continue

if isinstance(m, tuple):
ts, m = m

if not isinstance(m, bytes):
raise TypeError('Message {} to topic {} ({!r:.100}) has type {}, but must have type {}'.format(
index, topic, m, type(m).__name__, type(bytes).__name__))
Expand All @@ -226,7 +229,7 @@ def send_messages(self, topic, key=None, msgs=()):
self._waitingByteCount += byte_cnt

# Add request to list of outstanding reqs' callback to remove
self._outstanding.append(d)
self._outstanding.add(d)
d.addBoth(self._remove_from_outstanding, d)
# See if we have enough messages in the batch to do a send.
self._check_send_batch()
Expand Down
Loading