require 'rubygems' require 'factor' require 'mustache' #require 'facter' module Factor module Runtime class Engine attr_accessor :channel_modules, :workflows, :message_bus, :listeners # Engine needs modules that contain the code, workflows to run, and message bus for communication def initialize(username,token) @channel_modules={} @channel_definitions=[] @workflows = {} @message_bus = MessageBus.new(username,token) @credentials = {} @tags = {} @listeners={} end def tag(key,value) @tags[key]=value end # load the channel by referencing the .rb file # the filename is lowercase with "_" for spaces # and the module inside must be camal cased to match # once loaded it is in the channel_modules Hash def load_channel filename, definition filename = File.absolute_path(File.expand_path(filename.path)) if filename.is_a?(File) # just in case someone passes in a File not a String (i.e. me) require filename # channel_module_name = File.basename(filename).gsub('.rb','').split('_').map{|ea| ea.capitalize}.join('') channel_module_name = definition['module_name'] channel_module= self.class.const_get(channel_module_name) @channel_modules[channel_module_name]=channel_module @channel_definitions << definition end def load_credentials credentials,secret=nil credentials.each do |service_name,services| @credentials[service_name] = {} if !@credentials.include?(service_name) services.each do |credential_name,credential| @credentials[service_name][credential_name] = credential["value"] end end # @credentials["credentials"] = credentials end # adds the workflow to the workflows list # the object must be a Workflow type def load_workflow workflow @workflows[workflow.name] = workflow end def logs routing_key="#", &code @message_bus.start do @message_bus.listen routing_key do |message| code.call message end end end def send_start_workflow workflow, params, keep_open=false instance_id=SecureRandom.hex @message_bus.start do message = WorkflowStepMessage.new message.position << "start" message.workflow=workflow message.add_values params message.workflow_instance_id= instance_id @message_bus.send message, !keep_open end instance_id end def send_start_listener workflow_name, keep_open=false @message_bus.start do message = ListenerMessage.new("start",workflow_name) @message_bus.send message, !keep_open end end def send_stop_listener workflow_name, keep_open=false @message_bus.start do message = ListenerMessage.new("stop",workflow_name) @message_bus.send message, !keep_open end end # start your engines. vroom vrooooom! def start begin @message_bus.start do @message_bus.listen("listener") do |payload| message = ListenerMessage.new message.from_queue(payload) listener=@workflows[message.workflow].definition["listener"] params = render_template(listener["params"],@credentials) if message.command=="start" start_listening(listener["channel"],listener["name"],listener["event"],message.workflow,params) else stop_listening(listener["channel"],listener["name"],listener["event"],message.workflow) end end @message_bus.listen("workflow") do |payload| step = WorkflowStepMessage.new step.from_queue(payload) if @workflows.include? step.workflow workflow = @workflows[step.workflow] activity = workflow.get_activity(step.position) if !activity.nil? action = activity["action"] channel = activity["channel"] target = activity["target"] params_template = activity["params"] # if match(target) values = step.body.merge(@credentials) # puts "Values: #{values.inspect}" # this maps the input values passed in with the templated defined in the workflow params = render_template(params_template,values) puts "content: #{params}" event = call_channel_method(channel,action,params) response_message = step.respond(event.params,event.class.name.split("::").last) @message_bus.send response_message # end else puts "[error] no activity found for position '#{step.position}'" end else # workflow doesn't exist puts "[warning] '#{step.workflow}' workflow wasn't found" end end end rescue SystemExit, Interrupt puts "done" rescue Exception => ex puts ex end end def call_channel_method(channel_name,action_name,params) channel_module_name = get_channel_module(channel_name) channel_module = @channel_modules[channel_module_name] action_class_name = get_class_name("actions",channel_name,action_name) command = channel_module.const_get(action_class_name) command.new.do_work(params) end def start_listening(channel_name,listener_name,event,workflow_name,params) channel_module_name = get_channel_module(channel_name) channel_module = @channel_modules[channel_module_name] listener_class_name = get_class_name("listeners",channel_name,listener_name) command = channel_module.const_get(listener_class_name) job_info = {:channel_name=>channel_name,:listener_name=>listener_name,:workflow_name=>workflow_name,:event=>event}.to_json @job = Thread.new do puts "Creating listener #{channel_name}::#{listener_name} on event '#{event}' => #{workflow_name}" listener=command.new(workflow_name,event) puts "Starting listener thread #{params}" listener.start params do |event_params| send_start_workflow workflow_name, event_params end end @listeners[job_info]=@job end def stop_listening(channel_name,listener_name,event,workflow_name) # channel_module_name = get_channel_module(channel_name) # channel_module = @channel_modules[channel_module_name] # listener_class = get_class_name("listeners",channel_name,listener_name) # command = channel_module.const_get(listener_name) job_info = {:channel_name=>channel_name,:listener_name=>listener_name,:workflow_name=>workflow_name,:event=>event}.to_json @listeners[job_info].kill if @listeners.include?(job_info) end private def get_channel_module(channel_name) @channel_definitions.select { |channel_definition| channel_definition['name']==channel_name }.first['module_name'] end # def get_action_class(channel_name,action_name) # @channel_definitions.each do |channel_definition| # if channel_definition['name']==channel_name # channel_definition['actions'].each do |action| # return action["class_name"] if action['name']==action_name # end # end # end # end # def get_listener_class(channel_name,listener_name) # @channel_definitions.each do |channel_definition| # if channel_definition['name']==channel_name # channel_definition['listeners'].each do |listener| # return listener["class_name"] if listener['name']==listener_name # end # end # end # end def get_class_name(type,channel_name,class_name) @channel_definitions.each do |channel_definition| if channel_definition['name']==channel_name channel_definition[type].each do |definition| return definition["class_name"] if definition['name']==class_name end end end end def render_template(source,values) #params = Hash.new if source.is_a?(Hash) params = Hash.new source.each do |key,template| params[key]=render_template(template,values) end else params = Mustache.render(source,values) end # activity_params.each do |key,template| # params[key]=Mustache.render(template,values) # end params end # def match(target) # return true if target==nil || target=="" # facts={} # #Facter.each {|k,v| facts[k]=v} # @tags.each {|k,v| facts[k]=v} # key,value=target.split("=") # facts[key]==value # end end end end