require 'thread'
require 'logger'
require 'opener/daemons/sqs'
require 'json'

Encoding.default_internal = Encoding::UTF_8
Encoding.default_external = Encoding::UTF_8

module Opener
  module Daemons
    ##
    # Threaded pipeline that moves messages from an input queue, through a
    # processing component, to an (optional) output queue.
    #
    # Three kinds of threads cooperate through two in-memory `Queue` buffers:
    #
    # * readers fetch batches from `input_queue` into `input_buffer`;
    # * workers pop from `input_buffer`, call `klass.new.run(input)` and push
    #   the result onto `output_buffer`;
    # * writers pop from `output_buffer`, send the result to `output_queue`
    #   (when one is configured) and delete the original message from
    #   `input_queue`.
    #
    # Two extra reporter threads periodically log buffer sizes and the number
    # of live threads at debug level.
    #
    class Daemon
      # Seconds a reader waits before re-checking buffers that are full.
      # Without this pause the reader loop would spin at full speed while the
      # workers catch up.
      FULL_BUFFER_BACKOFF = 0.1

      # `buffer_size` is intentionally not listed here: it has an explicit
      # reader below that lazily applies the default.
      attr_reader :batch_size, :sleep_interval, :input_queue, :output_queue,
                  :input_buffer, :output_buffer, :klass, :logger

      attr_accessor :threads, :thread_counts

      ##
      # @param klass [Class] component class; it is instantiated once per
      #   worker thread and must respond to `run(input)`.
      # @param options [Hash]
      # @option options [String] :input_queue (required) identifier passed to
      #   `Opener::Daemons::SQS.find`.
      # @option options [String] :output_queue optional identifier passed to
      #   `Opener::Daemons::SQS.find`.
      # @option options [Integer] :readers (1) number of reader threads.
      # @option options [Integer] :workers (5) number of worker threads.
      # @option options [Integer] :writers (1) number of writer threads.
      # @option options [Boolean] :relentless (false) re-raise component
      #   errors instead of logging them and passing the input through.
      # @option options [Numeric] :sleep_interval (5) seconds a reader sleeps
      #   when the input queue returns nil.
      # @option options [Integer] :batch_size (10) value passed to
      #   `input_queue.receive_messages`.
      # @option options [Integer] :buffer_size (4 * batch_size) buffer size
      #   above which readers stop fetching.
      # @option options [String, IO] :log (STDOUT) target given to
      #   `Logger.new`.
      # @option options [Boolean] :debug (false) log at DEBUG instead of INFO.
      #
      # @raise [KeyError] when `:input_queue` is missing.
      #
      def initialize(klass, options={})
        @threads = {:readers=>[], :workers=>[], :writers=>[], :reporters=>[]}

        @thread_counts = {
          :readers => options.fetch(:readers, 1),
          :workers => options.fetch(:workers, 5),
          :writers => options.fetch(:writers, 1)
        }

        @relentless     = options.fetch(:relentless, false)
        @sleep_interval = options.fetch(:sleep_interval, 5)

        # Initialize queues
        @input_queue = Opener::Daemons::SQS.find(options.fetch(:input_queue))

        if options[:output_queue]
          @output_queue = Opener::Daemons::SQS.find(options[:output_queue])
        end

        # Initialize buffers
        @input_buffer  = Queue.new
        @output_buffer = Queue.new

        # Batch and buffer size for a smooth flow.
        @batch_size  = options.fetch(:batch_size, 10)
        @buffer_size = options[:buffer_size]

        # Working component
        @klass = klass

        @logger       = Logger.new(options.fetch(:log, STDOUT))
        @logger.level = options.fetch(:debug, false) ? Logger::DEBUG : Logger::INFO

        logger.debug(options.to_json)
      end

      ##
      # Fetches one batch from the input queue into the input buffer.
      #
      # Nothing is fetched while either buffer holds more than `buffer_size`
      # items; the call then pauses for FULL_BUFFER_BACKOFF seconds so the
      # enclosing reader loop does not busy-wait. When the queue returns nil
      # the call sleeps for `sleep_interval` seconds instead.
      #
      def buffer_new_messages
        if input_buffer.size > buffer_size || output_buffer.size > buffer_size
          sleep(FULL_BUFFER_BACKOFF)
          return
        end

        messages = input_queue.receive_messages(batch_size)

        if messages.nil?
          sleep(sleep_interval)
          return
        end

        messages.each { |message| input_buffer << message }
      end

      ##
      # Starts every thread group and blocks until all of them finish. Since
      # each thread runs an endless loop this normally blocks until a thread
      # raises; `Thread.abort_on_exception` is switched on so that such an
      # exception is not silently lost.
      #
      def start
        Thread.abort_on_exception = true

        start_readers
        start_workers
        start_writers
        start_reporters

        [:readers, :workers, :writers, :reporters].each do |group|
          threads[group].each(&:join)
        end
      end

      ##
      # Spawns `thread_counts[:readers]` threads that keep calling
      # {#buffer_new_messages}.
      #
      def start_readers
        thread_counts[:readers].times do |t|
          threads[:readers] << Thread.new do
            logger.info "Reader #{t+1} ready for action..."

            loop { buffer_new_messages }
          end
        end
      end

      ##
      # Spawns `thread_counts[:workers]` threads. Each one owns a single
      # `klass` instance and, for every buffered message, runs the component
      # on the "input" value of the JSON body and queues the result together
      # with the message's receipt handle.
      #
      # When the component fails (or returns nothing) the error is re-raised
      # in relentless mode; otherwise it is logged and the unmodified input is
      # forwarded as the output.
      #
      def start_workers
        thread_counts[:workers].times do |t|
          threads[:workers] << Thread.new do
            logger.info "Worker #{t+1} launching..."
            identifier = klass.new

            loop do
              message = input_buffer.pop

              input = JSON.parse(message[:body])["input"]
              input = input.first if input.kind_of?(Array)

              begin
                # Components may return several values; only the first one is
                # used.
                output, * = identifier.run(input)

                if output.nil? || output.empty?
                  raise "The component returned an empty response."
                end
              # StandardError rather than Exception, so that SystemExit,
              # NoMemoryError and friends are never swallowed here.
              rescue StandardError => e
                raise if relentless?

                logger.error(e)
                output = input
              end

              output_buffer.push(
                :output => output,
                :handle => message[:receipt_handle]
              )
            end
          end
        end
      end

      ##
      # Spawns `thread_counts[:writers]` threads. Each one takes finished
      # results off the output buffer, sends them (as a JSON document with an
      # "input" key) to the output queue when one is configured, and then
      # deletes the originating message from the input queue.
      #
      def start_writers
        thread_counts[:writers].times do |t|
          threads[:writers] << Thread.new do
            logger.info "Pusher #{t+1} ready for action..."

            loop do
              message = output_buffer.pop
              payload = {:input=>message[:output].force_encoding("UTF-8")}.to_json

              output_queue.send_message(payload) if output_queue
              input_queue.delete_message(message[:handle])
            end
          end
        end
      end

      ##
      # Spawns two reporting threads: one logs the buffer sizes every 2
      # seconds, the other logs the number of live reader/worker/writer
      # threads every 10 seconds. Both log at debug level.
      #
      def start_reporters
        threads[:reporters] << Thread.new do
          loop do
            log = {:buffers=>{:input=>input_buffer.size}}
            log[:buffers][:output] = output_buffer.size if output_buffer

            logger.debug log.to_json
            sleep(2)
          end
        end

        threads[:reporters] << Thread.new do
          loop do
            thread_types = threads.keys - [:reporters]

            # Thread#status is false or nil once a thread has terminated.
            alive_counts = thread_types.map do |type|
              threads[type].count { |thread| thread.status }
            end

            zip = thread_types.zip(alive_counts)
            logger.debug "active thread counts: #{zip}"
            sleep(10)
          end
        end
      end

      ##
      # @return [Integer] the configured buffer size, defaulting to four
      #   times the batch size.
      #
      def buffer_size
        @buffer_size ||= (4 * batch_size)
      end

      ##
      # @return [Boolean] whether component errors are re-raised instead of
      #   being logged.
      #
      def relentless?
        @relentless
      end
    end
  end
end