2 votes

Csv writer - Comment écrire des lignes dans plusieurs fichiers, en fonction d'un seuil ?

Je veux écrire des lignes dans un fichier csv, mais le fichier ne doit contenir que X lignes au maximum. Si le seuil est dépassé, il doit commencer un nouveau fichier. Donc si j'ai les données suivantes :

csv_max_rows=3
columns = ["A", "B", "C"]
rows = [
    ["a1", "b1", "c1"],
    ["a2", "b2", "c2"],
    ["a3", "b3", "c3"],
    ["a4", "b4", "c4"],
    ["a5", "b5", "c5"],
    ["a6", "b6", "c6"],
    ["a7", "b7", "c7"],
    ["a8", "b8", "c8"],
    ["a9", "b9", "c9"],
    ["a10", "b10", "c10"]
]

Je veux terminer avec 4 fichiers, où les fichiers 1, 2, 3 auront chacun 3 lignes et le fichier 4 n'aura qu'une seule ligne. Y a-t-il une option intégrée pour cela dans le module csv de Python?

1voto

user23952 Points 194

Je pense que vos besoins sont trop spécifiques pour attendre une option intégrée dans la bibliothèque standard. La solution ci-dessous est un peu bidouillée, mais je pense que c'est exactement ce que vous voulez.

import csv

csv_max_rows = 3
columns = ["A", "B", "C"]
rows = [
    ["a1", "b1", "c1"],
    ["a2", "b2", "c2"],
    ["a3", "b3", "c3"],
    ["a4", "b4", "c4"],
    ["a5", "b5", "c5"],
    ["a6", "b6", "c6"],
    ["a7", "b7", "c7"],
    ["a8", "b8", "c8"],
    ["a9", "b9", "c9"],
    ["a10", "b10", "c10"],
]

for i, row in enumerate(rows):
    if (i % csv_max_rows) == 0:
        fp = open(f"out_{i//csv_max_rows+1}.csv", "w")
        writer = csv.writer(fp)
        writer.writerow(columns)
    writer.writerow(row)

0voto

Shoham Points 51

Je ne suis pas sûr s'il existe une option intégrée, mais apparemment ce n'est pas si compliqué à réaliser :

from typing import List
import csv
import concurrent

def chunks(lst: List, n: int):
    while lst:
        chunk = lst[0:n]
        lst = lst[n:]
        yield chunk

def write_csv(csv_file_path: str, columns: List[str], rows: List[List]):
    with open(csv_file_path, 'w') as csv_file:
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(columns)
        for row in rows:
            csv_writer.writerow(row)

def write_csv_parallel(base_csv_file_path: str, columns: List[str], rows: List[List], csv_max_rows: int) -> List[str]:
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        chunked_rows = chunks(rows, csv_max_rows)
        csv_writing_args = ((f"{base_csv_file_path}.{idx + 1}", columns, chunk_of_rows) for idx, chunk_of_rows
                            in enumerate(chunked_rows))
        executor.map(lambda f: write_csv(*f), csv_writing_args)

if __name__ == "__main__":
    columns = ["A", "B", "C"]
    rows = [
        ["a1", "b1", "c1"],
        ["a2", "b2", "c2"],
        ["a3", "b3", "c3"],
        ["a4", "b4", "c4"],
        ["a5", "b5", "c5"],
        ["a6", "b6", "c6"],
        ["a7", "b7", "c7"],
        ["a8", "b8", "c8"],
        ["a9", "b9", "c9"],
        ["a10", "b10", "c10"]
    ]
    base_csv_file_path = "/tmp/test_file.csv"
    csv_file_paths = write_csv_parallel(base_csv_file_path, columns, rows, csv_max_rows=3)
    print("les données ont été écrites dans les fichiers suivants : \n" + "\n".join(csv_file_paths))

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X