Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions kafka/protocol/types.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from __future__ import absolute_import

from struct import pack, unpack, error
import struct
from struct import error

from kafka.protocol.abstract import AbstractType


def _pack(f, value):
try:
return pack(f, value)
return f(value)
except error as e:
raise ValueError("Error encountered when attempting to convert value: "
"{!r} to struct format: '{}', hit error: {}"
Expand All @@ -16,7 +17,7 @@ def _pack(f, value):

def _unpack(f, data):
try:
(value,) = unpack(f, data)
(value,) = f(data)
return value
except error as e:
raise ValueError("Error encountered when attempting to convert value: "
Expand All @@ -25,43 +26,55 @@ def _unpack(f, data):


class Int8(AbstractType):
_pack = struct.Struct('>b').pack
_unpack = struct.Struct('>b').unpack

@classmethod
def encode(cls, value):
return _pack('>b', value)
return _pack(cls._pack, value)

@classmethod
def decode(cls, data):
return _unpack('>b', data.read(1))
return _unpack(cls._unpack, data.read(1))


class Int16(AbstractType):
_pack = struct.Struct('>h').pack
_unpack = struct.Struct('>h').unpack

@classmethod
def encode(cls, value):
return _pack('>h', value)
return _pack(cls._pack, value)

@classmethod
def decode(cls, data):
return _unpack('>h', data.read(2))
return _unpack(cls._unpack, data.read(2))


class Int32(AbstractType):
_pack = struct.Struct('>i').pack
_unpack = struct.Struct('>i').unpack

@classmethod
def encode(cls, value):
return _pack('>i', value)
return _pack(cls._pack, value)

@classmethod
def decode(cls, data):
return _unpack('>i', data.read(4))
return _unpack(cls._unpack, data.read(4))


class Int64(AbstractType):
_pack = struct.Struct('>q').pack
_unpack = struct.Struct('>q').unpack

@classmethod
def encode(cls, value):
return _pack('>q', value)
return _pack(cls._pack, value)

@classmethod
def decode(cls, data):
return _unpack('>q', data.read(8))
return _unpack(cls._unpack, data.read(8))


class String(AbstractType):
Expand Down Expand Up @@ -108,13 +121,16 @@ def repr(cls, value):


class Boolean(AbstractType):
_pack = struct.Struct('>?').pack
_unpack = struct.Struct('>?').unpack

@classmethod
def encode(cls, value):
return _pack('>?', value)
return _pack(cls._pack, value)

@classmethod
def decode(cls, data):
return _unpack('>?', data.read(1))
return _unpack(cls._unpack, data.read(1))


class Schema(AbstractType):
Expand Down