module Apphunkd class Queue attr_accessor :items attr_accessor :mutex attr_accessor :worker def initialize @items = [] @mutex = Mutex.new end def activate! @worker = initialize_worker end def store(message = {}) debug "Receive: #{message.inspect}" return false if message[:message].blank? || message[:token].blank? message[:stored_at] = Time.now @mutex.synchronize do @items = @items[-9999..-1] if @items.size >= 9999 @items << message end @worker.wakeup return true end def initialize_worker Thread.new do begin loop do unless @items.empty? items = get_items_from_stack items.each do |item| result = push_item_to_remote_service(item) if result.status == :ok case result.response.code when '201' debug "Message successfully stored." when '400', '403' log_error "Remote Service refused to store item: #{result.response.code} / #{result.response.body}. Dropped." else put_item_to_stack(item) log_error "Remote Service went crazy. Put item back in queue: #{result.response.code} / #{result.response.body}" end else put_item_to_stack(item) log_error "Could not push to Remote Service: Connection Error. Put item back in queue." end end end sleep end rescue => e log_error "Error: #{e}" end end end private def get_items_from_stack copied_items = [] @mutex.synchronize do copied_items = @items.dup @items.clear end return copied_items end def put_item_to_stack(item) @mutex.synchronize { @items << item } end def push_item_to_remote_service(item) url = generate_api_messages_url(item.delete('token')) result = Apphunkd::Remote.post(url, item, 300) end def generate_api_messages_url(token) path = "v1/#{token}/messages" if DaemonKit.env.to_s == 'production' "http://postbox.apphunk.com/#{path}" else "http://postbox.apphunk.local/#{path}" end end def debug(message) if DaemonKit.env.to_s == 'development' || ENV['DEBUG'] DaemonKit.logger.debug message end end def log_error(message) DaemonKit.logger.info message end end end