Pure-Ruby并发哈希

实现一个可以在多个线程中修改的Hash的最佳方式是什么,但是锁的数量是最less的。 为了这个问题的目的,你可以假设哈希将是重读的。 它必须在所有Ruby实现中都是线程安全的,包括那些以真正同步的方式运行的实现,比如JRuby,它必须以纯Ruby(不允许C或Java)编写。

随意提交一个总是locking的天真的解决scheme,但这不可能是最好的解决scheme。 优雅的点,但较小的可能性locking胜过较小的代码。

好吧,现在您已经指定了“线程安全”的实际含义,这里有两个潜在的实现。 以下代码将在MRI和JRuby中永远运行。 无锁实现遵循最终的一致性模型,其中每个线程使用它自己的哈希视图,如果主控不断变化的话。 有一点小技巧需要确保存储线程中的所有信息不会泄漏内存,但是这是处理和testing – 进程大小不增长运行此代码。 这两个实现都需要更多的工作来完成,这意味着删除,更新等需要一些思考,但是下面两个概念中的任何一个都可以满足你的要求。

对于阅读这个线程的人来说,认识到整个问题是JRuby专有的非常重要 – 在MRI中,内置的Hash就足够了。

module Cash def Cash.new(*args, &block) env = ENV['CASH_IMPL'] impl = env ? Cash.const_get(env) : LocklessImpl klass = defined?(JRUBY_VERSION) ? impl : ::Hash klass.new(*args) end class LocklessImpl def initialize @hash = {} end def thread_hash thread = Thread.current thread[:cash] ||= {} hash = thread[:cash][thread_key] if hash hash else hash = thread[:cash][thread_key] = {} ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) } hash end end def thread_key [Thread.current.object_id, object_id] end def []=(key, val) time = Time.now.to_f tuple = [time, val] @hash[key] = tuple thread_hash[key] = tuple val end def [](key) # check the master value # val = @hash[key] # someone else is either writing the key or it has never been set. we # need to invalidate our own copy in either case # if val.nil? thread_val = thread_hash.delete(key) return(thread_val ? thread_val.last : nil) end # check our own thread local value # thread_val = thread_hash[key] # in this case someone else has written a value that we have never seen so # simply return it # if thread_val.nil? return(val.last) end # in this case there is a master *and* a thread local value, if the master # is newer juke our own cached copy # if val.first > thread_val.first thread_hash.delete(key) return val.last else return thread_val.last end end end class LockingImpl < ::Hash require 'sync' def initialize(*args, &block) super ensure extend Sync_m end def sync(*args, &block) sync_synchronize(*args, &block) end def [](key) sync(:SH){ super } end def []=(key, val) sync(:EX){ super } end end end if $0 == __FILE__ iteration = 0 loop do n = 42 hash = Cash.new threads = Array.new(10) { Thread.new do Thread.current.abort_on_exception = true n.times do |key| hash[key] = key raise "#{ key }=nil" if hash[key].nil? end end } threads.map{|thread| thread.join} puts "THREADSAFE: #{ iteration += 1 }" end end 

发表基地/天真的解决scheme,只是为了提高我的堆栈溢出债权:

 require 'thread' class ConcurrentHash < Hash def initialize super @mutex = Mutex.new end def [](*args) @mutex.synchronize { super } end def []=(*args) @mutex.synchronize { super } end end 

耶胡达,我想你提到伊娃的设置是primefaces? 那么简单的复制和交换呢?

 require 'thread' class ConcurrentHash def initialize @reader, @writer = {}, {} @lock = Mutex.new end def [](key) @reader[key] end def []=(key, value) @lock.synchronize { @writer[key] = value @reader, @writer = @writer, @reader @writer[key] = value } end end 

这是一个围绕Hash的包装类,它允许并发的读取器,但是为所有其他types的访问(包括迭代读取)locking事物。

 class LockedHash def initialize @hash = Hash.new @lock = ThreadAwareLock.new() @reader_count = 0 end def [](key) @lock.lock_read ret = @hash[key] @lock.unlock_read ret end def []=(key, value) @lock.lock_write @hash[key] = value @lock.unlock_write end def method_missing(method_sym, *arguments, &block) if @hash.respond_to? method_sym @lock.lock_block val = lambda{@hash.send(method_sym,*arguments, &block)}.call @lock.unlock_block return val end super end end 

这是它使用的locking代码:

 class RWLock def initialize @outer = Mutex.new @inner = Mutex.new @reader_count = 0 end def lock_read @outer.synchronize{@inner.synchronize{@reader_count += 1}} end def unlock_read @inner.synchronize{@reader_count -= 1} end def lock_write @outer.lock while @reader_count > 0 ;end end def unlock_write @outer.unlock end end class ThreadAwareLock < RWLock def initialize @owner = nil super end def lock_block lock_write @owner = Thread.current.object_id end def unlock_block @owner = nil unlock_write end def lock_read super unless my_block? end def unlock_read super unless my_block? end def lock_write super unless my_block? end def unlock_write super unless my_block? end def my_block? @owner == Thread.current.object_id end end 

线程感知locking允许您locking一次类,然后调用通常会locking的方法,并让它们不locking。 您需要这样做是因为您在某些方法中产生了块,并且这些块可以调用该对象的locking方法,并且不需要死锁或双锁错误。 你可以使用计数锁而不是这个。

这里试图实现桶级别的读写locking:

 class SafeBucket def initialize @lock = RWLock.new() @value_pairs = [] end def get(key) @lock.lock_read pair = @value_pairs.select{|p| p[0] == key} unless pair && pair.size > 0 @lock.unlock_read return nil end ret = pair[0][1] @lock.unlock_read ret end def set(key, value) @lock.lock_write pair = @value_pairs.select{|p| p[0] == key} if pair && pair.size > 0 pair[0][1] = value @lock.unlock_write return end @value_pairs.push [key, value] @lock.unlock_write value end def each @value_pairs.each{|p| yield p[0],p[1]} end end class MikeConcurrentHash def initialize @buckets = [] 100.times {@buckets.push SafeBucket.new} end def [](key) bucket(key).get(key) end def []=(key, value) bucket(key).set(key, value) end def each @buckets.each{|b| b.each{|key, value| yield key, value}} end def bucket(key) @buckets[key.hash % 100] end end 

我停止了这个工作,因为它太慢了,所以每个方法都是不安全的(在迭代期间允许其他线程的变化),并且不支持大多数哈希方法。

这里有一个用于并发哈希的testing工具:

 require 'thread' class HashHarness Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes, :that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash, :for, :all, :ruby, :implementations] def self.go h = new r = h.writiness_range(20, 10000, 0, 0) r.each{|k, v| pk + ' ' + v.map{|p| p[1]}.join(' ')} return end def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash]) @classes = classes end def writiness_range(basic_threads, ops, each_threads, loops) result = {} @classes.each do |hash_class| res = [] 0.upto 10 do |i| writiness = i.to_f / 10 res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)] end result[hash_class.name] = res end result end def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness) time = Time.now threads = [] hash = hash_class.new populate_hash(hash) begin basic_threads.times do threads.push Thread.new{run_basic_test(hash, writiness, ops)} end each_threads.times do threads.push Thread.new{run_each_test(hash, writiness, loops)} end threads.each{|t| t.join} rescue ThreadError => e p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ') return -1 end p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ') return Time.now - time end def run_basic_test(hash, writiness, ops) ops.times do rand < writiness ? hash[choose_key]= rand : hash[choose_key] end end def run_each_test(hash, writiness, loops) loops.times do hash.each do |k, v| if rand < writiness each_write_work(hash, k, v) else each_read_work(k, v) end end end end def each_write_work(hash, key, value) hash[key] = rand end def each_read_work(key, value) key.to_s + ": " + value.to_s end def choose_key Keys[rand(Keys.size)] end def populate_hash(hash) Keys.each{|key| hash[key]=rand} end end 

数字:Jruby

 Writiness 0.0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0 ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881 LockedHash 1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630 Hash 0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001 

和MRI

 Writiness 0.0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0 ConcurrentHash 9.214 9.913 9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036 LockedHash 19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146 Hash 0.535 0.537 0.534 0.599 0.594 0.676 0.635 0.650 0.654 0.661 0.692 

核磁共振数字是相当惊人的。 locking在核磁共振真的很糟糕。

这可能是仓鼠gem的一个用例

Hamster在纯Ruby中实现了Hash Array Mapped Tries(HAMT)以及其他一些持久化的数据结构 。

持久化的数据结构是不可变的,而不是通过增加或replace哈希中的键值对来改变(改变)结构,而是返回一个包含改变的新数据结构。 诀窍,持久不变的数据结构,是新返回的数据结构尽可能多地重用前辈。

我认为要实现使用仓鼠,你会使用他们的可变哈希包装,它传递所有的读取持久性不变的哈希当前值(即,应该是快),同时守卫一个互斥体的所有写入,并交换到新值写之后的持久性不可变哈希。

例如:

 require 'hamster' require 'hamster/experimental/mutable_hash' hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male) # reading goes directly to hash puts hsh[:name] # Simon # writing is actually swapping to new value of underlying persistent data structure hsh.put(:name, "Joe") puts hsh[:name] # Joe 

所以,让我们用这个来描述类似的问题:

( 在这里主要 )

 require 'hamster' require 'hamster/experimental/mutable_hash' # a bunch of threads with a read/write ratio of 10:1 num_threads = 100 num_reads_per_write = 10 num_loops = 100 hsh = Hamster.mutable_hash puts RUBY_DESCRIPTION puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio" t0 = Time.now Thread.abort_on_exception = true threads = (0...num_threads).map do |n| Thread.new do write_key = n % num_reads_per_write read_keys = (0...num_reads_per_write).to_a.shuffle # random order last_read = nil num_loops.times do read_keys.each do |k| # Reads last_read = hsh[k] Thread.pass # Atomic increments in the correct ratio to reads hsh.put(k) { |v| (v || 0) + 1 } if k == write_key end end end end threads.map { |t| t.join } t1 = Time.now puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write } puts "Time elapsed: #{t1 - t0} s" 

我得到以下输出:

 ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux] 100 threads x 100 loops, 10:1 R/W ratio Time elapsed: 5.763414627 s jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64] 100 threads x 100 loops, 10:1 R/W ratio Time elapsed: 1.697 s 

你觉得这怎么样?

这个解决scheme更类似于Scala或Clojure可以解决这个问题,但是在这些语言中,更有可能使用软件事务内存和低级CPU支持primefaces比较和交换操作。

编辑 :值得注意的是仓鼠实现速度快的一个原因是它具有无锁读取path 。 如果您对此有任何疑问,请回复评论。

这个( video , pdf )是关于用Java实现的无锁哈希表。

破坏者:使用primefaces比较和交换(CAS)操作,如果在Ruby中不可用,你可以用锁来模拟它们。 不知道这是否会比简单的锁守哈希表有什么优势

没有testing,并在优化阅读天真刺。 它假定大部分时间,这个值不会被locking。 如果是这样,紧密的循环将尝试,直到它。 我把Thread.critical放在那里,以帮助确保读取线程不会运行,直到写入完成。 不确定是否需要关键部分,这实际上取决于你的读数是多么的重要,所以一些基准testing是按顺序进行的。

 class ConcurrentHash < Hash def initialize(*args) @semaphore = Mutex.new super end def []=(k,v) begin old_crit = Thread.critical Thread.critical = true unless old_crit @semaphore.synchronize { super } ensure Thread.critical = old_crit end end def [](k) while(true) return super unless @semaphore.locked? end end end 

可能还有一些其他的读取方法需要检查@semaphorelocking,我不知道是否所有的东西都是用#[]来实现的。

我很不清楚这是什么意思。 我认为最简单的实现是简单的

 Hash 

也就是说内置的ruby散列线程安全的,如果通过线程安全,你的意思是不会爆炸,如果> 1线程试图访问它。 这段代码将永远安全运行

 n = 4242 hash = {} loop do a = Thread.new do n.times do hash[:key] = :val end end b = Thread.new do n.times do hash.delete(:key) end end c = Thread.new do n.times do val = hash[:key] raise val.inspect unless [nil, :val].include?(val) end end a.join b.join c.join p :THREADSAFE end 

我怀疑通过线程安全,你真的意味着酸 – 例如一个像哈希[:键] =:如果有[:钥匙]将返回:瓦尔后面读一个读。 但没有任何locking的欺骗可以提供 – 最后一个将永远赢。 例如,假设你有42个线程全部更新一个线程安全散列 – 哪个值应该由43'rd?读取? 当然,通过threasafe你并不意味着写某种总的顺序 – 因此,如果42个线程正在积极地写'正确的'值是正确的? 但是ruby内置的哈希就是这样工作的…

也许你的意思是类似的

 hash.each do ... 

在一个线程和

 hash.delete(key) 

不会互相干扰? 我可以想象,要想成为线程安全的,但这不是一个单一的线程与核磁共振ruby(显然你不能修改一个散列,而迭代它)

所以你可以更具体说明你的意思是“线程安全”?

给ACID语义的唯一方法就是一个严格的locking(当然,这可能是一个方法,但是仍然是一个外部锁)。

ruby的线程调度器不仅仅是在一些任意的c函数(比如内置的散列函数方法)中调度一个线程,所以这些都是线程安全的。

不幸的是,我不能添加评论给迈克尔·沙洛尔回答他介绍的地方:类RWLock和类的locking哈希@Reader_count等(还没有足够的业力)

该解决scheme不起作用。 它给出了一个错误:在“解锁”:尝试解锁未locking的互斥锁(ThreadError)

由于逻辑错误:当解锁的时候,解锁会发生1次额外的时间(因为缺less检查my_block?(),而是解锁,即使解锁不是必要的“是我的块”),所以第二次解锁已经解锁哑巴引发了一个例外。 (我将粘贴完整的代码,如何在这篇文章的末尾重现这个错误)。

此外,迈克尔提到“每种方法都是不安全的(允许在迭代过程中由其他线程进行突变)”,所以我最终得到了这个简化的解决scheme,它适用于我所有的用例,并且只要在任何调用当从不同的线程调用任何哈希方法(来自同一个线程,拥有锁的调用不阻塞,以避免死锁):

 # # This TrulyThreadSafeHash works! # # Note if one thread iterating the hash by #each method # then the hash will be locked for all other threads (they will not be # able to even read from it) # class TrulyThreadSafeHash def initialize @mutex = Mutex.new @hash = Hash.new end def method_missing(method_sym, *arguments, &block) if !@mutex.owned? # Returns true if this lock is currently held by current thread # We're trying to lock only if mutex is not owned by the current thread (is not locked or is locked by some other thread). # Following call will be blocking if mutex locked by other thread: @mutex.synchronize{ return lambda{@hash.send(method_sym,*arguments, &block)}.call } end # We already own the lock (from current thread perspective). # We don't even check if @hash.respond_to?(method_sym), let's make Hash # respond properly on all calls (including bad calls (example: wrong method names)) lambda{@hash.send(method_sym,*arguments, &block)}.call end # since we're tyring to mimic Hash we'll pretend to respond as Hash would def self.respond_to?(method_sym, include_private = false) Hash.respond_to(method_sym, include_private) end # override Object's to_s because our method_missing won't be called for to_s def to_s(*arguments) @mutex.synchronize{ return @hash.to_s } end # And for those, who want to run extra mile: # to make our class json-friendly we shoud require 'json' and uncomment this: #def to_json(*options) # @mutex.synchronize{ # return @hash.to_json(*options) # } #end end 

现在完整的例子来演示/重现Michael Sofaer解决scheme中的双重解锁错误:

 #!/usr/bin/env ruby # ======= unchanged copy-paste part from Michael Sofaer answer (begin) ======= class LockedHash def initialize @hash = Hash.new @lock = ThreadAwareLock.new() @reader_count = 0 end def [](key) @lock.lock_read ret = @hash[key] @lock.unlock_read ret end def []=(key, value) @lock.lock_write @hash[key] = value @lock.unlock_write end def method_missing(method_sym, *arguments, &block) if @hash.respond_to? method_sym @lock.lock_block val = lambda{@hash.send(method_sym,*arguments, &block)}.call @lock.unlock_block return val end super end end class RWLock def initialize @outer = Mutex.new @inner = Mutex.new @reader_count = 0 end def lock_read @outer.synchronize{@inner.synchronize{@reader_count += 1}} end def unlock_read @inner.synchronize{@reader_count -= 1} end def lock_write @outer.lock while @reader_count > 0 ;end end def unlock_write @outer.unlock end end class ThreadAwareLock < RWLock def initialize @owner = nil super end def lock_block lock_write @owner = Thread.current.object_id end def unlock_block @owner = nil unlock_write end def lock_read super unless my_block? end def unlock_read super unless my_block? end def lock_write super unless my_block? end def unlock_write super unless my_block? end def my_block? @owner == Thread.current.object_id end end # ======= unchanged copy-paste part from Michael Sofaer answer (end) ======= # global hash object, which will be 'shared' across threads $h = LockedHash.new # hash_reader is just iterating through the 'shared' hash $h # and prints specified delimeter (capitalized when last hash item read) def hash_reader(delim) loop{ count = 0 $h.each{ count += 1 if count != $h.size $stderr.print delim else $stderr.puts delim.upcase end } } end # fill hash with 10 items 10.times{|i| $h[i] = i } # create a thread which will read $h hash t1 = Thread.new(){ hash_reader("o") } t1.join # will never happen, but for completeness 

,这给出了以下错误:

 ./LockedHash_fails_to_unlock.rb oooooooooO ./LockedHash_fails_to_unlock.rb:55:in `unlock': Attempt to unlock a mutex which is not locked (ThreadError) from ./LockedHash_fails_to_unlock.rb:55:in `unlock_write' from ./LockedHash_fails_to_unlock.rb:82:in `unlock_write' from ./LockedHash_fails_to_unlock.rb:70:in `unlock_block' from ./LockedHash_fails_to_unlock.rb:29:in `method_missing' from ./LockedHash_fails_to_unlock.rb:100:in `block in hash_reader' from ./LockedHash_fails_to_unlock.rb:98:in `loop' from ./LockedHash_fails_to_unlock.rb:98:in `hash_reader' from ./LockedHash_fails_to_unlock.rb:119:in `block in <main>' 

既然你提到Hash会被读取得很重,只要有一个互斥锁同时读取和写入,就会导致竞争条件最有可能被read读取。 如果你没有问题,那就忽略这个答案。

如果你想给写入一个优先级,一个读写locking将有所帮助。 下面的代码是基于一些旧的c ++作业系统类的分配,所以可能不是最好的质量,但给出了一个大致的想法。

 require 'thread' class ReadWriteLock def initialize @critical_section = Mutex.new @are_writers_finished = ConditionVariable.new @are_readers_finished = ConditionVariable.new @readers = 0 @writers = 0 @writer_locked = false end def read begin start_read yield ensure end_read end end def start_read @critical_section.lock while (@writers != 0 || @writer_locked) @are_writers_finished.wait(@critical_section) end @readers += 1 @critical_section.unlock end def end_read @critical_section.lock if (@readers -= 1) == 0 @are_readers_finished.broadcast end @critical_section.unlock end def write begin start_write yield ensure end_write end end def start_write @critical_section.lock @writers += 1 while @readers > 0 @are_readers_finished.wait(@critical_section) end while @writer_locked @are_writers_finished.wait(@critical_section) end @writers -= 1 @writer_locked = true @critical_section.unlock end def end_write @critical_section.lock @writer_locked = false @are_writers_finished.broadcast @critical_section.unlock end end 

然后在lock.write和lock.read中包装[] =和[]。 可能会对性能产生影响,但可以保证写入操作能够“通过”读取操作。 这个有用性取决于实际上是多么重读。