#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'timeout'

module Fluent
  module Counter
    # Hands out one Mutex per key so that callers can work on individual
    # entries of a data store while holding only the locks for those keys.
    #
    # A single "global" mutex (@mutex) guards the key => Mutex table itself.
    # Per-key mutexes are only ever acquired with try_lock while the global
    # mutex is held, so no caller blocks on a per-key mutex.
    class MutexHash
      # data_store - object that receives #start / #stop and is yielded to
      #              the blocks given to #synchronize and #synchronize_keys.
      def initialize(data_store)
        @mutex = Mutex.new
        @data_store = data_store
        @mutex_hash = {}
        @thread = nil
        # The cleanup thread shares the table and the global mutex with us.
        @cleanup_thread = CleanupThread.new(@data_store, @mutex_hash, @mutex)
      end

      # Starts the data store and then the background cleanup thread.
      def start
        @data_store.start
        @cleanup_thread.start
      end

      # Stops the data store and then the background cleanup thread.
      def stop
        @data_store.stop
        @cleanup_thread.stop
      end

      # Locks ALL of the given keys at once, then yields `data_store, key`
      # once per key. Each key's mutex is released right after its own yield.
      #
      # If any key is currently locked, every lock taken so far is released
      # and the whole acquisition is retried, so the caller never holds only
      # a subset of the keys.
      #
      # Duplicate keys are collapsed: a mutex already held by this thread
      # cannot be try_lock'ed again, so a repeated key could never succeed.
      #
      # If the block raises (or exits non-locally), all mutexes still held by
      # this call are released before the exception propagates.
      def synchronize(*keys)
        return if keys.empty?

        keys = keys.uniq
        locks = nil
        until (locks = try_lock_all(keys))
          # Hint the scheduler to run other threads before retrying.
          Thread.pass
        end

        begin
          locks.each do |key, mutex|
            yield @data_store, key
            mutex.unlock
          end
        ensure
          # Only mutexes not yet released above are still owned by us.
          locks.each_value { |mutex| mutex.unlock if mutex.owned? }
        end
      end

      # Yields `data_store, key` for each key, locking ONE key at a time.
      # A key whose mutex is currently taken is pushed to the back of the
      # queue and retried after the remaining keys.
      #
      # The per-key mutex is released even when the block raises.
      def synchronize_keys(*keys)
        return if keys.empty?

        pending = keys.dup
        while (key = pending.shift)
          # The global mutex is held only while looking up / grabbing the
          # per-key mutex, never while the caller's block runs.
          mutex = @mutex.synchronize do
            candidate = mutex_for(key)
            candidate if candidate.try_lock
          end

          if mutex
            begin
              yield @data_store, key
            ensure
              mutex.unlock
            end
          else
            pending.push(key) # failed lock, retry this key
            Thread.pass
          end
        end
      end

      private

      # Returns the mutex registered for +key+, creating it when absent.
      # The caller must hold @mutex.
      def mutex_for(key)
        @mutex_hash[key] ||= Mutex.new
      end

      # Tries to lock every key while holding the global mutex.
      # Returns a key => Mutex hash on success. On the first key that is
      # already locked, releases everything taken so far and returns nil.
      def try_lock_all(keys)
        @mutex.synchronize do
          locked = {}
          keys.each do |key|
            mutex = mutex_for(key)
            unless mutex.try_lock
              locked.each_value(&:unlock)
              return nil
            end
            locked[key] = mutex
          end
          locked
        end
      end
    end

    # Background thread that periodically removes per-key mutexes whose
    # stored value has not been modified recently.
    class CleanupThread
      CLEANUP_INTERVAL = 60 * 15 # 15 min

      # store      - object answering `get(key, raw: true)`
      # mutex_hash - the key => Mutex table to clean up
      # mutex      - the mutex guarding mutex_hash
      def initialize(store, mutex_hash, mutex)
        @store = store
        @mutex_hash = mutex_hash
        @mutex = mutex
        @thread = nil
        @running = false
      end

      # Spawns the worker thread: sleep CLEANUP_INTERVAL, clean up, repeat
      # until #stop clears the running flag.
      def start
        @running = true
        @thread = Thread.new do
          while @running
            sleep CLEANUP_INTERVAL
            run_once
          end
        end
      end

      # Stops the worker thread. Does nothing unless #start was called.
      #
      # The worker spends most of its time in `sleep CLEANUP_INTERVAL`, so
      # instead of waiting for it to finish on its own we give it one second
      # to exit and kill it otherwise.
      def stop
        return unless @running

        @running = false
        # Thread#join(limit) returns nil when the limit expires.
        @thread.join(1) || @thread.kill
      end

      private

      # Removes the mutex of every key whose stored value was last modified
      # more than CLEANUP_INTERVAL seconds ago and whose mutex is not locked.
      # Keys for which the store returns nothing are left untouched.
      def run_once
        @mutex.synchronize do
          last_cleanup_at = (Time.now - CLEANUP_INTERVAL).to_i

          @mutex_hash.each do |(key, mutex)|
            v = @store.get(key, raw: true)
            next unless v
            next if last_cleanup_at < v['last_modified_at'][0] # v['last_modified_at'] = [sec, nsec]
            next unless mutex.try_lock # key is in use right now

            @mutex_hash[key] = nil
            mutex.unlock

            # Check that a waiting thread is in a lock queue.
            # Can't get a lock here means this key is used in other places.
            # So restore a mutex value to a corresponding key.
            if mutex.try_lock
              @mutex_hash.delete(key)
              mutex.unlock
            else
              @mutex_hash[key] = mutex
            end
          end
        end
      end
    end
  end
end