lib/rocket_job/event.rb in rocketjob-5.1.1 vs lib/rocket_job/event.rb in rocketjob-5.2.0.beta1

- old
+ new

@@ -1,6 +1,6 @@
-require 'concurrent-ruby'
+require "concurrent-ruby"
 
 module RocketJob
   # RocketJob::Event
   #
   # Publish and Subscribe to events. Events are published immediately and usually consumed
@@ -8,11 +8,11 @@
   class Event
     include SemanticLogger::Loggable
     include Plugins::Document
     include Mongoid::Timestamps
 
-    ALL_EVENTS = '*'.freeze
+    ALL_EVENTS = "*".freeze
 
     # Capped collection long polling interval.
     class_attribute :long_poll_seconds, instance_accessor: false
     self.long_poll_seconds = 300
@@ -40,11 +40,11 @@
     # Hash Parameters to be sent with the event (event specific).
     field :parameters, type: Hash
 
     validates_presence_of :name
 
-    store_in collection: 'rocket_job.events'
+    store_in collection: "rocket_job.events"
 
     index({created_at: 1}, background: true)
 
     # Add a subscriber for its events.
     # Returns a handle to the subscription that can be used to unsubscribe
     # this particular subscription
@@ -82,18 +82,18 @@
     end
 
     # Indefinitely tail the capped collection looking for new events.
     # time: the start time from which to start looking for new events.
     def self.listener(time: @load_time)
-      Thread.current.name = 'rocketjob event'
+      Thread.current.name = "rocketjob event"
       create_capped_collection
-      logger.info('Event listener started')
+      logger.info("Event listener started")
       tail_capped_collection(time) { |event| process_event(event) }
-    rescue Exception => exc
-      logger.error('#listener Event listener is terminating due to unhandled exception', exc)
-      raise(exc)
+    rescue Exception => e
+      logger.error("#listener Event listener is terminating due to unhandled exception", e)
+      raise(e)
     end
 
     # Create the capped collection only if it does not exist.
     # Drop the collection before calling this method to re-create it.
     def self.create_capped_collection(size: capped_collection_size)
@@ -115,49 +115,49 @@
       subscriber.object_id
     end
 
     def self.tail_capped_collection(time)
       with(socket_timeout: long_poll_seconds + 10) do
-        filter = {created_at: {'$gt' => time}}
+        filter = {created_at: {"$gt" => time}}
         collection.
           find(filter).
           await_data.
           cursor_type(:tailable_await).
           max_await_time_ms(long_poll_seconds * 1000).
-          sort('$natural' => 1).
+          sort("$natural" => 1).
           each do |doc|
             event = Mongoid::Factory.from_db(Event, doc)
             # Recovery will occur from after the last message read
             time = event.created_at
             yield(event)
           end
       end
-    rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError, Mongo::Error::OperationFailure, Timeout::Error => exc
-      logger.info("Creating a new cursor and trying again: #{exc.class.name} #{exc.message}")
+    rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError, Mongo::Error::OperationFailure, Timeout::Error => e
+      logger.info("Creating a new cursor and trying again: #{e.class.name} #{e.message}")
       retry
     end
 
     # Process a new event, calling registered subscribers.
     def self.process_event(event)
-      logger.info('Event Received', event.attributes)
+      logger.info("Event Received", event.attributes)
 
       if @subscribers.key?(event.name)
        @subscribers[event.name].each { |subscriber| subscriber.process_action(event.action, event.parameters) }
       end
 
       if @subscribers.key?(ALL_EVENTS)
         @subscribers[ALL_EVENTS].each { |subscriber| subscriber.process_event(event.name, event.action, event.parameters) }
       end
-    rescue StandardError => exc
-      logger.error('Unknown subscriber. Continuing..', exc)
+    rescue StandardError => e
+      logger.error("Unknown subscriber. Continuing..", e)
     end
 
     def self.collection_exists?
       collection.database.collection_names.include?(collection_name.to_s)
     end
 
     # Convert a non-capped collection to capped
     def self.convert_to_capped_collection(size)
-      collection.database.command('convertToCapped' => collection_name.to_s, 'size' => size)
+      collection.database.command("convertToCapped" => collection_name.to_s, "size" => size)
     end
   end
 end