Sha256: 3bbca352908e38f8d7870c9c54b8658f46e15899d90360a80994bfa568592b80
Contents?: true
Size: 1.92 KB
Versions: 1
Compression:
Stored size: 1.92 KB
Contents
require "logger" require "concurrent" require "net/http" module Sqspoller class MessageDelegator def initialize(worker_thread_pool_size, waiting_tasks_ratio, worker_task, logger_file) @logger = Logger.new(logger_file) @worker_thread_pool_size = worker_thread_pool_size @max_allowed_queue_size = waiting_tasks_ratio * worker_thread_pool_size @semaphore = Mutex.new @worker_task = worker_task @pending_schedule_tasks = 0 initialize_connection_pool end def initialize_connection_pool @connection_pool = Concurrent::RubyThreadPoolExecutor.new(max_threads: @worker_thread_pool_size, min_threads: 1, max_queue: @max_allowed_queue_size) end def process(queue_controller, message, queue_name) @semaphore.synchronize { @pending_schedule_tasks +=1 if @connection_pool.queue_length == @max_allowed_queue_size while @connection_pool.queue_length > @worker_thread_pool_size || @connection_pool.queue_length + @pending_schedule_tasks >= @max_allowed_queue_size sleep(0.01) end end } begin @logger.info "Scheduling worker task for message: #{message.message_id}" @connection_pool.post do begin @logger.info "Starting worker task for message: #{message.message_id}" @worker_task.process(message.body, message.message_id) @pending_schedule_tasks -= 1 @logger.info "Finished worker task for message: #{message.message_id}" queue_controller.delete_message message.receipt_handle rescue Exception => e @pending_schedule_tasks -= 1 @logger.info "Caught error for message: #{message}, error: #{e.message}, #{e.backtrace.join("\n")}" end end rescue Concurrent::RejectedExecutionError => e @logger.info "Caught Concurrent::RejectedExecutionError for #{e.message}" end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fourkites-sqspoller-0.1.12.20 | lib/sqspoller/message_delegator.rb |