7 votes

Quel est le bon Clojure construct pour la concurrence pour maintenir une file de données à traiter?

Si je veux maintenir une file d'images côté serveur que j'enverrai au client, quelle structure de données devrais-je utiliser ?

Je essaie de créer une application simple où j'enverrai des images au serveur et le serveur les enverra à d'autres clients.

Devrais-je maintenir cette file comme un atom ou comme une ref ?

6voto

Rörd Points 4728

Vous pourriez simplement utiliser l'une des classes de file d'attente de java.util.concurrent. Un accès facile à la bibliothèque standard Java est après tout l'un des points forts de Clojure, vous n'avez donc pas à tout construire vous-même à partir des blocs de construction fournis par Clojure si Java fournit déjà quelque chose qui fait le travail.

Je suggérerais de choisir quelque chose parmi les implémentations de l'interface BlockingQueue.

2voto

vemv Points 2212

L'opération la plus importante d'une file d'attente est de retirer des éléments : c'est-à-dire obtenir et supprimer un élément de la collection.

Comme cette opération est composite, les références sont mieux adaptées pour la réaliser de manière atomique, c'est-à-dire empêcher les conditions de concurrence (par exemple, deux threads obtenant le même élément).

(defn remove-and-get [queue]
  (dosync
   (let [i (peek @queue)]
     (alter queue pop)
     i)))

(def q (ref (clojure.lang.PersistentQueue/EMPTY)))

(dosync
 (commute q conj 42)
 (commute q conj :foo)
 (commute q conj []))

[(seq @q) (remove-and-get q) (seq @q)]
;; évalué à [(42 :foo []) 42 (:foo [])]

Une fonctionnalité équivalente peut également être implémentée en termes d'atomes.

(defn remove-and-get [queue]
  (let [snapshot @queue
        i (peek snapshot)]
    (if (compare-and-set! queue snapshot (pop snapshot))
      i
      (recur queue))))

(def q (atom (clojure.lang.PersistentQueue/EMPTY)))

(swap! q conj 42)
(swap! q conj :foo)
(swap! q conj [])

[(seq @q) (remove-and-get q) (seq @q)]

2voto

monoid Points 1620

Semble être une possibilité pour core.async.

2voto

olieidel Points 847

Danger! Ce qui ne fonctionnera pas

Même si Clojure vous offre d'excellentes structures de données et primitives, cela ne vous sauvera pas de la création de conditions de concurrence. Découvrez ces exemples qui ne fonctionneront pas :

;; NE FONCTIONNERA PAS

(def queue (atom '(:foo :bar :baz :qux)))

;; le code ci-dessous est appelé à partir de threads

;; prenez la première valeur de la "queue", en déréférençant

(let [peeked-val (first @queue)]
  (do-something-with peeked-val)
  ;; mettez à jour l'atome pour supprimer la première valeur
  ;; DANGER: Vous avez déréférencé la valeur ci-dessus mais vous la swappez en une étape distincte
  ;; (ici). D'autres threads peuvent avoir lu la même valeur ou l'ont potentiellement mutée
  ;; entre-temps!
  (swap! queue rest))

Et les refs?

;; NE FONCTIONNERA PAS

(def queue (ref '(:foo :bar :baz :qux)))

;; le code ci-dessous est appelé à partir de threads

;; prenez la première valeur de la "queue", en déréférençant, cette fois dans une transaction!

(dosync
  (let [peeked-val (first @queue)]
    (do-something-with peeked-val)
    ;; mettez à jour le ref pour supprimer la première valeur dans la transaction
    ;; DANGER: Les Refs vous offrent une cohérence transactionnelle (c'est-à-dire la cohérence
    ;; entre l'accès en lecture/écriture à d'autres refs) mais cela ne s'applique pas vraiment
    ;; ici car nous n'avons qu'un seul ref. D'autres threads peuvent avoir lu la même valeur
    ;; ou l'ont potentiellement mutée entre-temps!
    (alter queue rest)))

Résolvez-le en Clojure

Vous pouvez accomplir cela avec les structures de données et les atomes de Clojure. La clé est d'utiliser swap-vals! pour ne toucher l'atome qu'une seule fois - sinon vous rencontrerez des conditions de concurrence car vous avez deux opérations comme dans l'exemple ci-dessus : Déréférencer l'atome (obtenir sa valeur) et le swap (changer sa valeur).

(def queue (atom '(:foo :bar :baz :qux)))

;; utilisez `swap-vals!` sur les atomes pour obtenir à la fois les anciennes et nouvelles valeurs de l'atome.
;; parfait si vous voulez peek (obtenir la première valeur) tout en faisant un pop (supprimer
;; la première valeur de l'atome, modifiant ainsi son état)

;; utilisez le code ci-dessous dans des threads (ou quelque part dans core.async)
;; il est quelque peu non-idiomatique d'utiliser `rest` et `first`, consultez le texte ci-dessous pour
;; d'autres options

(let [[old new] (swap-vals! queue rest)]
  (println "voici la valeur extraite :" (first old)))

Vous pouvez également utiliser une PersistentQueue à la place, construisez-la avec (clojure.lang.PersistentQueue/EMPTY) - ensuite vous pouvez appeler peek et pop dessus au lieu de first et rest. N'oubliez pas de la mettre dans un atome comme la liste ci-dessus :)

Utilisez les Structures de Données Java

Vous pourriez aussi utiliser quelque chose comme un java.util.concurrent.LinkedBlockingDeque. Regardez ce commit qui introduit des fonctionnalités de compilation parallèle au compilateur ClojureScript pour un exemple d'utilisation d'un LinkedBlockingDeque.

0voto

subsub Points 1344

Vous pourriez essayer agents, l'idée étant la suivante:

Pour chaque client, vous avez un agent, et vous envoyez simplement la commande transmit-frame au client. Puisque les actions sur les agents sont exécutées dans l'ordre FIFO (du moins tant que vous n'avez qu'un seul thread d'envoi).

 (send-off client transmit-frame frame)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X