3 votes

Comment traiter chaque ligne/buffer de manière synchrone lors de la lecture asynchrone d'un fichier à l'aide d'un flux ?

Comme vous pouvez le voir, j'ai un js qui prend un .csv et appelle une fonction asynchrone pour chaque ligne (4 fonctions différentes de manière itérative).

Le problème est que je dois attendre la fin de la fonction dans la i-ème itération avant de procéder à la i+1 itération .

const csv = require('csv-parser');
const fs = require('fs');

var i=1;

fs.createReadStream('table.csv')
  .pipe(csv())
  .on('data', (row) => {
      switch(i%4){
          case 1: org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 2: org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 3: org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 0: org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      }
    i++;
  })
  .on('end', () => {
    console.log('CSV file successfully processed');
  });

  async function org1createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org2createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org3createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
   ...
  }

  async function org4createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
   ...
  }

Comment puis-je obtenir ce que je veux ? J'espère que ma question est suffisamment claire !

2voto

vbuzze Points 359

Le site readStream que vous utilisez ici est asynchrone, c'est-à-dire que .on(event, callback) se déclenchera chaque fois qu'un nouveau morceau de données est lu, indépendamment de toute callback déclenché. En d'autres termes, l'exécution de la callback ici n'a pas d'impact sur ce processus, il sera exécuté en parallèle, à chaque fois que event reçu.

Cela signifie qu'en cas callback a été d'exécuter un morceau de code qui est asynchrone, vous pouvez très bien vous retrouver dans une situation où plusieurs instances de cette fonction peuvent encore être en cours d'exécution au moment où la prochaine lecture de la fonction event est reçu.

Remarque : ceci est valable pour tout événement, y compris le 'end' événement.

Si vous deviez utiliser async/await en callback si cela rendait seulement la logique interne de cette fonction synchrone. Cela n'aurait toujours pas d'impact sur la vitesse à laquelle vos données sont lues.

Pour ce faire, vous devez utiliser les deux éléments suivants async/await en callback (pour le rendre synchrone en interne) et avoir callback interrompre et reprendre manuellement l'opération de lecture qui se déroule en parallèle.

const csv = require('csv-parser');
const fs = require('fs');

let i = 1;

const stream = fs.createReadStream('table.csv').pipe(csv());

stream.on('data', async (row) => {
   // pause overall stream until this row is processed
   stream.pause();

   // process row
   switch (i%4){
      case 1: await org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 2: await org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 3: await org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      case 0: await org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
   }
   i++;

   // resume overall stream
   stream.resume();
});

stream.on('end', () => {
  // now guaranteed that no instances of `callback` is still running in parallel when this event is fired
  console.log('CSV file successfully processed');
});

1voto

user1102051 Points 152

La solution ci-dessous utilise iter-ops qui est très efficace dans ce cas, car pipe(csv()) renvoie un AsyncIterable Il doit donc être traité en conséquence.

Puisque vous ne vous souciez pas de ce que ces fonctions de traitement retournent, nous pouvons simplement papillon des gaz le traitement pour chaque ligne :

const {pipe, throttle, onEnd, catchError} = require('iter-ops');
const csv = require('csv-parser');
const fs = require('fs');

const asyncIterable = fs.createReadStream('table.csv').pipe(csv());

const i = pipe(
    asyncIterable,
    throttle(async (row, index) => {
        switch (index % 4) {
            case 1: await org1createPatient(row.patientId, ...); break;
            case 2: await org2createPatient(row.patientId, ...); break;
            case 3: await org3createPatient(row.patientId, ...); break;
            case 0: await org4createPatient(row.patientId, ...); break;
            default: break;
        }
    }),
    onEnd(s => {
        console.log(`Completed ${s.count} rows, in ${s.duration}ms`);
    }),
    catchError((err, ctx) => {
        console.log(`Failed on row with index ${ctx.index}:`, err);
        throw err; // to stop the iteration
    })
);

async function processCSV() {
    // this will trigger the iteration:
    for await(const a of i) {
        // iterate and process the CSV
    }
}

P.S. Je suis l'auteur de iter-ops .

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X