require 'flydata-core/logger'
require 'flydata-core/errors'

module Flydata
  module Helper
    # Worker mixin: repeatedly takes an action ownership from the server's
    # action ownership channel, executes the matching action class, and hands
    # the ownership back. Failed actions are re-requested until the configured
    # retry limit is reached.
    #
    # The including object is expected to provide `server` and `config`
    # (they are called here but not defined in this module).
    module Worker
      include FlydataCore::Logger

      # Creates the flag used to signal the run loop (and every timed wait in
      # this module) that the worker should stop.
      def initialize
        @stop_flag = ServerEngine::BlockingFlag.new
        super
      end

      # ServerEngine hook point.
      # Processes actions one at a time until the stop flag is set. On an
      # unexpected error it logs, waits up to 5 seconds on the stop flag, and
      # then re-raises the same exception.
      def run
        add_log_context_items(object_id: object_id)
        log_debug("run")
        run_once until @stop_flag.set?
      rescue => e
        log_error_with_backtrace("Unexpected error.", error: e)
        @stop_flag.wait_for_set(5.0)
        raise e
      end

      # ServerEngine hook point.
      # Sets the stop flag so the run loop and pending waits finish.
      def stop
        log_debug("stop")
        @stop_flag.set!
      end

      # ServerEngine hook point.
      # Delegates to the parent implementation, then stops this worker.
      def reload
        log_debug("reload")
        super
        stop
      end

      # ServerEngine hook point.
      def after_start
        log_debug("after_start")
      end

      private

      # Handles a single action: acquire ownership, execute, release.
      # Returns early without doing anything when no ownership was obtained.
      def run_once
        action = wait_action_ownership
        return if action.nil?

        ownership = action[:action_ownership]
        info = action[:action_info]

        # Scheduled actions (e.g. check_remote_action) carry no id, so the
        # id part of the log prefix is left empty for them.
        id_tag =
          if info && info[:id]
            "[#{info[:id]}]"
          else
            ""
          end
        set_log_context_item(:prefix, "[worker][#{ownership.action_name}]#{id_tag}")

        begin
          handler = ownership.action_class.new(config)
          handler.execute(info) do |next_action_name, next_action_info = nil|
            # The running action asked for a follow-up action.
            server.action_ownership_channel.request_action(next_action_name, next_action_info)
          end
          ownership.reset_retry_count
        rescue => err
          handle_error(action, err)
        ensure
          # Ownership always goes back to the channel, success or failure.
          server.action_ownership_channel.return_action_ownership(ownership)
        end
      end

      # Logs a failed action and either re-requests it or gives up.
      #
      # Logging: a retryable error that has not exceeded
      # `helper_retry_alert_limit` is logged as a warning. Everything else is
      # logged as an error with backtrace; for a retryable error past the
      # alert limit the logged error is replaced by its `original_exception`.
      #
      # Retrying: while the retry count is below `helper_retry_limit`, waits
      # `helper_retry_interval` on the stop flag (returning without a retry
      # if the flag gets set) and then requests the same action again.
      # Otherwise logs that the limit was reached and resets the retry count.
      def handle_error(action, err)
        ownership = action[:action_ownership]
        info = action[:action_info]
        name = ownership.action_name
        attempt = ownership.increment_retry_count
        retryable = err.kind_of?(FlydataCore::RetryableError)

        message = "Failed to handle action."
        details = {
          action: name,
          retry_cnt: attempt,
          retryable: retryable,
          error: err,
        }

        # The alert limit is only consulted for retryable errors.
        past_alert_limit = retryable && attempt > helper_conf.helper_retry_alert_limit
        details[:error] = err.original_exception if past_alert_limit

        if retryable && !past_alert_limit
          log_warn(message, details)
        else
          log_error_with_backtrace(message, details)
        end

        unless attempt < helper_conf.helper_retry_limit
          log_error("Retry limit reached. Not retrying anymore.")
          return ownership.reset_retry_count
        end

        # A stop request during the wait cancels the retry.
        return if @stop_flag.wait_for_set(helper_conf.helper_retry_interval)

        server.action_ownership_channel.request_action(name, info)
      end

      # The `:helper` entry of the worker config.
      def helper_conf
        config[:helper]
      end

      # Polls the channel until it yields an action ownership, waiting up to
      # 0.5 seconds on the stop flag between attempts.
      # Returns the action, or nil when the stop flag was set while waiting.
      def wait_action_ownership
        loop do
          taken = server.action_ownership_channel.take_action_ownership(self)
          return taken if taken
          return nil if @stop_flag.wait_for_set(0.5)
        end
      end

      # Adds the worker prefix to the inherited custom log items.
      def custom_log_items
        inherited = super
        inherited.merge(prefix: '[worker]')
      end
    end
  end
end