import zlib
from contextlib import closing, contextmanager
from tempfile import SpooledTemporaryFile
from warcio.limitreader import LimitReader
from warcio.utils import BUFF_SIZE
# =============================================================================
def no_except_close(closable):
    """Best-effort shutdown of ``closable``, never raising ``Exception``.

    Calls ``close()`` and then ``release_conn()`` (the latter in case the
    object is a requests raw stream). Each call is attempted independently;
    a missing method or any ``Exception`` raised by it is ignored.

    :param closable: The object to be closed
    :rtype: None
    """
    for method_name in ('close', 'release_conn'):
        try:
            # attribute lookup is inside the try so a missing method
            # (or closable being None) is swallowed as well
            getattr(closable, method_name)()
        except Exception:
            pass
# =============================================================================
def StreamIter(stream, header1=None, header2=None, size=BUFF_SIZE, closer=closing):
    """Generator yielding the contents of ``stream`` in blocks.

    ``header1`` and ``header2`` are yielded first (in that order) when they
    are truthy, followed by the results of successive ``stream.read(size)``
    calls until a read returns an empty/falsy value.

    The whole iteration runs inside ``with closer(stream)``, so the context
    manager returned by ``closer`` (``contextlib.closing`` by default) is
    exited when the generator finishes or is closed.

    :param stream: object exposing ``read(size)``
    :param header1: optional first block to emit before the stream data
    :param header2: optional second block to emit before the stream data
    :param size: number passed to each ``stream.read`` call
    :param closer: callable returning a context manager for ``stream``
    """
    with closer(stream):
        for header in (header1, header2):
            if header:
                yield header

        buff = stream.read(size)
        while buff:
            yield buff
            buff = stream.read(size)
# =============================================================================
@contextmanager
def call_release_conn(stream):
    """Context manager that yields ``stream`` and cleans it up on exit.

    On leaving the ``with`` body, whether normally or through an exception,
    ``stream`` is passed to ``no_except_close``.

    :param stream: the object to hand to the ``with`` body and close afterwards
    """
    try:
        yield stream
    finally:
        # runs on both normal exit and error exit of the with-body
        no_except_close(stream)
# =============================================================================
def chunk_encode_iter(orig_iter):
    """Wrap an iterable of byte blocks in chunked-encoding framing.

    For every non-empty block three items are yielded: the block length in
    upper-case hex followed by CRLF, the block itself, and a trailing CRLF.
    Empty blocks are skipped. After the input is exhausted a final
    ``b'0\\r\\n\\r\\n'`` terminator is yielded.

    :param orig_iter: iterable of bytes-like blocks
    """
    for data in orig_iter:
        size = len(data)
        if size == 0:
            # the zero-size chunk is emitted exactly once, at the end
            continue

        yield b'%X\r\n' % size
        yield data
        yield b'\r\n'

    yield b'0\r\n\r\n'
# =============================================================================
def buffer_iter(status_headers, iterator, buff_size=BUFF_SIZE * 4):
    """Fully buffer ``iterator`` and return a ``StreamIter`` over the result.

    Every block from ``iterator`` is written to a ``SpooledTemporaryFile``
    created with ``buff_size`` as its ``max_size``, while the total length is
    tallied. Once the input is exhausted, ``status_headers.replace_header`` is
    called with ``'Content-Length'`` and the total as a string, the file is
    rewound, and a ``StreamIter`` reading from it is returned.

    If consuming ``iterator`` or updating the headers raises, the temporary
    file is closed before the exception propagates, so it is not leaked.

    :param status_headers: object exposing ``replace_header(name, value)``
    :param iterator: iterable of blocks accepted by the temp file's ``write``
    :param buff_size: ``max_size`` passed to ``SpooledTemporaryFile``
    :return: a ``StreamIter`` generator over the buffered content
    """
    out = SpooledTemporaryFile(buff_size)

    try:
        size = 0
        for buff in iterator:
            size += len(buff)
            out.write(buff)

        # set Content-Length to the number of bytes actually buffered
        status_headers.replace_header('Content-Length', str(size))

        out.seek(0)
    except BaseException:
        # ownership was never handed to StreamIter, so close the file here
        out.close()
        raise

    return StreamIter(out)
# =============================================================================
def compress_gzip_iter(orig_iter):
    """Compress an iterable of byte blocks into a gzip-framed stream.

    A single ``zlib`` compressor (level 9, ``wbits = MAX_WBITS + 16``, which
    selects the gzip container) is fed each input block. Compressed output is
    yielded as soon as the compressor produces any; empty intermediate
    results are skipped. The compressor's final ``flush()`` is always
    yielded last.

    :param orig_iter: iterable of bytes-like blocks to compress
    """
    compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16)

    for chunk in orig_iter:
        data = compressor.compress(chunk)
        if data:
            yield data

    yield compressor.flush()
# ============================================================================
class OffsetLimitReader(LimitReader):
    """``LimitReader`` that discards a leading ``offset`` before reading.

    The instance itself is initialised as ``LimitReader(stream, length)``.
    When ``offset`` is positive, a second ``LimitReader(stream, offset)`` is
    kept and drained lazily on the first ``read``/``readline`` call.
    NOTE(review): the exact limiting semantics come from
    ``warcio.limitreader.LimitReader`` and are not visible here.
    """

    def __init__(self, stream, offset, length):
        """
        :param stream: underlying stream shared by both readers
        :param offset: amount to skip before data is returned
        :param length: limit passed to the base ``LimitReader``
        """
        super(OffsetLimitReader, self).__init__(stream, length)
        self.offset = offset
        # only set when there is something to skip; cleared once drained
        self._skip_reader = LimitReader(stream, offset) if offset > 0 else None

    def _skip(self):
        """Drain the skip reader (if any) until it returns no more data."""
        while self._skip_reader:
            if not self._skip_reader.read():
                # empty read: the skip is done, never run it again
                self._skip_reader = None

    def read(self, length=None):
        """Skip the offset if still pending, then delegate to the base ``read``."""
        self._skip()
        return super(OffsetLimitReader, self).read(length)

    def readline(self, length=None):
        """Skip the offset if still pending, then delegate to the base ``readline``."""
        self._skip()
        return super(OffsetLimitReader, self).readline(length)
# ============================================================================
class StreamClosingReader(object):
    """Thin wrapper around a stream whose ``close`` never raises.

    ``read`` and ``readline`` are forwarded unchanged to the wrapped stream;
    ``close`` hands the stream to ``no_except_close``.
    """

    def __init__(self, stream):
        """
        :param stream: the underlying stream to wrap
        """
        self.stream = stream

    def read(self, length=None):
        """Return ``self.stream.read(length)``."""
        data = self.stream.read(length)
        return data

    def readline(self, length=None):
        """Return ``self.stream.readline(length)``."""
        line = self.stream.readline(length)
        return line

    def close(self):
        """Close the wrapped stream via ``no_except_close``."""
        no_except_close(self.stream)