5 votes

Gestionnaire d'événements à tarif limité dans erlang/OTP

J'ai une source de données qui produit des points à un taux potentiellement élevé, et j'aimerais effectuer une opération qui peut prendre du temps sur chaque point ; mais je voudrais aussi que le système se dégrade gracieusement lorsqu'il est surchargé, en abandonnant les points de données en excès.

Pour autant que je sache, l'utilisation d'un gen_event ne permet jamais de sauter des événements. Conceptuellement, ce que je voudrais que le gen_event fasse, c'est abandonner tous les événements en attente, sauf les plus récents, avant de relancer les gestionnaires.

Y a-t-il un moyen de faire cela avec l'OTP standard ? ou y a-t-il une bonne raison pour laquelle je ne devrais pas procéder de cette façon ?

Jusqu'à présent, la meilleure solution que j'ai trouvée consiste à utiliser un gen_server et à compter sur le délai d'attente pour déclencher les événements coûteux :

-behaviour(gen_server).
init() -> 
    {ok, Pid} = gen_event:start_link(),
    {ok, {Pid, none}}.

handle_call({add, H, A},_From,{Pid,Data}) ->
    {reply, gen_event:add_handler(Pid,H,A), {Pid,Data}}.

handle_cast(Data,{Pid,_OldData}) -> 
    {noreply, {Pid,Data,0}}.  % set timeout to 0 

handle_info(timeout, {Pid,Data}) ->
    gen_event:sync_notify(Pid,Data),
    {noreply, {Pid,Data}}.

Cette approche est-elle correcte ? (notamment en ce qui concerne la supervision ? )

1voto

ellisbben Points 3213

Je ne peux pas commenter la supervision, mais j'implémenterais ceci comme une file d'attente avec des éléments expirant.

J'ai mis en place quelque chose que vous pouvez utiliser ci-dessous.

J'en ai fait un gen_server ; lorsque vous le créez, vous lui donnez un âge maximum pour les vieux objets.

Son interface permet d'envoyer des éléments à traiter et de demander des éléments qui n'ont pas été mis en file d'attente. Il enregistre l'heure à laquelle il reçoit chaque élément. Chaque fois qu'il reçoit un élément à traiter, il vérifie tous les éléments de la file d'attente, les retire de la file et rejette ceux qui sont plus anciens que l'âge maximum. (Si vous voulez que l'âge maximum soit toujours respecté, vous pouvez filtrer la file d'attente avant de renvoyer les éléments en file d'attente)

Votre source de données diffusera des données ( {process_this, Anything} ) à la file d'attente de travail et votre processus consommateur (potentiellement lent) appellera ( gimme ) pour obtenir des données.

-module(work_queue).
-behavior(gen_server).

-export([init/1, handle_cast/2, handle_call/3]).

init(DiscardAfter) ->
  {ok, {DiscardAfter, queue:new()}}.

handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
  Instant = now(),
  Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
  Queue2 = queue:in({Instant, Data}, Queue1),
  {noreply, {DiscardAfter, Queue2}}.

handle_call(gimme, From, State = {DiscardAfter, Queue0}) ->
  case queue:is_empty(Queue0) of
    true ->
      {reply, no_data, State};
    false ->
      {{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
      {reply, {data, Data}, {DiscardAfter, Queue1}}
  end.

delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) ->
  ((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1.

too_old(Stamp, Instant, DiscardAfter) ->
  delta(Stamp, Instant) > DiscardAfter.

Petite démo au REPL :

c(work_queue).
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).         
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),      
timer:sleep(11 * 1000),                                                
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.

0voto

Daniel Points 491

Y a-t-il un moyen de faire cela avec un OTP standard ?

Non.

Y a-t-il une bonne raison pour laquelle je ne devrais pas procéder de cette façon ?

Non, un arrêt précoce peut augmenter les performances de l'ensemble du système. Lisez comment aquí .

Cette approche est-elle correcte ? (notamment en ce qui concerne la supervision ? )

Aucune idée, vous n'avez pas fourni le code de supervision.


En guise d'information supplémentaire à votre première question :

Si vous pouvez utiliser des bibliothèques tierces en dehors d'OTP, il en existe quelques-unes qui peuvent ajouter des délais d'attente préemptifs, ce qui est ce que vous décrivez.

Il y en a deux que je connais bien : le premier est le suivant dépouillement et le second est poussin (Je suis l'auteur du poussin, je vais essayer de ne pas faire de publicité pour le projet ici).

Dispcount fonctionne très bien pour les ressources uniques qui n'ont qu'un nombre limité de tâches à exécuter en même temps et ne fait pas de mise en file d'attente. Vous pouvez lire à ce sujet aquí ( avertissement beaucoup d'informations vraiment intéressantes).

Dispcount n'a pas fonctionné pour moi car j'aurais dû créer plus de 4000 pools de processus pour gérer la quantité de files d'attente différentes dans mon application. J'ai écrit chick parce que j'avais besoin d'un moyen d'augmenter et de diminuer dynamiquement la longueur de ma file d'attente, ainsi que d'être capable de mettre en file d'attente des demandes et d'en refuser d'autres, sans avoir à créer plus de 4000 pools de processus.

Si j'étais vous, j'essaierais d'abord discount (car la plupart des solutions n'ont pas besoin de chick), puis si vous avez besoin de quelque chose d'un peu plus dynamique qu'un pool qui peut répondre à un certain nombre de demandes, essayez chick.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X