lib/sp/job/back_burner.rb in sp-job-0.1.17 vs lib/sp/job/back_burner.rb in sp-job-0.2.2
- old
+ new
@@ -17,45 +17,144 @@
# along with sp-job. If not, see <http://www.gnu.org/licenses/>.
#
# encoding: utf-8
#
require 'sp/job/pg_connection'
+require 'sp/job/job_db_adapter'
require 'roadie'
+require 'thread'
+module SP
+ module Job
+
+ class JobCancelled < ::StandardError
+
+ end
+
+ class JobAborted < ::StandardError
+ end
+
+ class JobException < ::StandardError
+
+ attr_reader :job
+ attr_reader :args
+
+ def initialize (args:, job: nil)
+ super(args[:message] || $current_job[:tube] || $args[:program_name])
+ @job = job
+ @args = args
+ end
+
+ end
+
+ class Logger < ::Logger
+
+ def task (sequence, text, success = true)
+ if success
+ info "[#{sequence}] #{text} \xE2\x9C\x94".green
+ else
+ info "[#{sequence}] #{text} \xE2\x9D\x8C".red
+ end
+ end
+ end
+
+ class ThreadData < Struct.new(:job_status, :report_time_stamp, :exception_reported, :job_id, :publish_key, :job_key, :current_job, :job_notification, :jsonapi)
+ def initialize
+ @job_status = {}
+ if $config[:options] && $config[:options][:jsonapi] == true
+ @jsonapi = SP::Duh::JSONAPI::Service.new($pg, nil, SP::Job::JobDbAdapter)
+ end
+ end
+ end
+
+ class FauxMutex
+ def synchronize (&block)
+ yield
+ end
+ end
+
+ end
+end
+
+
#
# Initialize global data needed for configuration
#
-$prefix = OS.mac? ? '/usr/local' : '/'
-$rollbar = false
-$bury = false
-$min_progress = 3 # TODO to config??
-$args = {
- stdout: false,
- log_level: 'info',
- program_name: File.basename($PROGRAM_NAME, File.extname($PROGRAM_NAME)),
- config_file: File.join($prefix, 'etc', File.basename($PROGRAM_NAME, File.extname($PROGRAM_NAME)), 'conf.json'),
- log_file: File.join($prefix, 'var', 'log', 'jobs', "#{File.basename($PROGRAM_NAME, File.extname($PROGRAM_NAME))}.log")
+$prefix = OS.mac? ? '/usr/local' : ''
+$rollbar = false
+$min_progress = 3
+$args = {
+ stdout: false,
+ log_level: 'info',
+ program_name: File.basename($PROGRAM_NAME, File.extname($PROGRAM_NAME)),
+ config_file: File.join($prefix, 'etc', File.basename($PROGRAM_NAME, File.extname($PROGRAM_NAME)), 'conf.json'),
+ default_log_file: File.join($prefix, 'var', 'log', 'jobs', "#{File.basename($PROGRAM_NAME, File.extname($PROGRAM_NAME))}.log")
}
#
# Parse command line arguments
#
$option_parser = OptionParser.new do |opts|
opts.banner = "Usage: #{$PROGRAM_NAME} ARGS"
- opts.on('-c', '--config=CONFIG.JSON', "path to json configuration file (default: '#{$args[:config_file]}')") { |v| $args[:config_file] = File.expand_path(v) }
- opts.on('-l', '--log=LOGFILE' , "path to log file (default: '#{$args[:log_file]}')") { |v| $args[:log_file] = File.expand_path(v) }
- opts.on('-d', '--debug' , "developer mode: log to stdout and print job") { $args[:debug] = true }
- opts.on('-v', '--log_level=LEVEL' , "Log level DEBUG, INFO, WARN, ERROR, FATAL") { |v| $args[:log_level] = v }
+ opts.on('-c', '--config=CONFIG.JSON', "path to json configuration file (default: '#{$args[:default_log_file]}')") { |v| $args[:config_file] = File.expand_path(v) }
+ opts.on('-l', '--log=LOGFILE' , "path to log file (default: '#{$args[:log_file]}')") { |v| $args[:log_file] = File.expand_path(v) }
+ opts.on('-d', '--debug' , "developer mode: log to stdout and print job") { $args[:debug] = true }
+ opts.on('-v', '--log_level=LEVEL' , "Log level DEBUG, INFO, WARN, ERROR, FATAL") { |v| $args[:log_level] = v }
+ opts.on('-i', '--index=IDX' , "systemd instance index") { |v| $args[:index] = v }
end
$option_parser.parse!
+if $args[:debug]
+ require 'ruby-debug' if RUBY_ENGINE == 'jruby'
+end
+
+# Adjust log file if needed; a user-specified option always takes precedence
#
+if $args[:log_file].nil?
+ if $args[:index].nil?
+ $args[:log_file] = $args[:default_log_file]
+ else
+ $args[:log_file] = File.join($prefix, 'var', 'log', 'jobs', "#{$args[:program_name]}.#{$args[:index]}.log")
+ end
+end
+
+#
+# Create PID file for this jobs instance
+#
+if OS.mac?
+ Dir.mkdir("#{$prefix}/var/run/jobs") unless Dir.exist? "#{$prefix}/var/run/jobs"
+end
+File.write("#{$prefix}/var/run/jobs/#{$args[:program_name]}#{$args[:index].nil? ? '' : '.' + $args[:index]}.pid", Process.pid)
+
+#
# Read configuration
#
$config = JSON.parse(File.read(File.expand_path($args[:config_file])), symbolize_names: true)
+$min_progress = $config[:options][:min_progress]
#
+# Global data for mutex and sync
+#
+$threads = [ Thread.current ]
+$thread_data = {}
+$thread_data[Thread.current] = ::SP::Job::ThreadData.new
+
+#
+# Sanity check: multithreading is only supported on JRuby
+#
+if $config[:options] && $config[:options][:threads].to_i > 1
+ raise 'Multithreading is not supported in MRI/CRuby' unless RUBY_ENGINE == 'jruby'
+ $redis_mutex = Mutex.new
+ $roolbar_mutex = Mutex.new
+ $multithreading = true
+else
+ $redis_mutex = nil
+ $roolbar_mutex = ::SP::Job::FauxMutex.new
+ $multithreading = false
+end
+
+#
# Configure rollbar
#
unless $config[:rollbar].nil?
$rollbar = true
Rollbar.configure do |config|
@@ -67,29 +166,54 @@
#
# Configure backburner queue
#
Backburner.configure do |config|
- config.beanstalk_url = "beanstalk://#{$config[:beanstalkd][:host]}:#{$config[:beanstalkd][:port]}"
- config.on_error = lambda { |e|
- if $exception_reported == false
- $exception_reported == true
- update_progress(status: 'error', message: e)
+ config.beanstalk_url = "beanstalk://#{$config[:beanstalkd][:host]}:#{$config[:beanstalkd][:port]}"
+ config.on_error = lambda { |e|
+ td = thread_data
+ if td.exception_reported == false
+ td.exception_reported = true
+ if e.instance_of? Beaneater::DeadlineSoonError
+ logger.warn "got a deadline warning".red
+ else
+ begin
+ raise_error(message: e)
+ rescue => e
+ # Do not re-throw!!!!
+ end
+ end
end
- if $rollbar
- Rollbar.error(e)
+
+ # Report exception to rollbar
+ $roolbar_mutex.synchronize {
+ if $rollbar
+ if e.instance_of? ::SP::Job::JobException
+ e.job[:password] = '<redacted>'
+ Rollbar.error(e, e.message, { job: e.job, args: e.args})
+ else
+ Rollbar.error(e)
+ end
+ end
+ }
+
+ # Catch fatal exceptions that must be handled with a restart (systemd will restart us)
+ case e
+ when PG::UnableToSend, PG::AdminShutdown, PG::ConnectionBad
+ logger.fatal "Lost connection to database exiting now"
+ exit
+ when Redis::CannotConnectError
+ logger.fatal "Can't connect to redis exiting now"
+ exit
end
- catch_fatal_exceptions(e)
}
- #config.priority_labels = { :custom => 50, :useless => 1000 }
- #config.max_job_retries = 0 # default 0 retries
- #config.retry_delay = 5 # default 5 seconds
- #config.default_priority = 65536
+ config.max_job_retries = ($config[:options] && $config[:options][:max_job_retries]) ? $config[:options][:max_job_retries] : 0
+ config.retry_delay = ($config[:options] && $config[:options][:retry_delay]) ? $config[:options][:retry_delay] : 5
config.retry_delay_proc = lambda { |min_retry_delay, num_retries| min_retry_delay + (num_retries ** 3) }
config.respond_timeout = 120
- config.default_worker = SP::Job::Worker
- config.logger = $args[:debug] ? Logger.new(STDOUT) : Logger.new($args[:log_file])
+ config.default_worker = $config[:options] && $config[:options][:threads].to_i > 1 ? SP::Job::WorkerThread : SP::Job::Worker
+ config.logger = $args[:debug] ? SP::Job::Logger.new(STDOUT) : SP::Job::Logger.new($args[:log_file])
config.logger.formatter = proc do |severity, datetime, progname, msg|
date_format = datetime.strftime("%Y-%m-%d %H:%M:%S")
"[#{date_format}] #{severity}: #{msg}\n"
end
if $args[:log_level].nil?
@@ -134,35 +258,99 @@
}
end
end
#
-# Monkey patches to keep the tube name as plain vannila job name
+# Monkey patches the sad reality of ruby development
#
+if RUBY_ENGINE == 'jruby'
+
+ class Logger
+ class LogDevice
+
+ def write(message)
+ begin
+ synchronize do
+ if @shift_age and @dev.respond_to?(:stat)
+ begin
+ check_shift_log
+ rescue
+ warn("log shifting failed. #{$!}")
+ end
+ end
+ begin
+ @dev.write(message)
+ rescue ::SP::Job::JobCancelled => jc
+ raise jc
+ rescue
+ warn("log writing failed. #{$!}")
+ end
+ end
+ rescue ::SP::Job::JobCancelled => jc
+ raise jc
+ rescue Exception => ignored
+ warn("log writing failed. #{ignored}")
+ end
+ end
+
+ end
+ end
+end
+
module Backburner
module Helpers
def expand_tube_name (tube)
tube
end
end
module Logger
- def log_job_begin(name, args)
- log_info "Work job #{name}"
- @job_started_at = Time.now
+
+ if RUBY_ENGINE != 'jruby'
+
+ def log_job_begin(name, args)
+ log_info "Work job #{name}"
+ @job_started_at = Time.now
+ end
+
+ else
+
+ def log_job_begin(name, args)
+ log_info "Work job #{name}"
+ Thread.current[:job_started_at] = Time.now
+ end
+
+ # Print out when a job has completed
+ # If message is nil, job is considered complete
+ def log_job_end(name, message = nil)
+ ellapsed = Time.now - Thread.current[:job_started_at]
+ ms = (ellapsed.to_f * 1000).to_i
+ action_word = message ? 'Finished' : 'Completed'
+ log_info("#{action_word} #{name} in #{ms}ms #{message}")
+ end
+
end
end
class Job
+ extend SP::Job::Common # to bring in logger and report_error into this class
+
# Processes a job and handles any failure, deleting the job once complete
#
# @example
# @task.process
#
def process
+ # Invoke the job setup function; bail out if the setup returns false
+ unless job_class.respond_to?(:prepare_job) && job_class.prepare_job(*args)
+ task.delete
+ logger.warn "Delete stale or preempted task".red
+ return false
+ end
+
# Invoke before hook and stop if false
res = @hooks.invoke_hook_events(job_class, :before_perform, *args)
unless res
task.delete
return false
@@ -177,29 +365,100 @@
timeout_job_after(task.ttr > 1 ? task.ttr - 1 : task.ttr) { job_class.perform(*args) }
end
task.delete
# Invoke after perform hook
@hooks.invoke_hook_events(job_class, :after_perform, *args)
+ rescue ::SP::Job::JobAborted => ja
+ #
+ # This exception:
+ # 1. is sent to the rollbar
+ # 2. does not bury the job, instead the job is deleted
+ #
+ logger.debug "Received job aborted exception #{Thread.current}".yellow
+ unless task.nil?
+ logger.debug 'Task deleted'.yellow
+ task.delete
+ end
+ # Invoke after perform hook
+ @hooks.invoke_hook_events(job_class, :after_perform, *args)
+ thread_data.job_id = nil
+ rescue ::SP::Job::JobCancelled => jc
+ #
+ # This exception:
+ # 1. is not sent to the rollbar
+ # 2. does not bury the job, instead the job is deleted
+ #
+ logger.debug "Received job cancellation exception #{Thread.current}".yellow
+ unless task.nil?
+ logger.debug 'Task deleted'.yellow
+ task.delete
+ end
+ @hooks.invoke_hook_events(job_class, :on_failure, jc, *args)
+ report_error(message: 'i18n_job_cancelled', status: 'cancelled')
+ if $redis_mutex.nil?
+ $redis.hset(thread_data.job_key, 'cancelled', true)
+ else
+ $redis_mutex.synchronize {
+ $redis.hset(thread_data.job_key, 'cancelled', true)
+ }
+ end
+ thread_data.job_id = nil
rescue => e
@hooks.invoke_hook_events(job_class, :on_failure, e, *args)
raise e
end
end
end
-# Mix-in the mix-in in the script so that we can use the Common module functions
-require 'sp/job/common'
+# Mix-in the common mix-in to make code available for the lambdas used in this file
extend SP::Job::Common
+logger.debug "Log file ... #{$args[:log_file]}"
+logger.debug "PID ........ #{Process.pid}"
+
#
# Now create the global data needed by the mix-in methods
#
-$connected = false
-$job_status = {}
-$validity = 2
-$redis = Redis.new(:host => $config[:redis][:host], :port => $config[:redis][:port], :db => 0)
-$beaneater = Beaneater.new "#{$config[:beanstalkd][:host]}:#{$config[:beanstalkd][:port]}"
-$check_db_life_span = false
-$status_dirty = false
+$connected = false
+$redis = Redis.new(:host => $config[:redis][:host], :port => $config[:redis][:port], :db => 0)
+$transient_job = $config[:options] && $config[:options][:transient] == true
+$beaneater = Beaneater.new "#{$config[:beanstalkd][:host]}:#{$config[:beanstalkd][:port]}"
if $config[:postgres] && $config[:postgres][:conn_str]
- $pg = ::SP::Job::PGConnection.new(owner: 'back_burner', config: $config[:postgres])
+ $pg = ::SP::Job::PGConnection.new(owner: $PROGRAM_NAME, config: $config[:postgres], multithreaded: $multithreading)
+ if $PROGRAM_NAME.split('/').last == 'saft-importer' || $PROGRAM_NAME.split('/').last == 'saft-destroyer'
+ $pg.exec("SET log_min_duration_statement TO 0;")
+ end
+ if $config[:options][:jsonapi] == true
+ $jsonapi = SP::Duh::JSONAPI::Service.new($pg, ($jsonapi.nil? ? nil : $jsonapi.url), SP::Job::JobDbAdapter)
+ end
end
+
+#
+# Open a second thread that will listen to cancellation and other "signals"
+#
+$cancel_thread = Thread.new {
+ begin
+ $subscription_redis = Redis.new(:host => $config[:redis][:host], :port => $config[:redis][:port], :db => 0)
+ $subscription_redis.subscribe($config[:service_id] + ':job-signal') do |on|
+ on.message do |channel, msg|
+ begin
+ message = JSON.parse(msg, {symbolize_names: true})
+ $threads.each do |thread|
+ if $thread_data[thread].job_id != nil && message[:id].to_s == $thread_data[thread].job_id && message[:status] == 'cancelled'
+ logger.info "Received cancel signal for job #{$thread_data[thread].job_id}"
+ thread.raise(::SP::Job::JobCancelled.new)
+ end
+ end
+ rescue Exception => e
+ # ignore invalid payloads
+ end
+ end
+ end
+ rescue Redis::CannotConnectError => ccc
+ logger.fatal "Can't connect to redis exiting now".red
+ exit
+ rescue Exception => e
+ # Forward unexpected exceptions to the main thread for proper handling
+ logger.fatal e.to_s.red
+ Thread.main.raise(e)
+ end
+}