Sha256: 6d0a8a1f28428f8d6df45c39c633653e14556c9e7797b0619c16a3c4babfc9b1
Contents?: true
Size: 1.6 KB
Versions: 2
Compression:
Stored size: 1.6 KB
Contents
# Mixin that keeps one record-loading thread running per Kinesis shard.
#
# The including class must provide the state used below; none of it is
# defined in this module:
#   @client               - object responding to describe_stream(stream_name:)
#   @stream_name          - name passed to describe_stream
#   @map                  - Hash of shard_id => Thread
#   @thread_stop_map      - Hash of shard_id => stop-requested flag
#   @dead_thread          - collection of shard ids whose thread was reaped
#   @stop_flag            - truthy once the supervisor loop should end
#   @load_shard_interval  - seconds slept between polls
#   @describe_shard       - when truthy, only shards in @describe_use_shards are used
#   @describe_use_shards  - allow-list consulted when @describe_shard is truthy
#   load_records_thread(shard_id) - worker body run in each thread
# It also logs through the global $log.
module KinesisSupervisor
  # Supervisor loop: refreshes the shard list and reconciles worker threads
  # every @load_shard_interval seconds until @stop_flag is set.
  def supervisor_thread
    until @stop_flag
      active_shard_ids = get_shard_ids
      # nil means the shard listing failed (already logged). Skip this round
      # instead of crashing the supervisor or reaping every worker.
      update_maping(active_shard_ids) if active_shard_ids
      sleep(@load_shard_interval)
    end
  end

  # Reconciles @map with the given list of active shard ids.
  #
  # - Starts a worker thread for every active shard that has none.
  # - Reaps workers that died with an exception or whose stop flag is set.
  # - Requests a stop for, then reaps, workers whose shard is no longer active.
  #
  # @param active_shard_ids [Array] shard ids that should currently be served
  def update_maping(active_shard_ids)
    active_shard_ids.each do |shard_id|
      unless @map.key?(shard_id)
        @thread_stop_map[shard_id] = false
        @map[shard_id] = Thread.new(shard_id, &method(:load_records_thread))
        next
      end

      # Thread#status is nil only when the thread terminated with an exception.
      if @map[shard_id].status.nil?
        $log.error "Thread dead => shard : #{shard_id}"
        thread_kill(shard_id)
      elsif @thread_stop_map[shard_id]
        thread_kill(shard_id)
      end
    end

    # @map.keys is a snapshot, so thread_kill may delete from @map safely.
    (@map.keys - active_shard_ids).each do |stale_shard_id|
      @thread_stop_map[stale_shard_id] = true
      thread_kill(stale_shard_id)
    end
  end

  # Waits for the shard's worker thread to finish, then records the shard in
  # @dead_thread and removes its entries from @thread_stop_map and @map.
  #
  # @param shard_id [Object] key of the worker in @map
  def thread_kill(shard_id)
    $log.info "Thread killing => shard : #{shard_id}"
    begin
      @map[shard_id].join
    rescue => e
      # join re-raises the exception a worker died with; log it so the
      # bookkeeping below still runs and the supervisor keeps going.
      $log.error "Thread join failed => shard : #{shard_id} : #{e.message}"
    end
    @dead_thread << shard_id
    @thread_stop_map.delete(shard_id)
    @map.delete(shard_id)
  end

  # Lists the shard ids that should have a worker: every shard returned by
  # describe_stream, minus those filtered out by the allow-list (when
  # @describe_shard is truthy) and those already recorded in @dead_thread.
  #
  # NOTE(review): only a single describe_stream response is read; if the
  # service paginates shards, later pages are not fetched - confirm.
  #
  # @return [Array, nil] shard ids, or nil when the listing raised (logged)
  def get_shard_ids
    shards = @client.describe_stream(stream_name: @stream_name).stream_description.shards
    shards.map(&:shard_id).reject do |shard_id|
      # && short-circuits, so the allow-list is only consulted when enabled.
      filtered_out = @describe_shard && !@describe_use_shards.include?(shard_id)
      filtered_out || @dead_thread.include?(shard_id)
    end
  rescue => e
    $log.error "get_shard_ids : #{e.message}"
    nil
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-in-kinesis-0.0.2 | lib/fluent/plugin/thread_supervisor.rb |
fluent-plugin-in-kinesis-0.0.1 | lib/fluent/plugin/thread_supervisor.rb |