lib/ruote/worker.rb in ruote-2.2.0 vs lib/ruote/worker.rb in ruote-2.3.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -32,90 +32,98 @@ # # Read more at http://ruote.rubyforge.org/configuration.html # class Worker - EXP_ACTIONS = %w[ reply cancel fail receive dispatched ] + EXP_ACTIONS = %w[ reply cancel fail receive dispatched pause resume ] # 'apply' is comprised in 'launch' # 'receive' is a ParticipantExpression alias for 'reply' - PROC_ACTIONS = %w[ cancel_process kill_process ] - DISP_ACTIONS = %w[ dispatch dispatch_cancel ] + PROC_ACTIONS = %w[ cancel kill pause resume ].collect { |a| a + '_process' } + DISP_ACTIONS = %w[ dispatch dispatch_cancel dispatch_pause dispatch_resume ] + attr_reader :name + attr_reader :storage attr_reader :context + attr_reader :state attr_reader :run_thread - attr_reader :running # Given a storage, creates a new instance of a Worker. # - def initialize(storage) + def initialize(name, storage=nil) - @subscribers = [] - # must be ready before the storage is created - # services like Logger to subscribe to the worker + if storage.nil? + storage = name + name = nil + end - @storage = storage - @context = Ruote::Context.new(storage, self) + @name = name || 'worker' + if storage.respond_to?(:storage) + @storage = storage.storage + @context = storage.context + else + @storage = storage + @context = Ruote::Context.new(storage) + end + + service_name = @name + service_name << '_worker' unless service_name.match(/worker$/) + + @context.add_service(service_name, self) + @last_time = Time.at(0.0).utc # 1970... - @running = true + @state = 'running' @run_thread = nil + @state_mutex = Mutex.new @msgs = [] - @sleep_time = 0.000 + + @sleep_time = @context['restless_worker'] ? nil : 0.000 + + @info = @context['worker_info_enabled'] == false ? nil : Info.new(self) end # Runs the worker in the current thread. See #run_in_thread for running # in a dedicated thread. # def run - step while @running + step while @state != 'stopped' end # Triggers the run method of the worker in a dedicated thread. # def run_in_thread - Thread.abort_on_exception = true - # TODO : remove me at some point + #Thread.abort_on_exception = true - @running = true + @state = 'running' @run_thread = Thread.new { run } + @run_thread['ruote_worker'] = self end # Joins the run thread of this worker (if there is no such thread, this # method will return immediately, without any effect). # def join - @run_thread.join if @run_thread + @run_thread.join rescue nil end - # Loggers and trackers call this method when subscribing for events / - # actions in this worker. - # - def subscribe(actions, subscriber) - - @subscribers << [ actions, subscriber ] - end - # Shuts down this worker (makes sure it won't fetch further messages # and schedules). # def shutdown - @running = false + @state_mutex.synchronize { @state = 'stopped' } - begin - @run_thread.join - rescue Exception => e - end + join end # Returns true if the engine system is inactive, ie if all the process # instances are terminated or are stuck in an error. # @@ -142,74 +150,196 @@ (wfids - error_wfids == []) end protected - # One worker step, fetches schedules and triggers those whose time has - # came, then fetches msgs and processes them. + # If worker_state_enabled is set, check for a potential new state. # - def step + def determine_state + @state_mutex.synchronize do + + @state = ( + @storage.get('variables', 'worker') || { 'state' => 'running' } + )['state'] if @state != 'stopped' && @context['worker_state_enabled'] + end + end + + # Gets schedules from the storage and if their time has come, + # turns them into msg (for immediate execution). + # + def process_schedules + now = Time.now.utc delta = now - @last_time - if delta >= 0.8 + return if delta < 0.8 # - # at most once per second, deal with 'ats' and 'crons' + # consider schedules at most twice per second (don't do that job + # too often) - @last_time = now + @last_time = now - @storage.get_schedules(delta, now).each { |sche| trigger(sche) } - end + @storage.get_schedules(delta, now).each { |s| turn_schedule_to_msg(s) } + end - # msgs + # Gets msgs from the storage (if needed) and processes them one by one. + # + def process_msgs @msgs = @storage.get_msgs if @msgs.empty? - processed = 0 collisions = 0 - while msg = @msgs.shift + while @msg = @msgs.shift - r = process(msg) + r = process(@msg) if r != false - processed += 1 + @processed_msgs += 1 else collisions += 1 end if collisions > 2 @msgs = @msgs[(@msgs.size / 2)..-1] || [] + collisions = 0 end - #@msgs.concat(@storage.get_local_msgs) + break if Time.now.utc - @last_time >= 0.8 + end + end - #print r == false ? '*' : '.' + # Some storage implementations cache information before a step + # begins, reducing the number of requests to the underlying + # data system. This begin step notifies the storage that + # a new step is on and it should refresh the cached information. + # + # The storage implementation, if it supports this feature, will cache + # the information in the thread local info. + # + def begin_step - break if Time.now.utc - @last_time >= 0.8 + Thread.current['ruote_worker'] = self + + @storage.begin_step if @storage.respond_to?(:begin_step) + end + + # One worker step, fetches schedules and triggers those whose time has + # came, then fetches msgs and processes them. + # + def step + + begin_step + + @msg = nil + @processed_msgs = 0 + + determine_state + + if @state == 'stopped' + return + elsif @state == 'running' + process_schedules + process_msgs end - #p processed + take_a_rest # 'running' or 'paused' - if processed == 0 + rescue => err + + handle_step_error(err, @msg) # msg may be nil + end + + # This default implementation dumps error information to $stderr as + # soon as #step intercepts the error. + # + # Normally such information should only appear when developing a + # storage, the information here is thus helpful for storage developers. + # If such info is emitted in production or in application development, + # you should pass the info to the storage developer/maintainer. + # + # Feel free to override this method if you need it to output to + # a channel different than $stderr (or rebind $stderr). + # + # The second parameter is "msg", if the error occured while processing a + # msg, then this message is passed to handle_step_error. msg will be + # nil if the error occurred while doing get_msgs or get_schedules. + # + def handle_step_error(err, msg) + + $stderr.puts '#' * 80 + $stderr.puts + $stderr.puts '** worker#step intercepted exception **' + $stderr.puts + $stderr.puts "Please report issue or fix your #{@storage.class} impl," + $stderr.puts + $stderr.puts "or override Ruote::Worker#handle_step_error(e, msg) so that" + $stderr.puts "the issue is dealt with appropriately. For example:" + $stderr.puts + $stderr.puts " class Ruote::Worker" + $stderr.puts " def handle_step_error(e, msg)" + $stderr.puts " logger.error('ruote step error: ' + e.inspect)" + $stderr.puts " mailer.send_error('admin@acme.com', e)" + $stderr.puts " end" + $stderr.puts " end" + $stderr.puts + $stderr.puts '# ' * 40 + $stderr.puts + $stderr.puts 'error class/message/backtrace:' + $stderr.puts err.class.name + $stderr.puts err.message.inspect + $stderr.puts *err.backtrace + $stderr.puts err.details if err.respond_to?(:details) + $stderr.puts + $stderr.puts 'msg:' + if msg && msg.is_a?(Hash) + $stderr.puts msg.select { |k, v| + %w[ action wfid fei ].include?(k) + }.inspect + else + $stderr.puts msg.inspect + end + $stderr.puts + $stderr.puts '#' * 80 + + $stderr.flush + end + + # In order not to hammer the storage for msgs too much, take a rest. + # + # If the number of processed messages is more than zero, there are probably + # more msgs coming, no time for a rest... + # + # If @sleep_time is nil (restless_worker option set to true), the worker + # will never rest. + # + def take_a_rest + + return if @sleep_time == nil + + if @processed_msgs < 1 + @sleep_time += 0.001 @sleep_time = 0.499 if @sleep_time > 0.499 + sleep(@sleep_time) + else + @sleep_time = 0.000 end end # Given a schedule, attempts to trigger it. # - # It first tries to - # reserve the schedule. If the reservation fails (another worker - # was successful probably), false is returned. The schedule is - # triggered if the reservation was successful, true is returned. + # It first tries to reserve the schedule. If the reservation fails + # (another worker was successful probably), false is returned. + # The schedule is triggered if the reservation was successful, true + # is returned. # - def trigger(schedule) + def turn_schedule_to_msg(schedule) msg = Ruote.fulldup(schedule['msg']) return false unless @storage.reserve(schedule) @@ -232,55 +362,70 @@ return false unless @storage.reserve(msg) begin - action = msg['action'] + @context.pre_notify(msg) - if msg['tree'] - # - # warning here, it could be a reply, with a 'tree' key... + case msg['action'] - launch(msg) + when 'launch', 'apply', 'regenerate' - elsif EXP_ACTIONS.include?(action) + launch(msg) - Ruote::Exp::FlowExpression.do_action(@context, msg) + when *EXP_ACTIONS - elsif DISP_ACTIONS.include?(action) + Ruote::Exp::FlowExpression.do_action(@context, msg) - @context.dispatch_pool.handle(msg) + when *DISP_ACTIONS - elsif PROC_ACTIONS.include?(action) + @context.dispatch_pool.handle(msg) - self.send(action, msg) + when *PROC_ACTIONS - #else - # msg got deleted, might still be interesting for a subscriber + self.send(msg['action'], msg) + + when 'reput' + + reput(msg) + + when 'raise' + + handle_msg_error(msg['msg'], msg['error']) + + when 'respark' + + respark(msg) + + #else + # no special processing required for message, let it pass + # to the subscribers (the notify two lines after) end - notify(msg) + @context.notify(msg) + # notify subscribers of successfully processed msgs - rescue => exception + rescue => err - @context.error_handler.msg_handle(msg, exception) + handle_msg_error(msg, err) end + @context.storage.done(msg) if @context.storage.respond_to?(:done) + + @info << msg if @info + # for the stats + true end - # Given a successfully executed msg, now notifies all the subscribers - # interested in the kind of action the msg ordered. + # Passes the msg and the err it resulted in to the error_handler. # - def notify(msg) + # Some storage/worker implementation may want to override this. + # + def handle_msg_error(msg, err) - @subscribers.each do |actions, subscriber| - - if actions == :all || actions.include?(msg['action']) - subscriber.notify(msg) - end - end + @context.error_handler.msg_handle(msg, err) end # Works for both the 'launch' and the 'apply' msgs. # # Creates a new expression, gives and applies it with the @@ -288,28 +433,47 @@ # def launch(msg) tree = msg['tree'] variables = msg['variables'] + wi = msg['workitem'] exp_class = @context.expmap.expression_class(tree.first) # msg['wfid'] only : it's a launch # msg['fei'] : it's a sub launch (a supplant ?) + if is_launch?(msg, exp_class) + + name = tree[1]['name'] || tree[1].keys.find { |k| tree[1][k] == nil } + revision = tree[1]['revision'] || tree[1]['rev'] + + wi['wf_name'] ||= name + wi['wf_revision'] ||= revision + wi['wf_launched_at'] ||= Ruote.now_to_utc_s + + wi['sub_wf_name'] = name + wi['sub_wf_revision'] = revision + wi['sub_wf_launched_at'] = Ruote.now_to_utc_s + end + exp_hash = { 'fei' => msg['fei'] || { 'engine_id' => @context.engine_id, 'wfid' => msg['wfid'], 'subid' => Ruote.generate_subid(msg.inspect), - 'expid' => '0' }, + 'expid' => msg['expid'] || '0' }, 'parent_id' => msg['parent_id'], - 'original_tree' => tree, 'variables' => variables, - 'applied_workitem' => msg['workitem'], - 'forgotten' => msg['forgotten'] - } + 'applied_workitem' => wi, + 'forgotten' => msg['forgotten'], + 'lost' => msg['lost'], + 'flanking' => msg['flanking'], + 'stash' => msg['stash'], + 'trigger' => msg['trigger'], + 'on_reply' => msg['on_reply'], + 'supplanted' => msg['supplanted'] } if not exp_class exp_class = Ruote::Exp::RefExpression @@ -318,23 +482,34 @@ def_name, tree = Ruote::Exp::DefineExpression.reorganize(tree) variables[def_name] = [ '0', tree ] if def_name exp_class = Ruote::Exp::SequenceExpression end - exp = exp_class.new(@context, exp_hash.merge!('original_tree' => tree)) + exp_hash = exp_hash.reject { |k, v| v.nil? } + # compact nils away + exp_hash['original_tree'] = tree + # keep track of original tree + + exp = exp_class.new(@context, exp_hash) + exp.initial_persist - exp.do_apply(msg) + exp.do(:apply, msg) end # Returns true if the msg is a "launch" (ie not a simply "apply"). # def is_launch?(msg, exp_class) - return false if exp_class != Ruote::Exp::DefineExpression - return true if msg['action'] == 'launch' - (msg['trigger'] == 'on_re_apply') + if exp_class != Ruote::Exp::DefineExpression + false + elsif %w[ launch regenerate ].include?(msg['action']) + true + else + (msg['trigger'] == 'on_re_apply') + # let re-apply "define" blocks, as in Ruote.define {} + end end # Handles a 'cancel_process' msg (translates it into a "cancel root # expression of that process" msg). # @@ -344,18 +519,172 @@ root = @storage.find_root_expression(msg['wfid']) return unless root - flavour = (msg['action'] == 'kill_process') ? 'kill' : nil - @storage.put_msg( 'cancel', 'fei' => root['fei'], 'wfid' => msg['wfid'], # indicates this was triggered by cancel_process - 'flavour' => flavour) + 'flavour' => msg['flavour']) end alias kill_process cancel_process + + # Handles 'pause_process' and 'resume_process'. + # + def pause_process(msg) + + root = @storage.find_root_expression(msg['wfid']) + + return unless root + + @storage.put_msg( + msg['action'] == 'pause_process' ? 'pause' : 'resume', + 'fei' => root['fei'], + 'wfid' => msg['wfid']) # it was triggered by {pause|resume}_process + end + + alias resume_process pause_process + + # Reputs a doc or a msg. + # + # Used by certain storage implementations to pass documents around workers + # or to reschedule msgs (see ruote-swf). + # + def reput(msg) + + if doc = msg['doc'] + + r = @storage.put(doc) + + return unless r.is_a?(Hash) + + doc['_rev'] = r['_rev'] + + reput(msg) + + elsif msg = msg['msg'] + + @storage.put_msg(msg['action'], msg) + end + end + + # This action resparks a stalled workflow instance. It's usually + # triggered via Dashboard#respark + # + # It's been made into a msg (worker action) in order to facilitate + # migration tooling (ruote-swf for example). + # + def respark(msg) + + wfid = msg['wfid'] + opts = msg['respark'] + + ps = ProcessStatus.fetch(@context, [ wfid ], {}).first + + error_feis = ps.errors.collect(&:fei) + errors_too = !! opts['errors_too'] + + ps.leaves.each do |fexp| + + next if errors_too == false && error_feis.include?(fexp.fei) + + @context.storage.put_msg( + 'cancel', 'fei' => fexp.fei.h, 're_apply' => {}) + end + end + + # + # Gathering stats about this worker. + # + # Those stats can then be obtained via Dashboard#worker_info + # (Engine#worker_info). + # + class Info + + def initialize(worker) + + @worker = worker + @ip = Ruote.local_ip + @hostname = Socket.gethostname + @system = `uname -a`.strip rescue nil + + @since = Time.now + @msgs = [] + @last_save = Time.now - 2 * 60 + end + + def <<(msg) + + if msg['put_at'].nil? + puts '-' * 80 + puts "msg missing 'put_at':" + pp msg + puts '-' * 80 + end + + @msgs << { + 'processed_at' => Ruote.now_to_utc_s, + 'wait_time' => Time.now - Time.parse(msg['put_at']) + #'action' => msg['action'] + } + + save if Time.now > @last_save + 60 + end + + protected + + def save + + doc = @worker.storage.get('variables', 'workers') || {} + + doc['type'] = 'variables' + doc['_id'] = 'workers' + + now = Time.now + + @msgs = @msgs.drop_while { |msg| + Time.parse(msg['processed_at']) < now - 3600 + } + mm = @msgs.drop_while { |msg| + Time.parse(msg['processed_at']) < now - 60 + } + + hour_count = @msgs.size < 1 ? 1 : @msgs.size + minute_count = mm.size < 1 ? 1 : mm.size + + key = [ + @worker.name, @ip.gsub(/\./, '_'), $$.to_s + ].join('/') + + (doc['workers'] ||= {})[key] = { + + 'class' => @worker.class.to_s, + 'name' => @name, + 'ip' => @ip, + 'hostname' => @hostname, + 'pid' => $$, + 'system' => @system, + 'put_at' => Ruote.now_to_utc_s, + 'uptime' => Time.now - @since, + + 'processed_last_minute' => + minute_count, + 'wait_time_last_minute' => + mm.inject(0.0) { |s, m| s + m['wait_time'] } / minute_count.to_f, + 'processed_last_hour' => + hour_count, + 'wait_time_last_hour' => + @msgs.inject(0.0) { |s, m| s + m['wait_time'] } / hour_count.to_f + } + + r = @worker.storage.put(doc) + + @last_save = Time.now + + save if r != nil + end + end end end