lib/ruote/worker.rb in ruote-2.2.0 vs lib/ruote/worker.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -32,90 +32,98 @@
#
# Read more at http://ruote.rubyforge.org/configuration.html
#
class Worker
- EXP_ACTIONS = %w[ reply cancel fail receive dispatched ]
+ EXP_ACTIONS = %w[ reply cancel fail receive dispatched pause resume ]
# 'apply' is comprised in 'launch'
# 'receive' is a ParticipantExpression alias for 'reply'
- PROC_ACTIONS = %w[ cancel_process kill_process ]
- DISP_ACTIONS = %w[ dispatch dispatch_cancel ]
+ PROC_ACTIONS = %w[ cancel kill pause resume ].collect { |a| a + '_process' }
+ DISP_ACTIONS = %w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ]
+ attr_reader :name
+
attr_reader :storage
attr_reader :context
+ attr_reader :state
attr_reader :run_thread
- attr_reader :running
# Given a storage, creates a new instance of a Worker.
#
- def initialize(storage)
+ def initialize(name, storage=nil)
- @subscribers = []
- # must be ready before the storage is created
- # services like Logger to subscribe to the worker
+ if storage.nil?
+ storage = name
+ name = nil
+ end
- @storage = storage
- @context = Ruote::Context.new(storage, self)
+ @name = name || 'worker'
+ if storage.respond_to?(:storage)
+ @storage = storage.storage
+ @context = storage.context
+ else
+ @storage = storage
+ @context = Ruote::Context.new(storage)
+ end
+
+ service_name = @name
+ service_name << '_worker' unless service_name.match(/worker$/)
+
+ @context.add_service(service_name, self)
+
@last_time = Time.at(0.0).utc # 1970...
- @running = true
+ @state = 'running'
@run_thread = nil
+ @state_mutex = Mutex.new
@msgs = []
- @sleep_time = 0.000
+
+ @sleep_time = @context['restless_worker'] ? nil : 0.000
+
+ @info = @context['worker_info_enabled'] == false ? nil : Info.new(self)
end
# Runs the worker in the current thread. See #run_in_thread for running
# in a dedicated thread.
#
def run
- step while @running
+ step while @state != 'stopped'
end
# Triggers the run method of the worker in a dedicated thread.
#
def run_in_thread
- Thread.abort_on_exception = true
- # TODO : remove me at some point
+ #Thread.abort_on_exception = true
- @running = true
+ @state = 'running'
@run_thread = Thread.new { run }
+ @run_thread['ruote_worker'] = self
end
# Joins the run thread of this worker (if there is no such thread, this
# method will return immediately, without any effect).
#
def join
- @run_thread.join if @run_thread
+ @run_thread.join rescue nil
end
- # Loggers and trackers call this method when subscribing for events /
- # actions in this worker.
- #
- def subscribe(actions, subscriber)
-
- @subscribers << [ actions, subscriber ]
- end
-
# Shuts down this worker (makes sure it won't fetch further messages
# and schedules).
#
def shutdown
- @running = false
+ @state_mutex.synchronize { @state = 'stopped' }
- begin
- @run_thread.join
- rescue Exception => e
- end
+ join
end
# Returns true if the engine system is inactive, ie if all the process
# instances are terminated or are stuck in an error.
#
@@ -142,74 +150,196 @@
(wfids - error_wfids == [])
end
protected
- # One worker step, fetches schedules and triggers those whose time has
- # come, then fetches msgs and processes them.
+ # If worker_state_enabled is set, check for a potential new state.
#
- def step
+ def determine_state
+ @state_mutex.synchronize do
+
+ @state = (
+ @storage.get('variables', 'worker') || { 'state' => 'running' }
+ )['state'] if @state != 'stopped' && @context['worker_state_enabled']
+ end
+ end
+
+ # Gets schedules from the storage and if their time has come,
+ # turns them into msg (for immediate execution).
+ #
+ def process_schedules
+
now = Time.now.utc
delta = now - @last_time
- if delta >= 0.8
+ return if delta < 0.8
#
- # at most once per second, deal with 'ats' and 'crons'
+ # consider schedules at most twice per second (don't do that job
+ # too often)
- @last_time = now
+ @last_time = now
- @storage.get_schedules(delta, now).each { |sche| trigger(sche) }
- end
+ @storage.get_schedules(delta, now).each { |s| turn_schedule_to_msg(s) }
+ end
- # msgs
+ # Gets msgs from the storage (if needed) and processes them one by one.
+ #
+ def process_msgs
@msgs = @storage.get_msgs if @msgs.empty?
- processed = 0
collisions = 0
- while msg = @msgs.shift
+ while @msg = @msgs.shift
- r = process(msg)
+ r = process(@msg)
if r != false
- processed += 1
+ @processed_msgs += 1
else
collisions += 1
end
if collisions > 2
@msgs = @msgs[(@msgs.size / 2)..-1] || []
+ collisions = 0
end
- #@msgs.concat(@storage.get_local_msgs)
+ break if Time.now.utc - @last_time >= 0.8
+ end
+ end
- #print r == false ? '*' : '.'
+ # Some storage implementations cache information before a step
+ # begins, reducing the number of requests to the underlying
+ # data system. This begin step notifies the storage that
+ # a new step is on and it should refresh the cached information.
+ #
+ # The storage implementation, if it supports this feature, will cache
+ # the information in the thread local info.
+ #
+ def begin_step
- break if Time.now.utc - @last_time >= 0.8
+ Thread.current['ruote_worker'] = self
+
+ @storage.begin_step if @storage.respond_to?(:begin_step)
+ end
+
+ # One worker step, fetches schedules and triggers those whose time has
+ # come, then fetches msgs and processes them.
+ #
+ def step
+
+ begin_step
+
+ @msg = nil
+ @processed_msgs = 0
+
+ determine_state
+
+ if @state == 'stopped'
+ return
+ elsif @state == 'running'
+ process_schedules
+ process_msgs
end
- #p processed
+ take_a_rest # 'running' or 'paused'
- if processed == 0
+ rescue => err
+
+ handle_step_error(err, @msg) # msg may be nil
+ end
+
+ # This default implementation dumps error information to $stderr as
+ # soon as #step intercepts the error.
+ #
+ # Normally such information should only appear when developing a
+ # storage, the information here is thus helpful for storage developers.
+ # If such info is emitted in production or in application development,
+ # you should pass the info to the storage developer/maintainer.
+ #
+ # Feel free to override this method if you need it to output to
+ # a channel different than $stderr (or rebind $stderr).
+ #
+ # The second parameter is "msg": if the error occurred while processing a
+ # msg, then this message is passed to handle_step_error. msg will be
+ # nil if the error occurred while doing get_msgs or get_schedules.
+ #
+ def handle_step_error(err, msg)
+
+ $stderr.puts '#' * 80
+ $stderr.puts
+ $stderr.puts '** worker#step intercepted exception **'
+ $stderr.puts
+ $stderr.puts "Please report issue or fix your #{@storage.class} impl,"
+ $stderr.puts
+ $stderr.puts "or override Ruote::Worker#handle_step_error(e, msg) so that"
+ $stderr.puts "the issue is dealt with appropriately. For example:"
+ $stderr.puts
+ $stderr.puts " class Ruote::Worker"
+ $stderr.puts " def handle_step_error(e, msg)"
+ $stderr.puts " logger.error('ruote step error: ' + e.inspect)"
+ $stderr.puts " mailer.send_error('admin@acme.com', e)"
+ $stderr.puts " end"
+ $stderr.puts " end"
+ $stderr.puts
+ $stderr.puts '# ' * 40
+ $stderr.puts
+ $stderr.puts 'error class/message/backtrace:'
+ $stderr.puts err.class.name
+ $stderr.puts err.message.inspect
+ $stderr.puts *err.backtrace
+ $stderr.puts err.details if err.respond_to?(:details)
+ $stderr.puts
+ $stderr.puts 'msg:'
+ if msg && msg.is_a?(Hash)
+ $stderr.puts msg.select { |k, v|
+ %w[ action wfid fei ].include?(k)
+ }.inspect
+ else
+ $stderr.puts msg.inspect
+ end
+ $stderr.puts
+ $stderr.puts '#' * 80
+
+ $stderr.flush
+ end
+
+ # In order not to hammer the storage for msgs too much, take a rest.
+ #
+ # If the number of processed messages is more than zero, there are probably
+ # more msgs coming, no time for a rest...
+ #
+ # If @sleep_time is nil (restless_worker option set to true), the worker
+ # will never rest.
+ #
+ def take_a_rest
+
+ return if @sleep_time == nil
+
+ if @processed_msgs < 1
+
@sleep_time += 0.001
@sleep_time = 0.499 if @sleep_time > 0.499
+
sleep(@sleep_time)
+
else
+
@sleep_time = 0.000
end
end
# Given a schedule, attempts to trigger it.
#
- # It first tries to
- # reserve the schedule. If the reservation fails (another worker
- # was successful probably), false is returned. The schedule is
- # triggered if the reservation was successful, true is returned.
+ # It first tries to reserve the schedule. If the reservation fails
+ # (another worker was successful probably), false is returned.
+ # The schedule is triggered if the reservation was successful, true
+ # is returned.
#
- def trigger(schedule)
+ def turn_schedule_to_msg(schedule)
msg = Ruote.fulldup(schedule['msg'])
return false unless @storage.reserve(schedule)
@@ -232,55 +362,70 @@
return false unless @storage.reserve(msg)
begin
- action = msg['action']
+ @context.pre_notify(msg)
- if msg['tree']
- #
- # warning here, it could be a reply, with a 'tree' key...
+ case msg['action']
- launch(msg)
+ when 'launch', 'apply', 'regenerate'
- elsif EXP_ACTIONS.include?(action)
+ launch(msg)
- Ruote::Exp::FlowExpression.do_action(@context, msg)
+ when *EXP_ACTIONS
- elsif DISP_ACTIONS.include?(action)
+ Ruote::Exp::FlowExpression.do_action(@context, msg)
- @context.dispatch_pool.handle(msg)
+ when *DISP_ACTIONS
- elsif PROC_ACTIONS.include?(action)
+ @context.dispatch_pool.handle(msg)
- self.send(action, msg)
+ when *PROC_ACTIONS
- #else
- # msg got deleted, might still be interesting for a subscriber
+ self.send(msg['action'], msg)
+
+ when 'reput'
+
+ reput(msg)
+
+ when 'raise'
+
+ handle_msg_error(msg['msg'], msg['error'])
+
+ when 'respark'
+
+ respark(msg)
+
+ #else
+ # no special processing required for message, let it pass
+ # to the subscribers (the notify two lines after)
end
- notify(msg)
+ @context.notify(msg)
+ # notify subscribers of successfully processed msgs
- rescue => exception
+ rescue => err
- @context.error_handler.msg_handle(msg, exception)
+ handle_msg_error(msg, err)
end
+ @context.storage.done(msg) if @context.storage.respond_to?(:done)
+
+ @info << msg if @info
+ # for the stats
+
true
end
- # Given a successfully executed msg, now notifies all the subscribers
- # interested in the kind of action the msg ordered.
+ # Passes the msg and the err it resulted in to the error_handler.
#
- def notify(msg)
+ # Some storage/worker implementations may want to override this.
+ #
+ def handle_msg_error(msg, err)
- @subscribers.each do |actions, subscriber|
-
- if actions == :all || actions.include?(msg['action'])
- subscriber.notify(msg)
- end
- end
+ @context.error_handler.msg_handle(msg, err)
end
# Works for both the 'launch' and the 'apply' msgs.
#
# Creates a new expression, gives and applies it with the
@@ -288,28 +433,47 @@
#
def launch(msg)
tree = msg['tree']
variables = msg['variables']
+ wi = msg['workitem']
exp_class = @context.expmap.expression_class(tree.first)
# msg['wfid'] only : it's a launch
# msg['fei'] : it's a sub launch (a supplant ?)
+ if is_launch?(msg, exp_class)
+
+ name = tree[1]['name'] || tree[1].keys.find { |k| tree[1][k] == nil }
+ revision = tree[1]['revision'] || tree[1]['rev']
+
+ wi['wf_name'] ||= name
+ wi['wf_revision'] ||= revision
+ wi['wf_launched_at'] ||= Ruote.now_to_utc_s
+
+ wi['sub_wf_name'] = name
+ wi['sub_wf_revision'] = revision
+ wi['sub_wf_launched_at'] = Ruote.now_to_utc_s
+ end
+
exp_hash = {
'fei' => msg['fei'] || {
'engine_id' => @context.engine_id,
'wfid' => msg['wfid'],
'subid' => Ruote.generate_subid(msg.inspect),
- 'expid' => '0' },
+ 'expid' => msg['expid'] || '0' },
'parent_id' => msg['parent_id'],
- 'original_tree' => tree,
'variables' => variables,
- 'applied_workitem' => msg['workitem'],
- 'forgotten' => msg['forgotten']
- }
+ 'applied_workitem' => wi,
+ 'forgotten' => msg['forgotten'],
+ 'lost' => msg['lost'],
+ 'flanking' => msg['flanking'],
+ 'stash' => msg['stash'],
+ 'trigger' => msg['trigger'],
+ 'on_reply' => msg['on_reply'],
+ 'supplanted' => msg['supplanted'] }
if not exp_class
exp_class = Ruote::Exp::RefExpression
@@ -318,23 +482,34 @@
def_name, tree = Ruote::Exp::DefineExpression.reorganize(tree)
variables[def_name] = [ '0', tree ] if def_name
exp_class = Ruote::Exp::SequenceExpression
end
- exp = exp_class.new(@context, exp_hash.merge!('original_tree' => tree))
+ exp_hash = exp_hash.reject { |k, v| v.nil? }
+ # compact nils away
+ exp_hash['original_tree'] = tree
+ # keep track of original tree
+
+ exp = exp_class.new(@context, exp_hash)
+
exp.initial_persist
- exp.do_apply(msg)
+ exp.do(:apply, msg)
end
# Returns true if the msg is a "launch" (i.e. not simply an "apply").
#
def is_launch?(msg, exp_class)
- return false if exp_class != Ruote::Exp::DefineExpression
- return true if msg['action'] == 'launch'
- (msg['trigger'] == 'on_re_apply')
+ if exp_class != Ruote::Exp::DefineExpression
+ false
+ elsif %w[ launch regenerate ].include?(msg['action'])
+ true
+ else
+ (msg['trigger'] == 'on_re_apply')
+ # let re-apply "define" blocks, as in Ruote.define {}
+ end
end
# Handles a 'cancel_process' msg (translates it into a "cancel root
# expression of that process" msg).
#
@@ -344,18 +519,172 @@
root = @storage.find_root_expression(msg['wfid'])
return unless root
- flavour = (msg['action'] == 'kill_process') ? 'kill' : nil
-
@storage.put_msg(
'cancel',
'fei' => root['fei'],
'wfid' => msg['wfid'], # indicates this was triggered by cancel_process
- 'flavour' => flavour)
+ 'flavour' => msg['flavour'])
end
alias kill_process cancel_process
+
+ # Handles 'pause_process' and 'resume_process'.
+ #
+ def pause_process(msg)
+
+ root = @storage.find_root_expression(msg['wfid'])
+
+ return unless root
+
+ @storage.put_msg(
+ msg['action'] == 'pause_process' ? 'pause' : 'resume',
+ 'fei' => root['fei'],
+ 'wfid' => msg['wfid']) # it was triggered by {pause|resume}_process
+ end
+
+ alias resume_process pause_process
+
+ # Reputs a doc or a msg.
+ #
+ # Used by certain storage implementations to pass documents around workers
+ # or to reschedule msgs (see ruote-swf).
+ #
+ def reput(msg)
+
+ if doc = msg['doc']
+
+ r = @storage.put(doc)
+
+ return unless r.is_a?(Hash)
+
+ doc['_rev'] = r['_rev']
+
+ reput(msg)
+
+ elsif msg = msg['msg']
+
+ @storage.put_msg(msg['action'], msg)
+ end
+ end
+
+ # This action resparks a stalled workflow instance. It's usually
+ # triggered via Dashboard#respark.
+ #
+ # It's been made into a msg (worker action) in order to facilitate
+ # migration tooling (ruote-swf for example).
+ #
+ def respark(msg)
+
+ wfid = msg['wfid']
+ opts = msg['respark']
+
+ ps = ProcessStatus.fetch(@context, [ wfid ], {}).first
+
+ error_feis = ps.errors.collect(&:fei)
+ errors_too = !! opts['errors_too']
+
+ ps.leaves.each do |fexp|
+
+ next if errors_too == false && error_feis.include?(fexp.fei)
+
+ @context.storage.put_msg(
+ 'cancel', 'fei' => fexp.fei.h, 're_apply' => {})
+ end
+ end
+
+ #
+ # Gathering stats about this worker.
+ #
+ # Those stats can then be obtained via Dashboard#worker_info
+ # (Engine#worker_info).
+ #
+ class Info
+
+ def initialize(worker)
+
+ @worker = worker
+ @ip = Ruote.local_ip
+ @hostname = Socket.gethostname
+ @system = `uname -a`.strip rescue nil
+
+ @since = Time.now
+ @msgs = []
+ @last_save = Time.now - 2 * 60
+ end
+
+ def <<(msg)
+
+ if msg['put_at'].nil?
+ puts '-' * 80
+ puts "msg missing 'put_at':"
+ pp msg
+ puts '-' * 80
+ end
+
+ @msgs << {
+ 'processed_at' => Ruote.now_to_utc_s,
+ 'wait_time' => Time.now - Time.parse(msg['put_at'])
+ #'action' => msg['action']
+ }
+
+ save if Time.now > @last_save + 60
+ end
+
+ protected
+
+ def save
+
+ doc = @worker.storage.get('variables', 'workers') || {}
+
+ doc['type'] = 'variables'
+ doc['_id'] = 'workers'
+
+ now = Time.now
+
+ @msgs = @msgs.drop_while { |msg|
+ Time.parse(msg['processed_at']) < now - 3600
+ }
+ mm = @msgs.drop_while { |msg|
+ Time.parse(msg['processed_at']) < now - 60
+ }
+
+ hour_count = @msgs.size < 1 ? 1 : @msgs.size
+ minute_count = mm.size < 1 ? 1 : mm.size
+
+ key = [
+ @worker.name, @ip.gsub(/\./, '_'), $$.to_s
+ ].join('/')
+
+ (doc['workers'] ||= {})[key] = {
+
+ 'class' => @worker.class.to_s,
+ 'name' => @name,
+ 'ip' => @ip,
+ 'hostname' => @hostname,
+ 'pid' => $$,
+ 'system' => @system,
+ 'put_at' => Ruote.now_to_utc_s,
+ 'uptime' => Time.now - @since,
+
+ 'processed_last_minute' =>
+ minute_count,
+ 'wait_time_last_minute' =>
+ mm.inject(0.0) { |s, m| s + m['wait_time'] } / minute_count.to_f,
+ 'processed_last_hour' =>
+ hour_count,
+ 'wait_time_last_hour' =>
+ @msgs.inject(0.0) { |s, m| s + m['wait_time'] } / hour_count.to_f
+ }
+
+ r = @worker.storage.put(doc)
+
+ @last_save = Time.now
+
+ save if r != nil
+ end
+ end
end
end