lib/ruote/worker.rb in ruote-2.1.11 vs lib/ruote/worker.rb in ruote-2.2.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -45,11 +45,13 @@ attr_reader :context attr_reader :run_thread attr_reader :running - def initialize (storage) + # Given a storage, creates a new instance of a Worker. + # + def initialize(storage) @subscribers = [] # must be ready before the storage is created # services like Logger to subscribe to the worker @@ -91,21 +93,25 @@ def join @run_thread.join if @run_thread end - def subscribe (actions, subscriber) + # Loggers and trackers call this method when subscribing for events / + # actions in this worker. + # + def subscribe(actions, subscriber) @subscribers << [ actions, subscriber ] end + # Shuts down this worker (makes sure it won't fetch further messages + # and schedules). + # def shutdown @running = false - return unless @run_thread - begin @run_thread.join rescue Exception => e end end @@ -136,10 +142,13 @@ (wfids - error_wfids == []) end protected + # One worker step, fetches schedules and triggers those whose time has + # came, then fetches msgs and processes them. + # def step now = Time.now.utc delta = now - @last_time @@ -147,13 +156,11 @@ # # at most once per second, deal with 'ats' and 'crons' @last_time = now - @storage.get_schedules(delta, now).each do |sche| - trigger(sche) - end + @storage.get_schedules(delta, now).each { |sche| trigger(sche) } end # msgs @msgs = @storage.get_msgs if @msgs.empty? @@ -191,25 +198,40 @@ else @sleep_time = 0.000 end end - def trigger (schedule) + # Given a schedule, attempts to trigger it. + # + # It first tries to + # reserve the schedule. If the reservation fails (another worker + # was successful probably), false is returned. The schedule is + # triggered if the reservation was successful, true is returned. + # + def trigger(schedule) msg = Ruote.fulldup(schedule['msg']) return false unless @storage.reserve(schedule) @storage.put_msg(msg.delete('action'), msg) true end - def process (msg) + # Processes one msg. + # + # Will return false immediately if the msg reservation failed (another + # worker grabbed the message. + # + # Else will execute the action ordered in the msg, and return true. + # + # Exceptions in execution are intercepted here and passed to the + # engine's (context's) error_handler. + # + def process(msg) - return false if cannot_handle(msg) - return false unless @storage.reserve(msg) begin action = msg['action'] @@ -236,42 +258,37 @@ # msg got deleted, might still be interesting for a subscriber end notify(msg) - rescue Exception => exception + rescue => exception @context.error_handler.msg_handle(msg, exception) end true end - def notify (msg) + # Given a successfully executed msg, now notifies all the subscribers + # interested in the kind of action the msg ordered. + # + def notify(msg) @subscribers.each do |actions, subscriber| if actions == :all || actions.include?(msg['action']) subscriber.notify(msg) end end end - # Should always return false. Except when the message is a 'dispatch' - # and it's for a participant only available to an 'engine_worker' - # (block participants, stateful participants) - # - def cannot_handle (msg) - - return false if msg['action'] != 'dispatch' - - @context.engine.nil? && msg['for_engine_worker?'] - end - # Works for both the 'launch' and the 'apply' msgs. # - def launch (msg) + # Creates a new expression, gives and applies it with the + # workitem contained in the msg. + # + def launch(msg) tree = msg['tree'] variables = msg['variables'] exp_class = @context.expmap.expression_class(tree.first) @@ -281,11 +298,11 @@ exp_hash = { 'fei' => msg['fei'] || { 'engine_id' => @context.engine_id, 'wfid' => msg['wfid'], - 'sub_wfid' => msg['sub_wfid'], + 'subid' => Ruote.generate_subid(msg.inspect), 'expid' => '0' }, 'parent_id' => msg['parent_id'], 'original_tree' => tree, 'variables' => variables, 'applied_workitem' => msg['workitem'], @@ -294,32 +311,38 @@ if not exp_class exp_class = Ruote::Exp::RefExpression - #elsif msg['action'] == 'launch' && exp_class == Ruote::Exp::DefineExpression elsif is_launch?(msg, exp_class) def_name, tree = Ruote::Exp::DefineExpression.reorganize(tree) variables[def_name] = [ '0', tree ] if def_name exp_class = Ruote::Exp::SequenceExpression end exp = exp_class.new(@context, exp_hash.merge!('original_tree' => tree)) exp.initial_persist - exp.do_apply + exp.do_apply(msg) end - def is_launch? (msg, exp_class) + # Returns true if the msg is a "launch" (ie not a simply "apply"). + # + def is_launch?(msg, exp_class) return false if exp_class != Ruote::Exp::DefineExpression return true if msg['action'] == 'launch' (msg['trigger'] == 'on_re_apply') end - def cancel_process (msg) + # Handles a 'cancel_process' msg (translates it into a "cancel root + # expression of that process" msg). + # + # Also works for 'kill_process' msgs. + # + def cancel_process(msg) root = @storage.find_root_expression(msg['wfid']) return unless root @@ -330,9 +353,9 @@ 'fei' => root['fei'], 'wfid' => msg['wfid'], # indicates this was triggered by cancel_process 'flavour' => flavour) end - alias :kill_process :cancel_process + alias kill_process cancel_process end end