61 votes

Hash simultané Pure-Ruby

Quelle est la meilleure façon de mettre en œuvre un Hachage qui peut être modifié entre plusieurs threads, mais avec le plus petit nombre de verrous. Pour les fins de cette question, vous pouvez supposer que le Hachage sera lu-lourds. Il doit être thread-safe dans tous les Rubis des mises en œuvre, y compris ceux qui opèrent dans un véritable simultanée de la mode, tels que JRuby, et il doit être écrit dans le plus pur Rubis (pas le C ou le Java est autorisé).

N'hésitez pas à soumettre un naïf solution qui verrouille toujours, mais qui n'est probablement pas la meilleure solution. Points pour l'élégance, mais une plus petite probabilité de blocage des victoires sur les petites code.

22voto

ara t howard Points 224

Bon, maintenant que vous avez spécifié le sens de "thread-safe', voici les deux implémentations. Le code suivant s'exécute indéfiniment dans l'IRM et JRuby. Le lockless mise en œuvre suit un éventuel modèle de cohérence où chaque thread dispose de son propre point de vue de la valeur de hachage si le maître est en perpétuelle évolution. Il y a un peu de ruse nécessaire pour s'assurer que le stockage de toutes les informations dans le thread n'a pas de fuite de mémoire, mais c'est manipulés et testés ― processus de taille ne pousse pas l'exécution de ce code. Les deux implémentations aurait besoin de plus de travail pour être "complète", ce qui signifie supprimer, mettre à jour, etc. besoin d'un peu de réflexion, mais l'un des deux concepts ci-dessous seront répondre à vos exigences.

Il est très important pour les gens qui lisent ce thread pour réaliser l'ensemble de la question est exclusif à JRuby ― en IRM intégré de Hachage est suffisant.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end

10voto

josh Points 308

Publier une solution de base / naïve, juste pour renforcer mon crédit Stack Overflow:

 require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end
 

7voto

josh Points 308

Yehuda, je pense que vous avez mentionné qu'IVAR était atomique? Qu'en est-il d'une simple copie et swap alors?

 require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end
 

4voto

Michael Sofaer Points 2139

C'est une classe wrapper autour de Hachage qui permet aux lecteurs simultanés, mais les verrous les choses pour tous les autres types d'accès (y compris réitéré au lit).

class LockedHash
  def initialize
    @hash = Hash.new
    @lock = ThreadAwareLock.new()
    @reader_count = 0
  end

  def [](key)
    @lock.lock_read
    ret = @hash[key]
    @lock.unlock_read
    ret
  end

  def []=(key, value)
    @lock.lock_write
    @hash[key] = value
    @lock.unlock_write
  end

  def method_missing(method_sym, *arguments, &block)
    if @hash.respond_to? method_sym
      @lock.lock_block
      val = lambda{@hash.send(method_sym,*arguments, &block)}.call
      @lock.unlock_block
      return val
    end
    super
  end
end

Voici le code de verrouillage qu'il utilise:

class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @reader_count = 0
  end
  def lock_read
    @outer.synchronize{@inner.synchronize{@reader_count += 1}}
  end
  def unlock_read
    @inner.synchronize{@reader_count -= 1}
  end
  def lock_write
    @outer.lock
    while @reader_count > 0 ;end
  end
  def unlock_write
    @outer.unlock
  end
end

class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    super
  end
  def lock_block
    lock_write
    @owner = Thread.current.object_id
  end
  def unlock_block
    @owner = nil
    unlock_write
  end
  def lock_read
    super unless my_block?
  end
  def unlock_read
    super unless my_block?
  end
  def lock_write
    super unless my_block?
  end
  def unlock_write
    super unless my_block?
  end
  def my_block?
    @owner == Thread.current.object_id
  end
end

Le thread courant de verrouillage est de vous permettre de verrouiller la classe une fois, et appeler les méthodes qui seraient normalement de verrouillage, et de ne pas se verrouiller. Vous avez besoin de cela, parce que vous le rendement en blocs à l'intérieur de certaines méthodes, et ces blocs peut appeler les méthodes de verrouillage sur l'objet, et vous ne voulez pas d'un blocage ou d'une double erreur de verrouillage. Vous pouvez utiliser un comptage de verrouillage de la place pour cet.

Voici une tentative de mettre en œuvre seau niveau des verrous en lecture / écriture:

class SafeBucket
  def initialize
    @lock = RWLock.new()
    @value_pairs = []
  end

  def get(key)
    @lock.lock_read
    pair = @value_pairs.select{|p| p[0] == key}
    unless pair && pair.size > 0
      @lock.unlock_read
      return nil
    end
    ret = pair[0][1]
    @lock.unlock_read
    ret
  end

  def set(key, value)
    @lock.lock_write
    pair = @value_pairs.select{|p| p[0] == key}
    if pair && pair.size > 0
      pair[0][1] = value
      @lock.unlock_write
      return
    end
    @value_pairs.push [key, value]
    @lock.unlock_write
    value
  end

  def each
    @value_pairs.each{|p| yield p[0],p[1]}
  end

end

class MikeConcurrentHash
  def initialize
    @buckets = []
    100.times {@buckets.push SafeBucket.new}
  end

  def [](key)
    bucket(key).get(key)
  end

  def []=(key, value)
    bucket(key).set(key, value)
  end

  def each
    @buckets.each{|b| b.each{|key, value| yield key, value}}
  end

  def bucket(key)
    @buckets[key.hash % 100]
  end
end

J'ai arrêté de travailler sur ce point car il est trop lent, donc chaque méthode est dangereuse (permet de mutations par d'autres threads au cours d'une itération) et il ne prend pas en charge la plupart des méthodes de hachage.

Et voici un harnais de test simultané de hachages:

require 'thread'
class HashHarness
  Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
          :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
    	  :for, :all, :ruby, :implementations]

  def self.go
    h = new
    r = h.writiness_range(20, 10000, 0, 0)
    r.each{|k, v| p k + ' ' + v.map{|p| p[1]}.join(' ')}
    return
  end
  def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
    @classes = classes
  end
  def writiness_range(basic_threads, ops, each_threads, loops)
    result = {}
    @classes.each do |hash_class|
      res = []
      0.upto 10 do |i|
        writiness = i.to_f / 10
        res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
      end
      result[hash_class.name] = res
    end
    result
  end
  def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
    time = Time.now
    threads = []
    hash = hash_class.new
    populate_hash(hash)
    begin
    basic_threads.times do
      threads.push Thread.new{run_basic_test(hash, writiness, ops)}
    end
    each_threads.times do
      threads.push Thread.new{run_each_test(hash, writiness, loops)}
    end
    threads.each{|t| t.join}
    rescue ThreadError => e
      p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
      return -1
    end
    p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ')
    return Time.now - time
  end
  def run_basic_test(hash, writiness, ops)
    ops.times do
      rand < writiness ? hash[choose_key]= rand : hash[choose_key]
    end
  end
  def run_each_test(hash, writiness, loops)
    loops.times do
      hash.each do |k, v|
        if rand < writiness
    	  each_write_work(hash, k, v)
    	else
    	  each_read_work(k, v)
    	end
      end
    end
  end
  def each_write_work(hash, key, value)
    hash[key] = rand
  end
  def each_read_work(key, value)
    key.to_s + ": " + value.to_s
  end
  def choose_key
    Keys[rand(Keys.size)]
  end
  def populate_hash(hash)
    Keys.each{|key| hash[key]=rand}  
  end
end

Numéros: Jruby

Writiness      0.0   0.1   0.2   0.3   0.4   0.5   0.6   0.7   0.8   0.9   1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash     1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash           0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001

Et l'IRM

Writiness      0.0    0.1    0.2    0.3    0.4    0.5    0.6    0.7    0.8    0.9    1.0
ConcurrentHash  9.214  9.913  9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash     19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash            0.535  0.537  0.534  0.599  0.594  0.676  0.635  0.650  0.654  0.661  0.692

L'IRM chiffres sont assez frappante. Verrouillage de l'IRM, ça craint vraiment.

2voto

ms-tg Points 1087

Ce pourrait être un cas d'utilisation pour le hamster gem

Hamster met en œuvre de Hachage Tableau Mappé Tente (HAMT), ainsi que quelques autres persistante des structures de données, dans le plus pur Ruby.

Persistante des structures de données sont immuables, et au lieu de la mutation (changement) de la structure, par exemple par l'ajout ou le remplacement d'une paire clé-valeur dans une table de Hachage, vous, au lieu de retourner une nouvelle structure de données qui contient le changement. Le truc, avec la persistance de l'immuable structures de données, est que le nouveau retourné structure de données ré-utilise plus de l'prédécesseur que possible.

Je pense que pour mettre en œuvre à l'aide de hamster, vous devez utiliser leur mutable hachage wrapper, qui passe toutes les lectures de la valeur actuelle de la persistance de l'immuable de hachage (par exemple, devrait être rapide), tout en gardant toutes les écritures avec un mutex, et d'échanger sur la nouvelle valeur de la persistance de l'immuable de hachage après l'écriture.

Par exemple:

require 'hamster'
require 'hamster/experimental/mutable_hash'    
hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male)

# reading goes directly to hash
puts hsh[:name] # Simon

# writing is actually swapping to new value of underlying persistent data structure
hsh.put(:name, "Joe")
puts hsh[:name] # Joe

Donc, nous allons l'utiliser pour un même type de problème à celui décrit:

(résumé ici)

require 'hamster'
require 'hamster/experimental/mutable_hash'

# a bunch of threads with a read/write ratio of 10:1
num_threads = 100
num_reads_per_write = 10
num_loops = 100 
hsh = Hamster.mutable_hash

puts RUBY_DESCRIPTION
puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio"

t0 = Time.now
Thread.abort_on_exception = true
threads = (0...num_threads).map do |n|
  Thread.new do
    write_key = n % num_reads_per_write
    read_keys = (0...num_reads_per_write).to_a.shuffle # random order
    last_read = nil

    num_loops.times do
      read_keys.each do |k|
        # Reads
        last_read = hsh[k]

        Thread.pass

        # Atomic increments in the correct ratio to reads
        hsh.put(k) { |v| (v || 0) + 1 } if k == write_key
      end
    end
  end
end

threads.map { |t| t.join }
t1 = Time.now

puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a
puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write }
puts "Time elapsed: #{t1 - t0} s"

J'obtiens les résultats suivants:

ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 5.763414627 s

jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 1.697 s

Que pensez-vous de cela?

Cette solution est plus similaire à la façon dont on peut résoudre ce en Scala ou Clojure, bien que dans ces langues qu'on serait plus susceptible d'être à l'aide de logiciels de la mémoire transactionnelle avec faible niveau de support CPU pour le atomique de comparer et d'opérations de swap qui sont mis en œuvre.

Edit: Il est intéressant de noter que l'une des raisons du hamster de la mise en œuvre est rapide, c'est qu'il dispose d'un lock-lire gratuitement chemin. Merci de répondre dans les commentaires si vous avez des questions à ce sujet ou comment il fonctionne.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X