3 votes

Consommez des messages SQS à partir de Celery

J'ai une implémentation Celery dans mon application Python. Le courtier que j'utilise est SQS. Les messages qui vont à SQS proviennent d'une autre application via l'api send_message() de Boto3. Maintenant, ma confusion est de savoir comment déclencher Celery pour prendre le message de SQS à traiter. Il y aura des tâches qui s'exécuteront dans Celery qui devraient traiter les messages de SQS, n'est-ce pas. Mon besoin est similaire à Celery Consumer SQS Messages.

D'après ce que je comprends, Celery scrute SQS jusqu'à ce que les messages arrivent là-bas. Est-ce que quelqu'un peut m'aider avec cela?

1voto

user2176999 Points 33

J'appelle cette tâche toutes les 20 secondes :

@app.task(name='listen_to_sqs_telemetry')
def listen_to_sqs_telemetry():
    logger.info('démarrer listen_to_telemetry')
    sqs = get_sqs_client()
    queue_url = 'https://sqs.us-east-2.amazonaws.com/xxx'
    logger.info('Utilisation de ' + queue_url)

    keep_going = True
    num = 0
    while keep_going:
        keep_going = False
        try:
            response = sqs.receive_message(
                QueueUrl=queue_url,
                AttributeNames=[
                    'SentTimestamp',
                ],
                MaxNumberOfMessages=5,
                MessageAttributeNames=[
                    'All'
                ],
                WaitTimeSeconds=20
            )
            # logger.info(response)
            if 'Messages' in response:
                keep_going = True
                for rec in response['Messages']:
                    # Traiter le message
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=rec['ReceiptHandle']
                    )
                    num = num + 1
            else:
                pass
                # logger.info(response)
        except Exception as e:
            logger.error(str(e))
    logger.info('terminé avec listen_to_sqs_telemetry')
    return "Traité {} message(s)".format(num)

0voto

kta Points 4702

Si je vous comprends bien, essayez d'exécuter le travailleur en tant que démon. Utilisez un outil comme supervisord pour le faire.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X