3 votes

Rx.Net - Obtenez les changements de prix des actions et traitez-les

Le problème que j'essaie de résoudre

  1. Obtenir des tics d'actions
  2. Pensez toujours à dernier site cours de l'action
  3. Chaque x deuxième prendre un instantané de tiques et envoyer pour traitement

Donc j'ai un Observable source de tiques de stock. Il n'envoie que les ticks des actions qui m'intéressent. Ce que j'ai besoin de faire, c'est de recevoir les prix de ces actions, et après chaque x secondes (pour les besoins de l'exemple, disons toutes les 3 secondes), envoyer une instantané de prix pour le traitement. Si, dans les 3 secondes, je reçois 2 ticks pour la même action, je n'ai besoin que du dernier tick. Ce traitement est lourd en calcul, donc si possible j'aimerais éviter d'envoyer deux fois le même prix de l'action pour traitement.

Pour prendre un exemple.

Disons qu'au début de la séquence j'ai reçu 2 ticks -> MSFT:1$, GOOG:2$. .

Dans les 3 secondes suivantes, je ne reçois rien, donc MSFT ET GOOG les tiques doivent être envoyées pour être traitées.

La seconde suivante, je reçois de nouveaux ticks -> MSFT:1$, GOOG:3$, INTL:3$

Encore une fois, supposons que rien n'arrive dans les 3 secondes qui suivent.

Ici, puisque MSFT Le prix n'a pas changé (il est toujours de 1$), seulement GOOG & INTL doit être envoyé pour traitement.

Et cela se répète tout au long de la journée.

Maintenant, je pense que Rx aide à résoudre ce genre de problèmes de manière simple et élégante. Mais j'ai un problème pour avoir les requêtes appropriées. Voici ce que j'ai fait jusqu'à présent, je vais essayer d'expliquer ce qu'il fait et quel est le problème avec lui.

var finalQuery =
               from priceUpdate in **Observable<StockTick>**
               group priceUpdate by priceUpdate.Stock into grouped
               from combined in Observable.Interval(TimeSpan.FromSeconds(3))
                      .CombineLatest(grouped, (t, pei) => new { PEI = pei, Interval = t })
               group combined by new { combined.Interval } into combined
               select new
               {
                   Interval = combined.Key.Interval,
                   PEI = combined.Select(c => new StockTick(c.PEI.Stock, c.PEI.Price))
               };

            finalQuery
                .SelectMany(combined => combined.PEI)
                .Distinct(pu => new { pu.Stock, pu.Price })
                .Subscribe(priceUpdate =>
                {
                    Process(priceUpdate);
                });

public class StockTick
{
   public StockTick(string stock, decimal price)
   {    
      Stock = stock;
      Price = price;
   }
   public string Stock {get;set;}
   public decimal Price {get;set;}
}

Ça récupère le prix de l'action, le regroupe par action, puis combine les dernières données de ce groupe.

ed avec Observable.Interval . De cette façon, j'essaie de m'assurer que seuls les derniers ticks d'une action sont traités et qu'il se déclenche toutes les 3 secondes.

Cette fois encore, il les regroupe par intervalle, ce qui fait que j'ai un groupe de séquences pour chaque intervalle de 3 secondes qui s'est écoulé.

Et comme dernière étape, j'aplatis cette séquence en une séquence de mises à jour du prix des actions en utilisant SelectMany et je demande aussi Distinct pour s'assurer que le même prix pour le même stock n'est pas traité deux fois.

Il y a deux problèmes avec cette requête que je n'aime pas. Premièrement, je n'aime pas vraiment les doubles regroupements - y a-t-il un moyen de les éviter ? Deuxièmement, avec cette approche, je dois traiter les prix un par un, ce que j'aimerais vraiment avoir est instantanés - c'est à dire dans les 3 secondes, quoi que j'aie, je vais m'attacher et l'envoyer pour traitement, mais je n'arrive pas à trouver comment se mettre au boulot .

Je serai heureux de recevoir des suggestions pour résoudre ce problème d'une autre manière, mais je préfère rester dans le cadre de la Rx, à moins qu'il n'y ait vraiment quelque chose de beaucoup mieux.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X