@@ -206,6 +206,73 @@ def copy(self):
206206 return self
207207
208208
class HadoopStreamCompressor:
    """Stateless compressor that wraps each input in a length-prefixed frame.

    Every call to ``add_chunk`` (alias ``compress``) is independent: nothing
    is buffered between calls, so ``flush`` has nothing to emit and ``copy``
    can hand back the same object.
    """

    def add_chunk(self, data: bytes, compress=None):
        """Compress ``data`` and return it as one framed record.

        The returned bytes consist of, in order:

        * ``len(data)`` as a 4-byte big-endian integer,
        * the length of the compressed payload as a 4-byte big-endian
          integer,
        * the compressed payload itself.

        The ``compress`` argument is accepted but ignored by this
        implementation.
        """
        payload = _compress(data)
        raw_size = len(data).to_bytes(4, "big")
        packed_size = len(payload).to_bytes(4, "big")
        return b"".join([raw_size, packed_size, payload])

    # Same callable exposed under the name used by zlib-style compressors.
    compress = add_chunk

    def flush(self):
        """Return ``b""``; no data is ever held back between calls."""
        return b""

    def copy(self):
        """Return this same object.

        Present for compatibility with the zlib compressobj interface.
        """
        return self
229+
230+
class HadoopStreamDecompressor:
    """Incremental decoder for length-prefixed snappy records.

    Each record is an 8-byte header followed by a compressed payload. Only
    header bytes 4-7 (big-endian payload length) are interpreted; bytes 0-3
    are skipped without validation. Input may be fed in arbitrary slices:
    bytes that do not yet form a complete record are kept in ``remains``
    and prepended to the next call's input.
    """

    # Size in bytes of the header preceding every compressed payload.
    _HEADER_SIZE = 8

    def __init__(self):
        # Undecoded tail of the input seen so far; always a bytes object.
        self.remains = b""

    @staticmethod
    def check_format(data):
        """Check that ``data`` is long enough to hold one record header.

        This cannot establish that the data really is hadoop-snappy; it only
        rejects input that is too short to contain a header.

        Raises:
            UncompressError: if ``data`` has fewer than 8 bytes.
        """
        if len(data) < 8:
            raise UncompressError("Too short data length")

    def decompress(self, data: bytes):
        """Decode every complete record available and return the output.

        ``data`` is appended to any bytes left over from earlier calls. All
        records that are fully present are decompressed and concatenated, in
        order; the result should be appended to the output of preceding
        calls. A trailing partial record (including a partial header) is
        stored in ``self.remains`` for the next call, in which case the
        return value may be ``b""``.
        """
        if self.remains:
            data = self.remains + data
            # Reset right away so a failure below cannot cause the same
            # prefix to be prepended a second time on a later call.
            self.remains = b""

        header_size = self._HEADER_SIZE
        available = len(data)
        pieces = []
        # Walk the buffer with a cursor instead of re-slicing the tail after
        # every record, which would copy the remaining bytes each time.
        offset = 0
        while available - offset >= header_size:
            body_start = offset + header_size
            chunk_length = int.from_bytes(data[offset + 4:body_start], "big")
            body_end = body_start + chunk_length
            if body_end > available:
                # Payload not fully received yet; wait for more input.
                break
            pieces.append(_uncompress(data[body_start:body_end]))
            offset = body_end

        self.remains = data[offset:]
        return b"".join(pieces)

    def flush(self):
        """Return ``b""``; leftover bytes in ``remains`` are not emitted."""
        return b""

    def copy(self):
        """Return this same object (no independent copy is made)."""
        return self
273+
274+
275+
209276def stream_compress (src ,
210277 dst ,
211278 blocksize = _STREAM_TO_STREAM_BLOCK_SIZE ,
0 commit comments