#
# Copyright (c) 2011-2017 Cloudware S.A. All rights reserved.
#
# This file is part of sp-job.
#
# sp-job is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# sp-job is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with sp-job.  If not, see <http://www.gnu.org/licenses/>.
#
# encoding: utf-8
#
require 'sp/job/pg_connection'
require 'sp/job/job_db_adapter'
require 'roadie'
require 'thread'

module SP
  module Job

    # Raised on a worker thread by the cancellation listener (see the
    # $cancel_thread at the end of this file) when a 'cancelled' signal
    # matching the thread's current job id is received.
    class JobCancelled < ::StandardError
    end

    # Handled by Backburner::Job#process: the task is deleted (not buried)
    # and the after_perform hooks still run.
    class JobAborted < ::StandardError
    end

    # Exception that carries the job and the error arguments so that the
    # error handler can forward both to Rollbar.
    class JobException < ::StandardError

      attr_reader :job
      attr_reader :args

      # @param args [Hash] error arguments; args[:message] is used as the exception message when present
      # @param job  [Hash, nil] the job being processed, if known
      def initialize(args:, job: nil)
        # $current_job may not be set; guard it so that building the
        # exception never fails with a NoMethodError of its own.
        tube = $current_job.nil? ? nil : $current_job[:tube]
        super(args[:message] || tube || $args[:program_name])
        @job  = job
        @args = args
      end

    end

    # Logger with a helper to print the outcome of a numbered task.
    class Logger < ::Logger

      # Logs, at info level, "[sequence] text" followed by a green check mark
      # on success or a red cross otherwise.
      #
      # @param sequence [Object]  task identifier printed between brackets
      # @param text     [String]  task description
      # @param success  [Boolean] selects the check mark or the cross
      def task(sequence, text, success = true)
        if success
          info "[#{sequence}] #{text} \xE2\x9C\x94".green
        else
          info "[#{sequence}] #{text} \xE2\x9D\x8C".red
        end
      end

    end

    # Per-thread state, stored in $thread_data keyed by Thread.
    class ThreadData < Struct.new(:job_status, :report_time_stamp, :exception_reported, :job_id, :publish_key, :job_key, :current_job, :job_notification, :jsonapi)

      def initialize
        # Struct members must be assigned through their setters: an instance
        # variable such as @job_status is NOT the struct member, so the
        # job_status accessor would keep returning nil.
        self.job_status = {}
        if $config[:options] && $config[:options][:jsonapi] == true
          self.jsonapi = SP::Duh::JSONAPI::Service.new($pg, nil, SP::Job::JobDbAdapter)
        end
      end

    end

    # Stand-in for Mutex used when running single threaded: synchronize just
    # yields to the block without any locking.
    class FauxMutex

      def synchronize(&block)
        yield
      end

    end

  end
end

#
# Initialize global data needed for configuration
#
$prefix       = OS.mac? ? '/usr/local' : ''
$rollbar      = false
$min_progress = 3
program_name  = File.basename($PROGRAM_NAME, File.extname($PROGRAM_NAME))
$args = {
  stdout: false,
  log_level: 'info',
  program_name: program_name,
  config_file: File.join($prefix, 'etc', program_name, 'conf.json'),
  default_log_file: File.join($prefix, 'var', 'log', 'jobs', "#{program_name}.log")
}

#
# Parse command line arguments
#
$option_parser = OptionParser.new do |opts|
  opts.banner = "Usage: #{$PROGRAM_NAME} ARGS"
  # The defaults shown in the help text are the values computed above:
  # :config_file for -c and :default_log_file for -l (:log_file is not set yet)
  opts.on('-c', '--config=CONFIG.JSON', "path to json configuration file (default: '#{$args[:config_file]}')") { |v| $args[:config_file] = File.expand_path(v) }
  opts.on('-l', '--log=LOGFILE'       , "path to log file (default: '#{$args[:default_log_file]}')")            { |v| $args[:log_file]    = File.expand_path(v) }
  opts.on('-d', '--debug'             , "developer mode: log to stdout and print job")                          { $args[:debug]           = true }
  opts.on('-v', '--log_level=LEVEL'   , "Log level DEBUG, INFO, WARN, ERROR, FATAL")                            { |v| $args[:log_level]   = v }
  opts.on('-i', '--index=IDX'         , "systemd instance index")                                               { |v| $args[:index]       = v }
end
$option_parser.parse!

if $args[:debug]
  require 'ruby-debug' if RUBY_ENGINE == 'jruby'
end

# Adjust log file if need, user specified option always takes precedence
#
if $args[:log_file].nil?
  if $args[:index].nil?
    $args[:log_file] = $args[:default_log_file]
  else
    $args[:log_file] = File.join($prefix, 'var', 'log', 'jobs', "#{$args[:program_name]}.#{$args[:index]}.log")
  end
end

#
# Create PID file for this jobs instance
#
if OS.mac?
  Dir.mkdir("#{$prefix}/var/run/jobs") unless Dir.exist? "#{$prefix}/var/run/jobs"
end
pid_suffix = $args[:index].nil? ? '' : ".#{$args[:index]}"
File.write("#{$prefix}/var/run/jobs/#{$args[:program_name]}#{pid_suffix}.pid", Process.pid)

#
# Read configuration
#
$config = JSON.parse(File.read(File.expand_path($args[:config_file])), symbolize_names: true)
# Keep the default set above unless the configuration provides a value; the
# options section is optional (every other access in this file guards it)
if $config[:options] && $config[:options][:min_progress]
  $min_progress = $config[:options][:min_progress]
end

#
# Global data for mutex and sync
#
$threads = [ Thread.current ]
$thread_data = {}
$thread_data[Thread.current] = ::SP::Job::ThreadData.new

#
# Sanity check we only support multithreading on JRUBY
#
if $config[:options] && $config[:options][:threads].to_i > 1
  raise 'Multithreading is not supported in MRI/CRuby' unless RUBY_ENGINE == 'jruby'
  $redis_mutex    = Mutex.new
  $roolbar_mutex  = Mutex.new
  $multithreading = true
else
  # NOTE: $redis_mutex is nil (not a FauxMutex) when single threaded, code in
  # this file tests it with $redis_mutex.nil?
  $redis_mutex    = nil
  $roolbar_mutex  = ::SP::Job::FauxMutex.new
  $multithreading = false
end

#
# Configure rollbar
#
unless $config[:rollbar].nil?
  $rollbar = true
  Rollbar.configure do |config|
    config.access_token = $config[:rollbar][:token] if $config[:rollbar][:token]
    config.environment  = $config[:rollbar][:environment] if $config[:rollbar] && $config[:rollbar][:environment]
  end
end

#
# Configure backburner queue
#
Backburner.configure do |config|

  options = $config[:options] || {}

  config.beanstalk_url = "beanstalk://#{$config[:beanstalkd][:host]}:#{$config[:beanstalkd][:port]}"
  config.on_error = lambda { |e|
    td = thread_data
    # Exception handed to Rollbar: the original error, or the exception
    # raised by raise_error when the error had not been reported yet
    reported = e
    if td.exception_reported == false
      td.exception_reported = true
      if e.instance_of? Beaneater::DeadlineSoonError
        logger.warn "got a deadline warning".red
      else
        begin
          raise_error(message: e)
        rescue => wrapped
          # Do not rethrow!!!!
          # Use a separate variable: rescuing into 'e' would overwrite the
          # original error and hide it from the fatal check below
          reported = wrapped
        end
      end
    end

    # Report exception to rollbar
    $roolbar_mutex.synchronize {
      if $rollbar
        if reported.instance_of? ::SP::Job::JobException
          # Never send the job password; the job may be nil (default of JobException)
          reported.job[:password] = '' unless reported.job.nil?
          Rollbar.error(reported, reported.message, { job: reported.job, args: reported.args })
        else
          Rollbar.error(reported)
        end
      end
    }

    # Catch fatal exception that must be handled with a restarts (systemctl will restart us)
    # Both the original error and the one raised while reporting it are checked
    [e, reported].each do |error|
      case error
      when PG::UnableToSend, PG::AdminShutdown, PG::ConnectionBad
        logger.fatal "Lost connection to database exiting now"
        exit
      when Redis::CannotConnectError
        logger.fatal "Can't connect to redis exiting now"
        exit
      end
    end
  }
  config.max_job_retries  = options[:max_job_retries] ? options[:max_job_retries] : 0
  config.retry_delay      = options[:retry_delay] ? options[:retry_delay] : 5
  config.retry_delay_proc = lambda { |min_retry_delay, num_retries| min_retry_delay + (num_retries ** 3) }
  config.respond_timeout  = 120
  config.default_worker   = options[:threads].to_i > 1 ? SP::Job::WorkerThread : SP::Job::Worker
  config.logger           = $args[:debug] ? SP::Job::Logger.new(STDOUT) : SP::Job::Logger.new($args[:log_file])
  config.logger.formatter = proc do |severity, datetime, progname, msg|
    date_format = datetime.strftime("%Y-%m-%d %H:%M:%S")
    "[#{date_format}] #{severity}: #{msg}\n"
  end

  # Map the textual level to the Logger constant; a missing or unknown level
  # falls back to INFO
  log_levels = {
    'DEBUG' => Logger::DEBUG,
    'INFO'  => Logger::INFO,
    'WARN'  => Logger::WARN,
    'ERROR' => Logger::ERROR,
    'FATAL' => Logger::FATAL
  }
  config.logger.level = log_levels.fetch($args[:log_level].to_s.upcase, Logger::INFO)

  config.logger.datetime_format = "%Y-%m-%d %H:%M:%S"
  config.primary_queue          = $args[:program_name]
  config.reserve_timeout        = nil
  # The job body is a JSON object: it becomes the single job argument and its
  # :tube entry (or the program name) selects the job class
  config.job_parser_proc        = lambda { |body|
    rv = Hash.new
    rv[:args]  = [JSON.parse(body, :symbolize_names => true)]
    rv[:class] = rv[:args][0][:tube] || $args[:program_name]
    rv
  }
end

if $config[:mail]
  Mail.defaults do
    delivery_method :smtp, {
      :address              => $config[:mail][:smtp][:address],
      :port                 => $config[:mail][:smtp][:port].to_i,
      :domain               => $config[:mail][:smtp][:domain],
      :user_name            => $config[:mail][:smtp][:user_name],
      :password             => $config[:mail][:smtp][:password],
      :authentication       => $config[:mail][:smtp][:authentication],
      :enable_starttls_auto => $config[:mail][:smtp][:enable_starttls_auto]
    }
  end
end

#
# Monkey patches the sad reality of ruby development
#
if RUBY_ENGINE == 'jruby'

  class Logger
    class LogDevice

      # Same shape as the stock implementation, but a JobCancelled raised
      # while writing is re-raised instead of being swallowed with the
      # other write failures
      def write(message)
        begin
          synchronize do
            if @shift_age and @dev.respond_to?(:stat)
              begin
                check_shift_log
              rescue
                warn("log shifting failed. #{$!}")
              end
            end
            begin
              @dev.write(message)
            rescue ::SP::Job::JobCancelled => jc
              raise jc
            rescue
              warn("log writing failed. #{$!}")
            end
          end
        rescue ::SP::Job::JobCancelled => jc
          raise jc
        rescue Exception => ignored
          warn("log writing failed. #{ignored}")
        end
      end

    end
  end

end

module Backburner

  module Helpers

    # Tube names are used verbatim
    def expand_tube_name(tube)
      tube
    end

  end

  module Logger

    if RUBY_ENGINE != 'jruby'

      def log_job_begin(name, args)
        log_info "Work job #{name}"
        @job_started_at = Time.now
      end

    else

      # On JRuby the start time is kept in a thread local
      def log_job_begin(name, args)
        log_info "Work job #{name}"
        Thread.current[:job_started_at] = Time.now
      end

      # Print out when a job completed
      # If message is nil, job is considered complete
      def log_job_end(name, message = nil)
        elapsed = Time.now - Thread.current[:job_started_at]
        ms = (elapsed.to_f * 1000).to_i
        action_word = message ? 'Finished' : 'Completed'
        log_info("#{action_word} #{name} in #{ms}ms #{message}")
      end

    end
  end

  class Job
    extend SP::Job::Common # to bring in logger and report_error into this class

    # Processes a job and handles any failure, deleting the job once complete
    #
    # @example
    #   @task.process
    #
    # @return [false] when the job setup or the before_perform hook rejects the job
    def process
      # Invoke the job setup function, bailout if the setup returns false
      unless job_class.respond_to?(:prepare_job) && job_class.prepare_job(*args)
        task.delete
        logger.warn "Delete stale or preempted task".red
        return false
      end

      # Invoke before hook and stop if false
      res = @hooks.invoke_hook_events(job_class, :before_perform, *args)
      unless res
        task.delete
        return false
      end
      # Execute the job
      @hooks.around_hook_events(job_class, :around_perform, *args) do
        # We subtract one to ensure we timeout before beanstalkd does, except if:
        #  a) ttr == 0, to support never timing out
        #  b) ttr == 1, so that we don't accidentally set it to never time out
        #  NB: A ttr of 1 will likely result in race conditions between
        #  Backburner and beanstalkd and should probably be avoided
        timeout_job_after(task.ttr > 1 ? task.ttr - 1 : task.ttr) { job_class.perform(*args) }
      end
      task.delete
      # Invoke after perform hook
      @hooks.invoke_hook_events(job_class, :after_perform, *args)
    rescue ::SP::Job::JobAborted => ja
      #
      # This exception:
      #  1. is sent to the rollbar
      #  2. does not bury the job, instead the job is deleted
      #
      logger.debug "Received job aborted exception #{Thread.current}".yellow
      unless task.nil?
        logger.debug 'Task deleted'.yellow
        task.delete
      end
      # Invoke after perform hook
      @hooks.invoke_hook_events(job_class, :after_perform, *args)
      thread_data.job_id = nil
    rescue ::SP::Job::JobCancelled => jc
      #
      # This exception:
      #  1. is not sent to the rollbar
      #  2. does not bury the job, instead the job is deleted
      #
      logger.debug "Received job cancellation exception #{Thread.current}".yellow
      unless task.nil?
        logger.debug 'Task deleted'.yellow
        task.delete
      end
      @hooks.invoke_hook_events(job_class, :on_failure, jc, *args)
      report_error(message: 'i18n_job_cancelled', status: 'cancelled')
      # $redis_mutex is nil when running single threaded
      if $redis_mutex.nil?
        $redis.hset(thread_data.job_key, 'cancelled', true)
      else
        $redis_mutex.synchronize {
          $redis.hset(thread_data.job_key, 'cancelled', true)
        }
      end
      thread_data.job_id = nil
    rescue => e
      @hooks.invoke_hook_events(job_class, :on_failure, e, *args)
      raise e
    end

  end
end

# Mix-in the common mix-in to make code available for the lambdas used in this file
extend SP::Job::Common

logger.debug "Log file ... #{$args[:log_file]}"
logger.debug "PID ........ #{Process.pid}"

#
# Now create the global data needed by the mix-in methods
#
$connected     = false
$redis         = Redis.new(:host => $config[:redis][:host], :port => $config[:redis][:port], :db => 0)
$transient_job = $config[:options] && $config[:options][:transient] == true
$beaneater     = Beaneater.new "#{$config[:beanstalkd][:host]}:#{$config[:beanstalkd][:port]}"
if $config[:postgres] && $config[:postgres][:conn_str]
  $pg = ::SP::Job::PGConnection.new(owner: $PROGRAM_NAME, config: $config[:postgres], multithreaded: $multithreading)
  if $PROGRAM_NAME.split('/').last == 'saft-importer' || $PROGRAM_NAME.split('/').last == 'saft-destroyer'
    $pg.exec("SET log_min_duration_statement TO 0;")
  end
  # The options section is optional, guard it like the other accesses do
  if $config[:options] && $config[:options][:jsonapi] == true
    $jsonapi = SP::Duh::JSONAPI::Service.new($pg, ($jsonapi.nil? ? nil : $jsonapi.url), SP::Job::JobDbAdapter)
  end
end

#
# Open a second thread that will listen to cancellation and other "signals"
#
$cancel_thread = Thread.new {
  begin
    $subscription_redis = Redis.new(:host => $config[:redis][:host], :port => $config[:redis][:port], :db => 0)
    $subscription_redis.subscribe($config[:service_id] + ':job-signal') do |on|
      on.message do |channel, msg|
        begin
          message = JSON.parse(msg, {symbolize_names: true})
          # Raise JobCancelled on every worker thread whose current job id
          # matches the id of the cancelled job
          $threads.each do |thread|
            if $thread_data[thread].job_id != nil && message[:id].to_s == $thread_data[thread].job_id && message[:status] == 'cancelled'
              logger.info "Received cancel signal for job #{$thread_data[thread].job_id}"
              thread.raise(::SP::Job::JobCancelled.new)
            end
          end
        rescue Exception => e
          # ignore invalid payloads
        end
      end
    end
  rescue Redis::CannotConnectError => ccc
    logger.fatal "Can't connect to redis exiting now".red
    exit
  rescue Exception => e
    # Forward unexpected exceptions to the main thread for proper handling
    logger.fatal e.to_s.red
    Thread.main.raise(e)
  end
}