C'est une classe wrapper autour de Hash qui autorise les lectures simultanées, mais verrouille tout pour tous les autres types d'accès (y compris les lectures par itération).
# Wrapper around a Hash that routes every access through a ThreadAwareLock.
#
# * +[]+ takes the read side of the lock.
# * +[]=+ takes the write side of the lock.
# * Any other method the wrapped Hash responds to is forwarded through
#   +method_missing+ between +lock_block+ and +unlock_block+, so a block
#   passed to e.g. +each+ runs while the block lock is held.
#
# Every acquisition is paired with its release in an +ensure+ clause, so an
# exception raised by the wrapped Hash or by a caller-supplied block cannot
# leave the lock held.
class LockedHash
  def initialize
    @hash = {}
    @lock = ThreadAwareLock.new
  end

  # Returns the value stored under +key+ (or the Hash default).
  def [](key)
    @lock.lock_read
    begin
      @hash[key]
    ensure
      @lock.unlock_read
    end
  end

  # Stores +value+ under +key+ and returns +value+.
  def []=(key, value)
    @lock.lock_write
    begin
      @hash[key] = value
    ensure
      @lock.unlock_write
    end
  end

  # Forwards any method the wrapped Hash responds to, holding the block lock
  # for the duration of the call. Unknown methods fall through to +super+ and
  # raise NoMethodError as usual.
  def method_missing(method_sym, *arguments, &block)
    return super unless @hash.respond_to?(method_sym)

    @lock.lock_block
    begin
      @hash.send(method_sym, *arguments, &block)
    ensure
      @lock.unlock_block
    end
  end

  # Keeps +respond_to?+ consistent with what +method_missing+ forwards.
  def respond_to_missing?(method_sym, include_private = false)
    @hash.respond_to?(method_sym) || super
  end
end
Voici le code de verrouillage qu'elle utilise :
# Read/write lock built from two mutexes and a condition variable.
#
# * +@outer+ is held by a writer for its whole write section; readers pass
#   through it briefly on entry, so no reader can enter while a writer holds it.
# * +@inner+ guards +@reader_count+.
# * +@readers_done+ lets a waiting writer sleep until the last active reader
#   leaves, instead of spinning on the counter.
class RWLock
  def initialize
    @outer = Mutex.new
    @inner = Mutex.new
    @readers_done = ConditionVariable.new
    @reader_count = 0
  end

  # Registers the caller as an active reader, blocking while a writer holds
  # +@outer+. Returns the number of active readers after registration.
  def lock_read
    @outer.synchronize { @inner.synchronize { @reader_count += 1 } }
  end

  # Unregisters the caller as a reader and wakes a writer waiting for the
  # reader count to drop to zero. Returns the remaining reader count.
  def unlock_read
    @inner.synchronize do
      @reader_count -= 1
      @readers_done.signal if @reader_count <= 0
      @reader_count
    end
  end

  # Acquires the write lock: takes +@outer+ (blocking new readers and other
  # writers), then waits until all active readers have left. Returns nil.
  def lock_write
    acquired = false
    @outer.lock
    begin
      @inner.synchronize do
        # Loop guards against spurious wakeups.
        @readers_done.wait(@inner) while @reader_count > 0
      end
      acquired = true
    ensure
      # If the wait was interrupted, do not keep readers and writers locked out.
      @outer.unlock unless acquired
    end
    nil
  end

  # Releases the write lock taken by +lock_write+.
  def unlock_write
    @outer.unlock
  end
end
# RWLock variant that remembers which thread holds it in "block" mode.
#
# Between +lock_block+ and the matching +unlock_block+, every +lock_read+,
# +unlock_read+, +lock_write+ and +unlock_write+ call made by the owning
# thread is skipped, so code running inside the locked section can call other
# locking methods on the same object without blocking on itself.
#
# +lock_block+ may be nested by the owning thread: a depth counter ensures
# that only the outermost +unlock_block+ clears the owner and releases the
# write lock. Without it, an inner +unlock_block+ would release the lock
# early and the outer one would then unlock a mutex it no longer holds.
class ThreadAwareLock < RWLock
  def initialize
    @owner = nil
    @block_depth = 0
    super
  end

  # Takes the write lock (unless this thread already owns the block lock) and
  # records the current thread as owner. Returns the owner's object id.
  def lock_block
    lock_write # skipped by the override below when already owner
    @block_depth += 1
    @owner = Thread.current.object_id
  end

  # Leaves one level of block locking. Only the outermost call made by the
  # owning thread clears the owner and releases the write lock.
  def unlock_block
    if my_block?
      @block_depth -= 1
      return if @block_depth > 0
    end
    @block_depth = 0
    # The owner must be cleared first: unlock_write is a no-op while
    # my_block? is still true.
    @owner = nil
    unlock_write
  end

  def lock_read
    super unless my_block?
  end

  def unlock_read
    super unless my_block?
  end

  def lock_write
    super unless my_block?
  end

  def unlock_write
    super unless my_block?
  end

  # True when the calling thread is the one that took the block lock.
  def my_block?
    @owner == Thread.current.object_id
  end
end
Le verrouillage par bloc lié au thread courant permet de verrouiller la classe une seule fois, puis d'appeler des méthodes qui poseraient normalement un verrou sans qu'elles verrouillent à nouveau. C'est nécessaire parce que certaines méthodes cèdent la main (yield) à des blocs, que ces blocs peuvent appeler des méthodes verrouillantes sur le même objet, et que l'on ne veut ni interblocage ni erreur de double verrouillage. On pourrait utiliser à la place un verrou à compteur.
Voici une tentative d'implémentation de verrous en lecture/écriture au niveau des seaux (buckets) :
# A single hash bucket: a list of [key, value] pairs guarded by an RWLock.
#
# +get+ runs under the read lock and +set+ under the write lock; both release
# the lock in an +ensure+ clause so an exception raised while comparing keys
# cannot leave the bucket locked. +each+ takes no lock at all, so other
# threads may mutate the bucket while it is being iterated.
class SafeBucket
  def initialize
    @lock = RWLock.new
    @value_pairs = []
  end

  # Returns the value stored for +key+, or nil when the key is absent.
  def get(key)
    @lock.lock_read
    begin
      pair = find_pair(key)
      pair && pair[1]
    ensure
      @lock.unlock_read
    end
  end

  # Stores +value+ for +key+, replacing an existing entry in place or
  # appending a new pair. Always returns +value+.
  def set(key, value)
    @lock.lock_write
    begin
      pair = find_pair(key)
      if pair
        pair[1] = value
      else
        @value_pairs.push [key, value]
      end
      value
    ensure
      @lock.unlock_write
    end
  end

  # Yields each key and value in insertion order. Not locked.
  def each
    @value_pairs.each { |pair| yield pair[0], pair[1] }
  end

  private

  # Returns the first stored [key, value] pair whose key == +key+, or nil.
  def find_pair(key)
    @value_pairs.find { |pair| pair[0] == key }
  end
end
# Hash-like container that spreads keys over a fixed number of SafeBucket
# instances, so that locking happens per bucket rather than for the whole
# table. Supports only +[]+, +[]=+ and +each+.
class MikeConcurrentHash
  # Number of buckets used when none is requested.
  DEFAULT_BUCKET_COUNT = 100

  # bucket_count:: how many buckets to create; must be a positive Integer.
  #
  # Raises ArgumentError for any other value.
  def initialize(bucket_count = DEFAULT_BUCKET_COUNT)
    unless bucket_count.is_a?(Integer) && bucket_count > 0
      raise ArgumentError, "bucket_count must be a positive integer"
    end

    @buckets = Array.new(bucket_count) { SafeBucket.new }
  end

  # Returns the value stored for +key+ in its bucket.
  def [](key)
    bucket(key).get(key)
  end

  # Stores +value+ for +key+ in its bucket.
  def []=(key, value)
    bucket(key).set(key, value)
  end

  # Yields every key and value, bucket by bucket.
  def each
    @buckets.each do |b|
      b.each { |key, value| yield key, value }
    end
  end

  # Returns the bucket responsible for +key+, chosen by +key.hash+ modulo the
  # number of buckets.
  def bucket(key)
    @buckets[key.hash % @buckets.size]
  end
end
J'ai arrêté de travailler dessus parce que c'est trop lent ; la méthode each est donc dangereuse (elle permet des mutations par d'autres threads pendant une itération) et la classe ne prend pas en charge la plupart des méthodes de Hash.
Et voici un harnais de test pour les hachages concurrents :
require 'thread'
# Benchmark harness for hash-like classes under concurrent access.
#
# Each candidate class must support +new+, +[]+ and +[]=+; classes exercised
# with +each_threads+ > 0 must also support +each+ yielding key and value.
# "Writiness" is the probability (0.0..1.0) that a single operation is a
# write rather than a read.
class HashHarness
  # Pool of keys used by every test. Repeated symbols are picked more often.
  Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
          :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
          :for, :all, :ruby, :implementations].freeze

  # Runs the default benchmark (20 threads, 10000 operations each, no +each+
  # threads) over the default class list and prints one line of timings per
  # class. Returns nil.
  def self.go
    new.writiness_range(20, 10000, 0, 0).each do |class_name, timings|
      p "#{class_name} #{timings.map { |pair| pair[1] }.join(' ')}"
    end
    nil
  end

  # classes:: the hash-like classes to benchmark.
  def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
    @classes = classes
  end

  # Runs +test_one+ for every class at writiness 0.0, 0.1, ..., 1.0.
  #
  # Returns a Hash mapping each class name to an Array of
  # [writiness, result_of_test_one] pairs.
  def writiness_range(basic_threads, ops, each_threads, loops)
    result = {}
    @classes.each do |hash_class|
      result[hash_class.name] = (0..10).map do |step|
        writiness = step.to_f / 10
        [writiness, test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
      end
    end
    result
  end

  # Times one run against a fresh, pre-populated instance of +hash_class+.
  #
  # basic_threads:: number of threads running +run_basic_test+
  # ops::           operations performed by each basic thread
  # each_threads::  number of threads running +run_each_test+
  # loops::         full iterations performed by each +each+ thread
  # writiness::     probability that an operation is a write
  #
  # Prints a summary line and returns the elapsed seconds, or -1 when a
  # ThreadError surfaces while starting or joining the threads. The elapsed
  # time is measured once, so the printed and returned values are the same.
  def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
    started = Time.now
    threads = []
    hash = hash_class.new
    populate_hash(hash)
    begin
      basic_threads.times do
        threads.push Thread.new { run_basic_test(hash, writiness, ops) }
      end
      each_threads.times do
        threads.push Thread.new { run_each_test(hash, writiness, loops) }
      end
      threads.each { |t| t.join }
    rescue ThreadError => e
      p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
      return -1
    end
    elapsed = Time.now - started
    p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, elapsed].join(' ')
    elapsed
  end

  # Performs +ops+ random single-key operations; each one is a write with
  # probability +writiness+, otherwise a read.
  def run_basic_test(hash, writiness, ops)
    ops.times do
      if rand < writiness
        hash[choose_key] = rand
      else
        hash[choose_key]
      end
    end
  end

  # Iterates the whole hash +loops+ times; for each entry does write work
  # with probability +writiness+, otherwise read work.
  def run_each_test(hash, writiness, loops)
    loops.times do
      hash.each do |k, v|
        if rand < writiness
          each_write_work(hash, k, v)
        else
          each_read_work(k, v)
        end
      end
    end
  end

  # Write work done inside an iteration: overwrites the current key.
  def each_write_work(hash, key, value)
    hash[key] = rand
  end

  # Read work done inside an iteration: builds "key: value".
  def each_read_work(key, value)
    key.to_s + ": " + value.to_s
  end

  # Picks a random key from Keys.
  def choose_key
    Keys[rand(Keys.size)]
  end

  # Stores a random value under every key in Keys.
  def populate_hash(hash)
    Keys.each { |key| hash[key] = rand }
  end
end
Chiffres :
JRuby
Writiness 0.0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash 1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash 0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001
Et MRI :
Writiness 0.0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0
ConcurrentHash 9.214 9.913 9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash 19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash 0.535 0.537 0.534 0.599 0.594 0.676 0.635 0.650 0.654 0.661 0.692
Les chiffres de MRI sont assez frappants. Le verrouillage sous MRI est vraiment mauvais.