lib/rocket_job/event.rb in rocketjob-5.1.1 vs lib/rocket_job/event.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -1,6 +1,6 @@
-require 'concurrent-ruby'
+require "concurrent-ruby"
module RocketJob
# RocketJob::Event
#
# Publish and Subscribe to events. Events are published immediately and usually consumed
@@ -8,11 +8,11 @@
class Event
include SemanticLogger::Loggable
include Plugins::Document
include Mongoid::Timestamps
- ALL_EVENTS = '*'.freeze
+ ALL_EVENTS = "*".freeze
# Capped collection long polling interval.
class_attribute :long_poll_seconds, instance_accessor: false
self.long_poll_seconds = 300
@@ -40,11 +40,11 @@
# Hash Parameters to be sent with the event (event specific).
field :parameters, type: Hash
validates_presence_of :name
- store_in collection: 'rocket_job.events'
+ store_in collection: "rocket_job.events"
index({created_at: 1}, background: true)
# Add a subscriber for its events.
# Returns a handle to the subscription that can be used to unsubscribe
# this particular subscription
@@ -82,18 +82,18 @@
end
# Indefinitely tail the capped collection looking for new events.
# time: the start time from which to start looking for new events.
def self.listener(time: @load_time)
- Thread.current.name = 'rocketjob event'
+ Thread.current.name = "rocketjob event"
create_capped_collection
- logger.info('Event listener started')
+ logger.info("Event listener started")
tail_capped_collection(time) { |event| process_event(event) }
- rescue Exception => exc
- logger.error('#listener Event listener is terminating due to unhandled exception', exc)
- raise(exc)
+ rescue Exception => e
+ logger.error("#listener Event listener is terminating due to unhandled exception", e)
+ raise(e)
end
# Create the capped collection only if it does not exist.
# Drop the collection before calling this method to re-create it.
def self.create_capped_collection(size: capped_collection_size)
@@ -115,49 +115,49 @@
subscriber.object_id
end
def self.tail_capped_collection(time)
with(socket_timeout: long_poll_seconds + 10) do
- filter = {created_at: {'$gt' => time}}
+ filter = {created_at: {"$gt" => time}}
collection.
find(filter).
await_data.
cursor_type(:tailable_await).
max_await_time_ms(long_poll_seconds * 1000).
- sort('$natural' => 1).
+ sort("$natural" => 1).
each do |doc|
event = Mongoid::Factory.from_db(Event, doc)
# Recovery will occur from after the last message read
time = event.created_at
yield(event)
end
end
- rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError, Mongo::Error::OperationFailure, Timeout::Error => exc
- logger.info("Creating a new cursor and trying again: #{exc.class.name} #{exc.message}")
+ rescue Mongo::Error::SocketError, Mongo::Error::SocketTimeoutError, Mongo::Error::OperationFailure, Timeout::Error => e
+ logger.info("Creating a new cursor and trying again: #{e.class.name} #{e.message}")
retry
end
# Process a new event, calling registered subscribers.
def self.process_event(event)
- logger.info('Event Received', event.attributes)
+ logger.info("Event Received", event.attributes)
if @subscribers.key?(event.name)
@subscribers[event.name].each { |subscriber| subscriber.process_action(event.action, event.parameters) }
end
if @subscribers.key?(ALL_EVENTS)
@subscribers[ALL_EVENTS].each { |subscriber| subscriber.process_event(event.name, event.action, event.parameters) }
end
- rescue StandardError => exc
- logger.error('Unknown subscriber. Continuing..', exc)
+ rescue StandardError => e
+ logger.error("Unknown subscriber. Continuing..", e)
end
def self.collection_exists?
collection.database.collection_names.include?(collection_name.to_s)
end
# Convert a non-capped collection to capped
def self.convert_to_capped_collection(size)
- collection.database.command('convertToCapped' => collection_name.to_s, 'size' => size)
+ collection.database.command("convertToCapped" => collection_name.to_s, "size" => size)
end
end
end