require 'active_support'
require_relative './base'
require_relative '../status_server'

module HermesMessengerOfTheGods
  module Concerns
    # Mixin that turns the including class into a worker which drains a single
    # endpoint: each yielded message is deserialized, optionally validated,
    # run, and then reported back to the endpoint as a success or a failure.
    #
    # Class-level configuration (see +class_methods+ below):
    # * +endpoints=+                - Hash containing exactly one endpoint
    # * +deserialize_with+          - object that builds jobs from raw messages
    # * +deserialize_method+        - method called on it (default +:from_message+)
    # * +max_consecutive_failures=+ - Integer threshold, or nil
    # * +circuit_breaker_errors=+   - Hash of error class => { sleep:, fatal: }
    #
    # NOTE(review): +instrument+ and the instance-level +endpoints+ are not
    # defined in this file; presumably they come from Concerns::Base - confirm.
    module Worker
      extend ActiveSupport::Concern
      include HermesMessengerOfTheGods::Concerns::Base

      included do
        attr_writer :name
        attr_accessor :consecutive_failures, :last_task_performed

        # @return [Boolean] true when ENABLE_HERMES_HEALTH_CHECK is set to
        #   anything other than the literal string 'false' (unset => false).
        def health_check_enabled?
          ENV.fetch('ENABLE_HERMES_HEALTH_CHECK', 'false') != 'false'
        end

        # Maximum number of seconds allowed between two tasks before a worker
        # with pending work is reported unhealthy.
        #
        # @return [Integer] HERMES_MINIMUM_WORK_FREQUENCY, defaulting to 60
        def expected_work_frequency
          ENV.fetch('HERMES_MINIMUM_WORK_FREQUENCY', 60).to_i
        end

        # A worker is healthy when its endpoint has no pending work, or when it
        # picked up a task less than +expected_work_frequency+ seconds ago.
        #
        # @return [Boolean, nil] falsy (possibly nil) when work is pending and
        #   no task has been picked up yet
        def healthy?
          !endpoint.has_pending_work? || (
            last_task_performed && Time.now - last_task_performed < expected_work_frequency
          )
        end

        # Main loop: installs signal handlers, then processes every message the
        # endpoint yields until the endpoint stops or a fatal error occurs.
        # +endpoint.teardown+ always runs on the way out.
        #
        # @raise [StandardError] re-raises anything that escapes the loop after
        #   instrumenting it as +:fatal_error+
        def work_off
          WorkerStatusServer.start!(worker: self) if health_check_enabled?

          trap(:TERM) { endpoint.shutdown! }
          trap(:INT) { endpoint.shutdown! }

          instrument(:starting)
          self.consecutive_failures = 0

          endpoint.work_off do |job, original_message|
            self.last_task_performed = Time.now
            built_job = deserialize(job, original_message)

            instrument(:starting_job, job: built_job)
            built_job.validate! if built_job.respond_to?(:validate!)
            run_job(built_job)
            handle_success(built_job, job)
          rescue StandardError => e
            handle_failure(job, e)

            max_consecutive_failures = self.class.max_consecutive_failures
            if max_consecutive_failures.is_a?(Integer) &&
               HermesMessengerOfTheGods.config.kill_on_consecutive_failures &&
               consecutive_failures >= max_consecutive_failures
              instrument(:consecutive_failure_shutdown)
              # Signals the process whose pid equals the parent's process-group
              # id, then leaves work_off (the ensure below still runs).
              # NOTE(review): presumably that pid is the supervising group
              # leader - confirm.
              return Process.kill('TERM', Process.getpgid(Process.ppid))
            end

            # A circuit breaker applies when the error is an instance of any
            # configured class (subclasses included); the first match wins.
            circuit_breakers = self.class.circuit_breaker_errors
            matching_error_class = circuit_breakers.keys.find { |candidate| e.is_a?(candidate) }
            if matching_error_class
              circuit_breaker = circuit_breakers[matching_error_class]
              sleep_time = circuit_breaker[:sleep] || 0
              Kernel.sleep(sleep_time) unless sleep_time.zero?
              raise e if circuit_breaker[:fatal]
            end

            # NOTE(review): presumably caught by the endpoint so the failed
            # message is not deleted - confirm against the endpoint code.
            throw(:skip_delete, true)
          end
        rescue StandardError => e
          instrument(:fatal_error, exception: e)
          raise
        ensure
          endpoint.teardown
        end

        # Executes a job inside a +:run_job+ instrumentation block. The
        # worker's own +perform(job)+ takes precedence over +job.perform+.
        #
        # @param job [Object] the deserialized job
        # @raise [RuntimeError] when neither the worker nor the job responds
        #   to +perform+
        def run_job(job)
          instrument(:run_job, job: job) do
            if respond_to?(:perform)
              perform(job)
            elsif job.respond_to?(:perform)
              job.perform
            else
              raise 'You need to define a run_job method in the worker, or a perform method on the message'
            end
          end
        end

        # Records a successful job, notifies the endpoint and resets the
        # consecutive failure counter.
        #
        # @param built_job [Object] the deserialized job (used for instrumentation)
        # @param raw_job [Object] the raw job as yielded by the endpoint
        def handle_success(built_job, raw_job)
          instrument(:success, job: built_job)
          endpoint.handle_success(raw_job)
          self.consecutive_failures = 0
        end

        # Records a failed job, notifies the endpoint and bumps the
        # consecutive failure counter (initialising it if needed).
        #
        # @param job [Object] the raw job as yielded by the endpoint
        # @param e [StandardError] the error that caused the failure
        def handle_failure(job, e)
          instrument(:failure, job: job, error: e)
          endpoint.handle_failure(job, e)
          self.consecutive_failures ||= 0
          self.consecutive_failures += 1
        end

        # Builds a job from the raw payload using the class-level
        # +deserialize_with+ / +deserialize_method+ (default +:from_message+)
        # and stores the original message on it as +@original_message+.
        #
        # @param raw_job [Object] payload passed to the deserializer
        # @param original_message [Object] the message as yielded by the endpoint
        # @return [Object] the built job
        def deserialize(raw_job, original_message)
          instrument(:deserialization, job: raw_job) do
            deserialize_method = self.class.deserialize_method || :from_message
            self.class.deserialize_with.send(deserialize_method, raw_job).tap do |obj|
              obj.instance_variable_set :@original_message, original_message
            end
          end
        end

        # @return [Object] the first (memoized) value of +endpoints+
        def endpoint
          @endpoint ||= endpoints.values.first
        end

        # Uses Process.pid rather than $PROCESS_ID: the latter is only defined
        # once the stdlib 'English' library has been required, which this file
        # does not do, so it could silently interpolate as an empty string.
        #
        # @return [String] prefix identifying this worker and its process
        def log_message_prefix
          "Worker #{name}(pid: #{Process.pid})"
        end

        # @return [String] the assigned name, defaulting to the class name
        def name
          @name ||= self.class.to_s
        end
      end

      class_methods do
        attr_reader :endpoints, :max_consecutive_failures
        attr_accessor :deserialize_with, :deserialize_method

        # @return [Object] a new worker instance
        def build_worker
          new
        end

        # Builds a worker and runs its work loop.
        def work_off
          build_worker.work_off
        end

        # @param val [Integer, nil] consecutive failure threshold; nil disables it
        # @raise [RuntimeError] when +val+ is neither an Integer nor nil
        def max_consecutive_failures=(val)
          raise 'Expected an Integer' unless val.is_a?(Integer) || val.nil?

          @max_consecutive_failures = val
        end

        # @return [Hash] configured circuit breakers, or an empty Hash
        def circuit_breaker_errors
          @circuit_breaker_errors || {}
        end

        # Configures circuit breakers. Missing +:sleep+ / +:fatal+ entries are
        # defaulted to 0 / false directly on the given action hashes (the
        # caller's hashes are mutated) before everything is validated.
        #
        # @param val [Hash, nil] error class => { sleep: Numeric, fatal: Boolean }
        # @raise [RuntimeError] when +val+ is not a Hash, +:sleep+ is not
        #   Numeric, or +:fatal+ is not true/false
        def circuit_breaker_errors=(val)
          val ||= {}
          raise 'Expected a hash' unless val.is_a?(Hash)

          val.each_value do |actions|
            actions[:sleep] ||= 0
            actions[:fatal] ||= false
          end

          val.each_value do |actions|
            raise ':sleep must be a number' unless actions[:sleep].is_a?(Numeric)
            raise ':fatal must be a boolean' unless [true, false].include?(actions[:fatal])
          end

          @circuit_breaker_errors = val
        end

        # @param val [Hash] a Hash holding exactly one endpoint
        # @raise [RuntimeError] when +val+ is not a Hash or does not contain
        #   exactly one entry
        def endpoints=(val)
          raise 'Expected an endpoint' unless val.is_a?(Hash)
          raise 'Workers can only have one defined endpoint' unless val.length == 1

          @endpoints = val
        end
      end
    end
  end
end