require "logger" require "concurrent" require "net/http" require "yaml" require "erb" require "sqspoller" require "sqspoller/worker_task" require "sqspoller/poll/queue_controller" require "sqspoller/process/worker_controller" require "sqspoller/process/task_finalizer" require "sqspoller/process/message_handler" require 'new_relic/agent' # This is the starting point of SQS polling. Load the Yaml config and starts QueueController and WorkerController. # Waits until process get kill or stop. module SqsPoller class << self def sym(map) if map.class == Hash map = map.inject({}) { |memo, (k, v)| memo[k.to_sym] = sym(v); memo } end map end def daemonize(filename) raise 'Must run as root' if Process.euid != 0 raise 'First fork failed' if (pid = fork) == -1 exit unless pid.nil? Process.setsid raise 'Second fork failed' if (pid = fork) == -1 exit unless pid.nil? puts "Daemon pid: #{Process.pid}" # Or save it somewhere, etc. Dir.chdir '/' File.umask 0000 STDIN.reopen filename STDOUT.reopen '/dev/null', 'a' STDERR.reopen STDOUT end def start_poller_with_config(config, queue_config_name, access_key_id, secret_access_key, region, logger_file, log_level = ::Logger::ERROR) SqsPoller::Logger.set_log_level(log_level) SqsPoller::Logger.set_logger_file(logger_file) ::NewRelic::Agent.manual_start if ENV['START_NEW_RELIC_AGENT'] SqsPoller::Metrics.start_metrics_agent if ENV['START_LOCAL_METRICS_AGENT'] @logger = SqsPoller::Logger.get_new_logger("SqsPoller") @logger.info "Started poller method" queues_config = config[queue_config_name] || config[queue_config_name.to_sym] total_poller_threads = get_total_poller_threads(queues_config) worker_configuration = config[:worker_configuration] total_worker_threads = get_total_worker_threads(worker_configuration, total_poller_threads) waiting_tasks_ratio = get_waiting_tasks_ratio(worker_configuration) aws_config = { :access_key_id => access_key_id, :secret_access_key => secret_access_key, :region => region } task_queue = SizedQueue.new(total_worker_threads * waiting_tasks_ratio) qc = SqsPoller::Poller::QueueController.start queues_config, task_queue, aws_config unless qc.started? @logger.error("Unable to start Queue Pollers.") return end worker_task = worker_configuration[:worker_class].split('::').inject(Object) { |o, c| o.const_get c }.new(worker_configuration) wc = SqsPoller::Process::WorkerController.start total_worker_threads, task_queue, worker_task unless wc.started? @logger.error("Unable to start Workers.") return end wait end def start_poller(filename, queue_config_name, access_key_id, secret_access_key, region, log_filename = nil) begin puts "Starting poller" config = YAML.load(ERB.new(IO.read(filename)).result) config = sym(config) if log_filename.nil? || log_filename.empty? puts "Did not receive log file name" fork do Process.daemon start_poller_with_config config, queue_config_name, access_key_id, secret_access_key, region, STDOUT, ::Logger::DEBUG end else puts "Did receive log file name #{log_filename}" #daemonize log_filename puts "Daemonize log file name #{log_filename}" start_poller_with_config config, queue_config_name, access_key_id, secret_access_key, region, STDOUT, ::Logger::INFO end rescue Exception => e puts "#{e}" end end def get_total_poller_threads(queues_config) total_poller_threads = 0 queues_config.keys.each { |queue| total_poller_threads += queues_config[queue][:polling_threads] } total_poller_threads end def get_waiting_tasks_ratio(worker_configuration) waiting_tasks_ratio = worker_configuration[:waiting_tasks_ratio] waiting_tasks_ratio = 1 if waiting_tasks_ratio.nil? waiting_tasks_ratio end def get_total_worker_threads(worker_configuration, total_poller_threads) worker_thread_count = worker_configuration[:concurrency] worker_thread_count = total_poller_threads if worker_thread_count.nil? worker_thread_count end def timeout n_bytes = [42].pack('i').size n_bits = n_bytes * 8 2 ** (n_bits - 2) - 1 end private def wait t = Thread.new do loop do sleep(60 * 60) end end t.join end end end