lib/metacosm/simulation.rb in metacosm-0.2.11 vs lib/metacosm/simulation.rb in metacosm-0.2.12

- old
+ new

@@ -1,37 +1,30 @@ -require 'drb/drb' module Metacosm - class EventStream < Frappuccino::Stream - include DRb::DRbUndumped - end - class Simulation - attr_accessor :running - def watch(model) - Frappuccino::Stream.new(model).on_value(&method(:receive)) - end - def fire(command) command_queue.push(command) end def command_queue @command_queue ||= Queue.new end + def event_queue + @event_queue ||= Queue.new + end + def conduct! @conductor_thread = Thread.new { execute } end def execute while true if (command=command_queue.pop) apply(command) - else - thread.pass - sleep 0.001 + sleep 0.01 end + Thread.pass end end def halt! @conductor_thread.terminate @@ -41,37 +34,99 @@ @mutex = Mutex.new end def apply(command) mutex.synchronize do - handler = handler_for(command) - handler.handle(command.attrs) + if command.is_a?(Hash) + handler_module_name = command.delete(:handler_module) + handler_class_name = command.delete(:handler_class_name) + module_name = handler_module_name + handler = (module_name.constantize). + const_get(handler_class_name).new + handler.handle(command) + else + handler = handler_for(command) + handler.handle(command.attrs) + end end end - def event_stream - @event_stream ||= EventStream.new(self) - end + def apply_event(event) + if !@on_event_callback.nil? + event_dto = event.attrs.merge(listener_module: event.listener_module_name, listener_class_name: event.listener_class_name) + @on_event_callback[event_dto] + end - def has_event_stream? - !@event_stream.nil? + if !@event_publication_channel.nil? + event_dto = event.attrs.merge(listener_module: event.listener_module_name, listener_class_name: event.listener_class_name) + redis = Redis.new + redis.publish(@event_publication_channel, Marshal.dump(event_dto)) + end + + if !local_events_disabled? + listener = listener_for(event) + if event.attrs.any? + listener.receive(event.attrs) + else + listener.receive + end + end end - def receive(event, record: true) - if record - events.push(event) - emit(event) if has_event_stream? + def on_event(publish_to:nil,&blk) + unless publish_to.nil? + @event_publication_channel = publish_to end - listener = listener_for(event) - if event.attrs.any? - listener.receive(event.attrs) - else - listener.receive + if block_given? + @on_event_callback = blk end end + # TODO lift this stuff from socius + # def push_commands_to(channel:) + # @command_push_channel = channel + # end + + def subscribe_for_commands(channel:) + p [ :subscribe_to_command_channel, channel: channel ] + redis = Redis.new + begin + redis.subscribe(channel) do |on| + on.subscribe do |chan, subscriptions| + puts "Subscribed to ##{chan} (#{subscriptions} subscriptions)" + end + + on.message do |chan, message| + puts "##{chan}: #{message}" + apply(Marshal.load(message)) + end + + on.unsubscribe do |chan, subscriptions| + puts "Unsubscribed from ##{chan} (#{subscriptions} subscriptions)" + end + end + rescue Redis::BaseConnectionError => error + puts "#{error}, retrying in 1s" + sleep 1 + retry + end + end + + def disable_local_events + @local_events_disabled = true + end + + def local_events_disabled? + @local_events_disabled ||= false + end + + def receive(event, record: true) + events.push(event) if record + apply_event(event) + end + def events @events ||= [] end def self.current @@ -89,11 +144,10 @@ @handlers[command.self_class_name] ||= construct_handler_for(command) end def construct_handler_for(command) module_name = command.handler_module_name - # module_name = "Object" if module_name.empty? (module_name.constantize). const_get(command.handler_class_name).new rescue => ex binding.pry raise ex @@ -103,11 +157,10 @@ @listeners ||= {} @listeners[event.self_class_name] ||= construct_listener_for(event) end def construct_listener_for(event) - module_name = event.listener_module_name #class.name.deconstantize - # module_name = "Object" if module_name.empty? + module_name = event.listener_module_name listener = (module_name.constantize).const_get(event.listener_class_name).new(self) listener end end end