335 votes

Comment puis-je lire des fichiers texte volumineux en Python, ligne par ligne, sans les charger en mémoire ?

J'ai besoin de lire un gros fichier, ligne par ligne. Disons que ce fichier a plus de 5 Go et que j'ai besoin de lire chaque ligne, mais je ne veux évidemment pas utiliser la fonction readlines() car cela va créer une très grande liste dans la mémoire.

Comment le code ci-dessous fonctionnera-t-il dans ce cas ? Est-ce que xreadlines lui-même en lisant un par un dans la mémoire ? L'expression du générateur est-elle nécessaire ?

f = (line for line in open("log.txt").xreadlines())  # how much is loaded in memory?

f.next()  

De plus, que puis-je faire pour lire ceci dans l'ordre inverse, comme le Linux tail commande ?

J'ai trouvé :

http://code.google.com/p/pytailer/

y

" python head, tail et backward lire par lignes d'un fichier texte "

Les deux ont très bien fonctionné !

0 votes

Et que puis-je faire pour lire cela depuis la queue ? ligne par ligne, en commençant par la dernière ligne.

0 votes

Ceci devrait être une question séparée

1 votes

15voto

Ariel Cabib Points 1886

Voici ce que vous devez faire si vous n'avez pas de nouvelles lignes dans le fichier :

with open('large_text.txt') as f:
  while True:
    c = f.read(1024)
    if not c:
      break
    print(c)

7voto

Richard Bronosky Points 3163

Je ne pouvais pas croire que cela pouvait être aussi facile que la réponse de @john-la-rooy le laissait croire. J'ai donc recréé le cp en utilisant la lecture et l'écriture ligne par ligne. C'est TRÈS RAPIDE.

#!/usr/bin/env python3.6

import sys

with open(sys.argv[2], 'w') as outfile:
    with open(sys.argv[1]) as infile:
        for line in infile:
            outfile.write(line)

5voto

jpp Points 83462

Le site flamme a parcouru un long chemin au cours des six dernières années. Il dispose d'une API simple couvrant un sous-ensemble utile des fonctionnalités de pandas.

dask.dataframe s'occupe du chunking en interne, prend en charge de nombreuses opérations parallélisables et vous permet d'exporter facilement des tranches vers pandas pour des opérations en mémoire.

import dask.dataframe as dd

df = dd.read_csv('filename.csv')
df.head(10)  # return first 10 rows
df.tail(10)  # return last 10 rows

# iterate rows
for idx, row in df.iterrows():
    ...

# group by my_field and return mean
df.groupby(df.my_field).value.mean().compute()

# slice by column
df[df.my_field=='XYZ'].compute()

2voto

Iyvin Jose Points 324

Voici le code pour charger des fichiers texte de n'importe quelle taille sans causer de problèmes de mémoire. Il supporte les fichiers de plusieurs gigaoctets

https://gist.github.com/iyvinjose/e6c1cb2821abd5f01fd1b9065cbc759d

télécharger le fichier data_loading_utils.py et l'importer dans votre code

utilisation

import data_loading_utils.py.py
file_name = 'file_name.ext'
CHUNK_SIZE = 1000000

def process_lines(data, eof, file_name):

    # check if end of file reached
    if not eof:
         # process data, data is one single line of the file

    else:
         # end of file reached

data_loading_utils.read_lines_from_file_as_data_chunks(file_name, chunk_size=CHUNK_SIZE, callback=self.process_lines)

process_lines est la fonction de rappel. Elle sera appelée pour toutes les lignes, avec des données de paramètre représentant une seule ligne du fichier à la fois.

Vous pouvez configurer la variable CHUNK_SIZE en fonction de la configuration matérielle de votre machine.

1voto

Amiga500 Points 679

Je me rends compte que cette question a déjà été traitée il y a un certain temps, mais voici un moyen de le faire en parallèle sans tuer la mémoire (ce qui serait le cas si vous essayiez de lancer chaque ligne dans le pool). Évidemment, remplacez la fonction readJSON_line2 par quelque chose de plus raisonnable - c'est juste pour illustrer le propos !

La vitesse dépendra de la taille des fichiers et de ce que vous faites avec chaque ligne - mais dans le pire des cas, pour un petit fichier et en le lisant simplement avec le lecteur JSON, je vois des performances similaires à celles de la ST avec les paramètres ci-dessous.

J'espère qu'il sera utile à quelqu'un :

def readJSON_line2(linesIn):
  #Function for reading a chunk of json lines
   '''
   Note, this function is nonsensical. A user would never use the approach suggested 
   for reading in a JSON file, 
   its role is to evaluate the MT approach for full line by line processing to both 
   increase speed and reduce memory overhead
   '''
   import json

   linesRtn = []
   for lineIn in linesIn:

       if lineIn.strip() != 0:
           lineRtn = json.loads(lineIn)
       else:
           lineRtn = ""

       linesRtn.append(lineRtn)

   return linesRtn

# -------------------------------------------------------------------
if __name__ == "__main__":
   import multiprocessing as mp

   path1 = "C:\\user\\Documents\\"
   file1 = "someBigJson.json"

   nBuffer = 20*nCPUs  # How many chunks are queued up (so cpus aren't waiting on processes spawning)
   nChunk = 1000 # How many lines are in each chunk
   #Both of the above will require balancing speed against memory overhead

   iJob = 0  #Tracker for SMP jobs submitted into pool
   iiJob = 0  #Tracker for SMP jobs extracted back out of pool

   jobs = []  #SMP job holder
   MTres3 = []  #Final result holder
   chunk = []  
   iBuffer = 0 # Buffer line count
   with open(path1+file1) as f:
      for line in f:

          #Send to the chunk
          if len(chunk) < nChunk:
              chunk.append(line)
          else:
              #Chunk full
              #Don't forget to add the current line to chunk
              chunk.append(line)

              #Then add the chunk to the buffer (submit to SMP pool)                  
              jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
              iJob +=1
              iBuffer +=1
              #Clear the chunk for the next batch of entries
              chunk = []

          #Buffer is full, any more chunks submitted would cause undue memory overhead
          #(Partially) empty the buffer
          if iBuffer >= nBuffer:
              temp1 = jobs[iiJob].get()
              for rtnLine1 in temp1:
                  MTres3.append(rtnLine1)
              iBuffer -=1
              iiJob+=1

      #Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
      if chunk:
          jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
          iJob +=1
          iBuffer +=1

      #And gather up the last of the buffer, including the final chunk
      while iiJob < iJob:
          temp1 = jobs[iiJob].get()
          for rtnLine1 in temp1:
              MTres3.append(rtnLine1)
          iiJob+=1

   #Cleanup
   del chunk, jobs, temp1
   pool.close()

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X