Vous pouvez utiliser la bibliothèque libarchive en Python par l'intermédiaire de ctypes : elle offre des moyens de manipuler les données ZIP en mémoire, en mettant l'accent sur le streaming (du moins historiquement).
Supposons que nous voulions décompresser des fichiers ZIP à la volée lors d'un téléchargement à partir d'un serveur HTTP. Le code ci-dessous
from contextlib import contextmanager
from ctypes import CFUNCTYPE, POINTER, create_string_buffer, cdll, byref, c_ssize_t, c_char_p, c_int, c_void_p, c_char
from ctypes.util import find_library
import httpx
def get_zipped_chunks(url, chunk_size=6553):
    """Yield the body of an HTTP GET response to `url` as a stream of bytes.

    Args:
        url: Address of the resource to download.
        chunk_size: Size in bytes of the chunks requested from httpx.
            NOTE(review): the default of 6553 looks like a truncated 65536
            (the default used by stream_unzip) - confirm before changing it.

    Yields:
        Successive chunks of the response body.

    Raises:
        httpx.HTTPStatusError: If the response status is not a success, so
            that an error page is never handed on as if it were archive data.
    """
    with httpx.stream('GET', url) as r:
        r.raise_for_status()
        # chunk_size was previously accepted but never forwarded
        yield from r.iter_bytes(chunk_size=chunk_size)
def stream_unzip(zipped_chunks, chunk_size=65536):
    """Decompress an archive incrementally using libarchive through ctypes.

    Args:
        zipped_chunks: Iterable of bytes objects that, concatenated, form the
            archive. It is consumed lazily, only as libarchive asks for data.
        chunk_size: Size in bytes of the buffer used to read decompressed
            data, and so the maximum size of each yielded chunk.

    Yields:
        (pathname, chunks) tuples, one per archive entry. pathname is the
        bytes value returned by archive_entry_pathname, and chunks is a
        generator of bytes holding the entry's decompressed contents. All
        entries read from the same archive handle, so an entry's chunks have
        to be consumed before advancing to the next entry.

    Raises:
        Exception: If libarchive cannot be found, cannot allocate a reader,
            or reports an error; in the last case the message is libarchive's
            error string.
        Any exception raised while iterating zipped_chunks is re-raised
        unchanged.
    """
    # Library
    library_path = find_library('archive')
    if library_path is None:
        # Fail here with a clear message rather than obscurely later on
        raise Exception('Unable to find libarchive')
    libarchive = cdll.LoadLibrary(library_path)

    def resolve(current_name, deprecated_name):
        # Prefer the current libarchive 3 name of a function, falling back to
        # the deprecated synonym for libraries that only export the old one
        func = getattr(libarchive, current_name, None)
        return func if func is not None else getattr(libarchive, deprecated_name)

    # Callback types
    open_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)
    read_callback_type = CFUNCTYPE(c_ssize_t, c_void_p, c_void_p, POINTER(POINTER(c_char)))
    close_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)

    # Function types
    libarchive.archive_read_new.restype = c_void_p
    libarchive.archive_read_open.argtypes = [c_void_p, c_void_p, open_callback_type, read_callback_type, close_callback_type]
    archive_read_free = resolve('archive_read_free', 'archive_read_finish')
    archive_read_free.argtypes = [c_void_p]
    libarchive.archive_read_next_header.argtypes = [c_void_p, c_void_p]
    archive_read_support_filter_all = resolve('archive_read_support_filter_all', 'archive_read_support_compression_all')
    archive_read_support_filter_all.argtypes = [c_void_p]
    libarchive.archive_read_support_format_all.argtypes = [c_void_p]
    libarchive.archive_entry_pathname.argtypes = [c_void_p]
    libarchive.archive_entry_pathname.restype = c_char_p
    libarchive.archive_read_data.argtypes = [c_void_p, POINTER(c_char), c_ssize_t]
    libarchive.archive_read_data.restype = c_ssize_t
    libarchive.archive_error_string.argtypes = [c_void_p]
    libarchive.archive_error_string.restype = c_char_p

    ARCHIVE_EOF = 1
    ARCHIVE_OK = 0

    it = iter(zipped_chunks)
    compressed_bytes = None  # Make sure not garbage collected
    read_error = None  # Exception raised by zipped_chunks inside the callback

    @contextmanager
    def get_archive():
        archive = libarchive.archive_read_new()
        if not archive:
            raise Exception('Unable to allocate archive')
        try:
            yield archive
        finally:
            archive_read_free(archive)

    def read_callback(archive, client_data, buffer):
        nonlocal compressed_bytes, read_error
        try:
            chunk = next(it)
            # Returning 0 tells libarchive the archive has ended, so an empty
            # chunk in the middle of the stream must be skipped, not passed on
            while not chunk:
                chunk = next(it)
            compressed_bytes = create_string_buffer(chunk)
        except StopIteration:
            return 0
        except Exception as e:
            # ctypes ignores exceptions that escape a callback, so remember it
            # and report failure to libarchive; it is re-raised by the caller
            read_error = e
            return -1
        buffer[0] = compressed_bytes
        # create_string_buffer appends a NUL terminator that is not data
        return len(chunk)

    def failure(archive):
        # The exception to raise once a libarchive call has reported an error
        if read_error is not None:
            return read_error
        return Exception(libarchive.archive_error_string(archive))

    def uncompressed_chunks(archive):
        uncompressed_bytes = create_string_buffer(chunk_size)
        while (num := libarchive.archive_read_data(archive, uncompressed_bytes, len(uncompressed_bytes))) > 0:
            # .raw rather than .value: .value stops at the first NUL byte and
            # would silently truncate binary contents
            yield uncompressed_bytes.raw[:num]
        if num < 0:
            raise failure(archive)

    with get_archive() as archive:
        archive_read_support_filter_all(archive)
        libarchive.archive_read_support_format_all(archive)

        # ctypes does not keep callback objects alive on behalf of C code, and
        # libarchive calls back long after archive_read_open has returned, so
        # hold references for as long as the archive is open
        open_callback = open_callback_type(0)
        read_callback_ref = read_callback_type(read_callback)
        close_callback = close_callback_type(0)
        if libarchive.archive_read_open(archive, None, open_callback, read_callback_ref, close_callback) != ARCHIVE_OK:
            raise failure(archive)

        # archive_read_next_header points this at an entry owned by the
        # archive, so nothing is allocated (or needs to be freed) here
        entry = c_void_p()

        while (status := libarchive.archive_read_next_header(archive, byref(entry))) == ARCHIVE_OK:
            yield (libarchive.archive_entry_pathname(entry), uncompressed_chunks(archive))

        if status != ARCHIVE_EOF:
            raise failure(archive)
peut être utilisé comme suit :
# Both calls only build generators: nothing is downloaded or decompressed
# until the loop below starts pulling on them
zipped_chunks = get_zipped_chunks('https://domain.test/file.zip')
files = stream_unzip(zipped_chunks)

# Iterate the generator created above instead of building a second one over
# the same stream of chunks
for name, uncompressed_chunks in files:
    print(name)
    # Consume the entry's contents before moving on to the next entry
    for uncompressed_chunk in uncompressed_chunks:
        print(uncompressed_chunk)
En fait, libarchive prend en charge de nombreux formats d'archives et rien de ce qui précède n'est particulièrement spécifique à ZIP : ce code peut donc très bien fonctionner avec d'autres formats.