4 votes

Téléchargement de fichiers en mode multithread avec le client HTTP Guzzle : Objets EachPromises vs Pool

À des fins de test, j'ai un tableau de 2000 images URI (chaînes de caractères) que je télécharge de manière asynchrone avec cette fonction. Après quelques recherches sur Internet, des tests et des essais, j'ai trouvé la solution suivante 2 fonctions qui fonctionnent toutes les deux (pour être honnête downloadFilesAsync2 lance un InvalidArgumentException à la dernière ligne).

La fonction downloadFilesAsync2 est basé sur la classe GuzzleHttp \Promise\EachPromise y downloadFilesAsync1 est basé sur le GuzzleHttp \Pool classe.

Les deux fonctions téléchargent assez bien les 2000 fichiers de manière asynchrone, avec la limite de 10 threads en même temps.

Je sais qu'ils fonctionnent, mais rien d'autre. Je me demande si quelqu'un pourrait m'expliquer les deux approches, si l'une est meilleure que l'autre, les implications, etc.

// for the purpose of this question i've reduced the array to 5 files!
$uris = array /
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120.jpg",
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120-01.jpg",
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120-02.jpg",
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120-03.jpg",
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120-04.jpg",
);

function downloadFilesAsync2(array $uris, string $dir, $overwrite=true) {
    $client   = new \GuzzleHttp\Client();
    $requests = array();
    foreach ($uris as $i => $uri) {
        $loc = $dir . DIRECTORY_SEPARATOR . basename($uri);
        if ($overwrite && file_exists($loc)) unlink($loc);
        $requests[] = new GuzzleHttp\Psr7\Request('GET', $uri, ['sink' => $loc]);
        echo "Downloading $uri to $loc" . PHP_EOL;
    }
    $pool = new \GuzzleHttp\Pool($client, $requests, [
        'concurrency' => 10,
        'fulfilled' => function (\Psr\Http\Message\ResponseInterface $response, $index) {
            // this is delivered each successful response
            echo 'success: '.$response->getStatusCode().PHP_EOL;
        },
        'rejected' => function ($reason, $index) {
            // this is delivered each failed request
            echo 'failed: '.$reason.PHP_EOL;
        },
    ]);
    $promise = $pool->promise();  // Start transfers and create a promise
    $promise->wait();   // Force the pool of requests to complete.
}

function downloadFilesAsync1(array $uris, string $dir, $overwrite=true) {
    $client = new \GuzzleHttp\Client();
    $promises = (function () use ($client, $uris, $dir, $overwrite) {
        foreach ($uris as $uri) {
            $loc = $dir . DIRECTORY_SEPARATOR . basename($uri);
            if ($overwrite && file_exists($loc)) unlink($loc);
            yield $client->requestAsync('GET', $uri, ['sink' => $loc]);
            echo "Downloading $uri to $loc" . PHP_EOL;
        }
    })();
    (new \GuzzleHttp\Promise\EachPromise(
        $promises, [
        'concurrency' => 10,
        'fulfilled'   => function (\Psr\Http\Message\ResponseInterface $response) {
            //            echo "\t=>\tDONE! status:" . $response->getStatusCode() . PHP_EOL;
        },
        'rejected'    => function ($reason, $index) {
            echo 'ERROR => ' . strtok($reason->getMessage(), "\n") . PHP_EOL;
        },
    ])
    )->promise()->wait();
}

1voto

Shaun Bramley Points 1457

Tout d'abord, j'aborderai le InvalidArgumentException au sein de la downloadFilesAsync2 méthode. Cette méthode pose en fait deux problèmes. Les deux sont liés à ceci :

$requests[] = $client->request('GET', $uri, ['sink' => $loc]);

Le premier problème est le fait que Client::request() est une méthode utilitaire synchrone qui englobe $client->requestAsync()->wait() . $client->request() retournera une instance de Psr\Http\Message\ResponseInterface En conséquence $requests[] sera en fait peuplé de ResponseInterface mises en œuvre. C'est ce qui, en fin de compte, cause le InvalidArgumentException comme le $requests ne contient pas de Psr\Http\Message\RequestInterface et l'exception est levée à partir de l'intérieur de l'application Pool::__construct() .

Une version corrigée de cette méthode devrait contenir un code qui ressemble plus à.. :

$requests = [
    new Request('GET', 'www.google.com', [], null, 1.1),
    new Request('GET', 'www.ebay.com', [], null, 1.1),
    new Request('GET', 'www.cnn.com', [], null, 1.1),
    new Request('GET', 'www.red.com', [], null, 1.1),
];

$pool = new Pool($client, $requests, [
    'concurrency' => 10,
    'fulfilled' => function(ResponseInterface $response) {
        // do something
    },
    'rejected' => function($reason, $index) {
        // do something error handling
    },
    'options' => ['sink' => $some_location,],
]);

$promise = $pool->promise();
$promise->wait();

Pour répondre à votre deuxième question, "Quelle est la différence entre ces deux méthodes", la réponse est simple, il n'y en a pas. Pour expliquer cela, laissez-moi copier et coller Pool::__construct() :

/**
 * @param ClientInterface $client   Client used to send the requests.
 * @param array|\Iterator $requests Requests or functions that return
 *                                  requests to send concurrently.
 * @param array           $config   Associative array of options
 *     - concurrency: (int) Maximum number of requests to send concurrently
 *     - options: Array of request options to apply to each request.
 *     - fulfilled: (callable) Function to invoke when a request completes.
 *     - rejected: (callable) Function to invoke when a request is rejected.
 */
public function __construct(
    ClientInterface $client,
    $requests,
    array $config = []
) {
    // Backwards compatibility.
    if (isset($config['pool_size'])) {
        $config['concurrency'] = $config['pool_size'];
    } elseif (!isset($config['concurrency'])) {
        $config['concurrency'] = 25;
    }

    if (isset($config['options'])) {
        $opts = $config['options'];
        unset($config['options']);
    } else {
        $opts = [];
    }

    $iterable = \GuzzleHttp\Promise\iter_for($requests);
    $requests = function () use ($iterable, $client, $opts) {
        foreach ($iterable as $key => $rfn) {
            if ($rfn instanceof RequestInterface) {
                yield $key => $client->sendAsync($rfn, $opts);
            } elseif (is_callable($rfn)) {
                yield $key => $rfn($opts);
            } else {
                throw new \InvalidArgumentException('Each value yielded by '
                    . 'the iterator must be a Psr7\Http\Message\RequestInterface '
                    . 'or a callable that returns a promise that fulfills '
                    . 'with a Psr7\Message\Http\ResponseInterface object.');
            }
        }
    };

    $this->each = new EachPromise($requests(), $config);
}

Maintenant, si nous comparons cela à une version simplifiée du code dans le downloadFilesAsync1 méthode :

$promises = (function () use ($client, $uris) {
    foreach ($uris as $uri) {
        yield $client->requestAsync('GET', $uri, ['sink' => $some_location]);
    }
})();
(new \GuzzleHttp\Promise\EachPromise(
    $promises, [
    'concurrency' => 10,
    'fulfilled'   => function (\Psr\Http\Message\ResponseInterface $response) {
        // do something
    },
    'rejected'    => function ($reason, $index) {
        // do something
    },
])
)->promise()->wait();

Dans les deux exemples, il y a un générateur qui produit des promesses qui se résolvent en instances de ResponseInterface et ce générateur, ainsi que le tableau de configuration (appelable rempli, appelable rejeté, concurrence), est également introduit dans une nouvelle instance de EachPromise .

En résumé :

  1. downloadFilesAsync1 est fonctionnellement la même chose que d'utiliser Pool mais sans la vérification des erreurs qui a été intégrée dans Pool::__construct() .

  2. Il y a quelques erreurs dans downloadFilesAsync2 qui fera en sorte que les fichiers soient téléchargés de manière synchrone avant de recevoir un message d'erreur. InvalidArgumentException lorsque le Pool est instancié.

Ma seule recommandation est la suivante : utilisez celui qui vous semble le plus intuitif.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X