module Fabriq class SkypeProxy attr_accessor :adapter, :outgoing_messages def initialize(adapter) @adapter = adapter @incoming_mutex = Mutex.new @outgoing_mutex = Mutex.new @incoming_message_callbacks = [] @outgoing_messages = [] end def on_incoming_message(&callback) @incoming_message_callbacks << callback end def subscribe_adapter_message_received @received_messages = [] @adapter.message_received do |message| @received_messages << message end end def start subscribe_adapter_message_received start_queue_worker_threads end def start_queue_worker_threads run_in_thread { start_incoming_queue_worker } run_in_thread { start_outgoing_queue_worker } end def start_incoming_queue_worker run_in_throttled_loop do handle_incoming_messages_synchronized end end def start_outgoing_queue_worker run_in_throttled_loop do handle_outgoing_messages_synchronized end end def handle_incoming_messages_synchronized @incoming_mutex.synchronize do invoke_incoming_message_subscribers(@received_messages.shift) end end def handle_outgoing_messages_synchronized @outgoing_mutex.synchronize do if @outgoing_messages.count > 0 message = @outgoing_messages.shift @adapter.send_message(message) end end end def invoke_incoming_message_subscribers(message) @incoming_message_callbacks.each do |callback| run_in_thread { callback.call(message) } end end private def run_in_throttled_loop(&block) while true yield sleep 0.5 end end def run_in_thread(&block) Thread.new(&block) end end end