Sha256: 617326eb40830dd662d9be7755aebbeedbf01cd624868c3cfa858713eaf3452e
Contents?: true
Size: 1.91 KB
Versions: 1
Compression:
Stored size: 1.91 KB
Contents
require 'jobi/utils' require 'jobi/message' module Jobi class Job class << self include Utils def options(queue_name:, ack: false, consumers: 5) @queue_name = queue_name.to_s @ack = ack @consumers = consumers end def after_run(callback) @after_run_callback = callback if callback end def run(**args) return unless Jobi.publisher? before_start(args) start rescue Error => e Jobi.logger.error('Failed to process the job') Jobi.logger.error(e) end def consume_messages return unless Jobi.consumer? join_queue @consumer_threads = [] @consumers.times do @consumer_threads << Thread.new { consumer.consume! } end @consumer_threads.join(&:join) end private def before_start(args) create_message(args: args) log_job_info! end def start join_queue publish_message @message.id end def consumer class_const = constantize("Jobi::Consumers::#{Jobi.client_class_name}") class_const.new(queue: @queue, ack: @ack) end def join_queue @queue = Jobi.session .queue(name: @queue_name) end def publish_message Jobi.session .publish( message: Marshal.dump(@message), queue: @queue ) end def create_message(args:) @message = Message.new( job_class: self, args: args, after_run: @after_run_callback ) end def log_job_info! Jobi.logger.info("A job has been started with id: [#{@message.id}]") Jobi.logger.debug("Queue: '#{@queue_name}'") Jobi.logger.debug("Args: #{@message.args}") Jobi.logger.debug("Job class: '#{@message.job_class}'") end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
jobi-0.1.1 | lib/jobi/job.rb |