118 votes

Bibliothèque zip en mémoire Python

Existe-t-il une bibliothèque Python qui permette de manipuler des archives zip en mémoire, sans avoir à utiliser des fichiers disques ?

La bibliothèque ZipFile ne permet pas de mettre à jour l'archive. Le seul moyen semble être de l'extraire dans un répertoire, de faire vos modifications et de créer un nouveau zip à partir de ce répertoire. Je veux modifier des archives zip sans accès au disque, parce que je vais les télécharger, faire des changements, et les télécharger à nouveau, donc je n'ai aucune raison de les stocker.

Quelque chose de similaire à ZipInputStream/ZipOutputStream de Java ferait l'affaire, mais n'importe quelle interface évitant l'accès au disque conviendrait.

1voto

Michal Charemza Points 6269

Je souhaite modifier des archives zip sans accès au disque, car je les téléchargerai, les modifierai et les téléchargerai à nouveau, de sorte que je n'ai aucune raison de les stocker.

Ceci est possible en utilisant les deux bibliothèques https://github.com/uktrade/stream-unzip y https://github.com/uktrade/stream-zip (divulgation complète : c'est moi qui l'ai écrit). Et en fonction des changements, vous n'aurez peut-être même pas besoin de stocker l'intégralité du fichier zip en mémoire en une seule fois.

Disons que vous voulez juste télécharger, dézipper, zipper et recharger. C'est un peu inutile, mais vous pourriez apporter des modifications au contenu décompressé :

from datetime import datetime
import httpx
from stream_unzip import stream_unzip
from stream_zip import stream_zip, ZIP_64

def get_source_bytes_iter(url):
    with httpx.stream('GET', url) as r:
        yield from r.iter_bytes()

def get_target_files(files):
    # stream-unzip doesn't expose perms or modified_at, but stream-zip requires them
    modified_at = datetime.now()
    perms = 0o600

    for name, _, chunks in files:
        # Could change name, manipulate chunks, skip a file, or yield a new file
        yield name.decode(), modified_at, perms, ZIP_64, chunks

source_url = 'https://source.test/file.zip'
target_url = 'https://target.test/file.zip'

source_bytes_iter = get_source_bytes_iter(source_url)
source_files = stream_unzip(source_bytes_iter)
target_files = get_target_files(source_files)
target_bytes_iter = stream_zip(target_files)

httpx.put(target_url, data=target_bytes_iter)

0voto

pymen Points 49

Aide à la création d'un fichier zip en mémoire avec plusieurs fichiers basés sur des données telles que {'1.txt': 'string', '2.txt": b'bytes'}

import io, zipfile

def prepare_zip_file_content(file_name_content: dict) -> bytes:
    """returns Zip bytes ready to be saved with 
    open('C:/1.zip', 'wb') as f: f.write(bytes)
    @file_name_content dict like {'1.txt': 'string', '2.txt": b'bytes'} 
    """
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
        for file_name, file_data in file_name_content.items():
            zip_file.writestr(file_name, file_data)

    zip_buffer.seek(0)
    return zip_buffer.getvalue()

0voto

Michal Charemza Points 6269

Vous pouvez utiliser la bibliothèque libarchive en Python par l'intermédiaire de ctypes - il offre des moyens de manipuler les données ZIP en mémoire, en mettant l'accent sur le streaming (du moins historiquement).

Supposons que nous voulions décompresser des fichiers ZIP à la volée lors d'un téléchargement à partir d'un serveur HTTP. Le code ci-dessous

from contextlib import contextmanager
from ctypes import CFUNCTYPE, POINTER, create_string_buffer, cdll, byref, c_ssize_t, c_char_p, c_int, c_void_p, c_char
from ctypes.util import find_library

import httpx

def get_zipped_chunks(url, chunk_size=6553):
    with httpx.stream('GET', url) as r:
        yield from r.iter_bytes()

def stream_unzip(zipped_chunks, chunk_size=65536):
    # Library
    libarchive = cdll.LoadLibrary(find_library('archive'))

    # Callback types
    open_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)
    read_callback_type = CFUNCTYPE(c_ssize_t, c_void_p, c_void_p, POINTER(POINTER(c_char)))
    close_callback_type = CFUNCTYPE(c_int, c_void_p, c_void_p)

    # Function types
    libarchive.archive_read_new.restype = c_void_p
    libarchive.archive_read_open.argtypes = [c_void_p, c_void_p, open_callback_type, read_callback_type, close_callback_type]
    libarchive.archive_read_finish.argtypes = [c_void_p]

    libarchive.archive_entry_new.restype = c_void_p

    libarchive.archive_read_next_header.argtypes = [c_void_p, c_void_p]
    libarchive.archive_read_support_compression_all.argtypes = [c_void_p]
    libarchive.archive_read_support_format_all.argtypes = [c_void_p]

    libarchive.archive_entry_pathname.argtypes = [c_void_p]
    libarchive.archive_entry_pathname.restype = c_char_p

    libarchive.archive_read_data.argtypes = [c_void_p, POINTER(c_char), c_ssize_t]
    libarchive.archive_read_data.restype = c_ssize_t

    libarchive.archive_error_string.argtypes = [c_void_p]
    libarchive.archive_error_string.restype = c_char_p

    ARCHIVE_EOF = 1
    ARCHIVE_OK = 0

    it = iter(zipped_chunks)
    compressed_bytes = None  # Make sure not garbage collected

    @contextmanager
    def get_archive():
        archive = libarchive.archive_read_new()
        if not archive:
            raise Exception('Unable to allocate archive')

        try:
            yield archive
        finally:
            libarchive.archive_read_finish(archive)

    def read_callback(archive, client_data, buffer):
        nonlocal compressed_bytes

        try:
            compressed_bytes = create_string_buffer(next(it))
        except StopIteration:
            return 0
        else:
            buffer[0] = compressed_bytes
            return len(compressed_bytes) - 1

    def uncompressed_chunks(archive):
        uncompressed_bytes = create_string_buffer(chunk_size)
        while (num := libarchive.archive_read_data(archive, uncompressed_bytes, len(uncompressed_bytes))) > 0:
            yield uncompressed_bytes.value[:num]
        if num < 0:
            raise Exception(libarchive.archive_error_string(archive))

    with get_archive() as archive: 
        libarchive.archive_read_support_compression_all(archive)
        libarchive.archive_read_support_format_all(archive)

        libarchive.archive_read_open(
            archive, 0,
            open_callback_type(0), read_callback_type(read_callback), close_callback_type(0),
        )
        entry = c_void_p(libarchive.archive_entry_new())
        if not entry:
            raise Exception('Unable to allocate entry')

        while (status := libarchive.archive_read_next_header(archive, byref(entry))) == ARCHIVE_OK:
            yield (libarchive.archive_entry_pathname(entry), uncompressed_chunks(archive))

        if status != ARCHIVE_EOF:
            raise Exception(libarchive.archive_error_string(archive))

peut être utilisé comme suit

zipped_chunks = get_zipped_chunks('https://domain.test/file.zip')
files = stream_unzip(zipped_chunks)

for name, uncompressed_chunks in stream_unzip(zipped_chunks):
    print(name)
    for uncompressed_chunk in uncompressed_chunks:
        print(uncompressed_chunk)

En fait, libarchive prend en charge de nombreux formats d'archives, et rien de ce qui précède n'est particulièrement spécifique à ZIP, il peut très bien fonctionner avec d'autres formats.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X