8 votes

Comment extraire l'heure de publication de Google PubSub dans Apache Beam ?

Mon objectif est de pouvoir accéder à l'heure de publication des messages PubSub telle qu'elle est enregistrée et définie par Google PubSub dans Apache Beam (Dataflow).

    PCollection<PubsubMessage> pubsubMsg
            = pipeline.apply("Read Messages From PubSub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(pocOptions.getInputSubscription()));

Il ne semble pas en contenir comme attribut. J'ai essayé

 .withTimestampAttribute("publish_time")

Pas de chance non plus. Que me manque-t-il ? Est-il possible d'extraire l'heure de publication de Google PubSub dans le flux de données ?

19voto

Guillem Xercavins Points 3165

Version Java :

PubsubIO lira le message de Pub/Sub et attribuera l'heure de publication du message à l'élément comme horodatage de l'enregistrement. Par conséquent, vous pouvez y accéder en utilisant ProcessContext.timestamp() . A titre d'exemple :

p
    .apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
    .apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            LOG.info("Message: " + c.element());
            LOG.info("Publish time: " + c.timestamp().toString());
            Date date= new Date();
            Long time = date.getTime();
            LOG.info("Processing time: " + new Instant(time).toString());
        }
    }));

J'ai publié un message un peu à l'avance (pour avoir une différence significative entre l'événement et le temps de traitement) et le résultat avec DirectRunner était :

Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Message: I published this message a little bit before
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Publish time: 2019-03-27T09:57:07.005Z
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Processing time: 2019-03-27T10:03:08.229Z

Code minimal aquí


Version Python :

Maintenant, l'horodatage est accessible par DoFn.TimestampParam de la process méthode ( docs ):

class GetTimestampFn(beam.DoFn):
  """Prints element timestamp"""
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    logging.info(">>> Element timestamp: %s", timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
    yield element

Note : analyse de la date grâce à cette réponse .

Salida:

INFO:root:>>> Element timestamp: 2019-08-12 20:16:53

Full code

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X