module Refinery #:nodoc:
# Publish events.
class EventPublisher
include Refinery::Loggable
include Refinery::Configurable
include Refinery::Queueable
include Refinery::Utilities
STARTING = 'starting' #:nodoc:
RUNNING = 'running' #:nodoc:
STOPPED = 'stopped' #:nodoc:
attr_accessor :publishers_directory
# Initialize the event publisher
#
# Options:
# * :debug: Set to true to enable debug logging
# * :config: Provide a file path to load that config
def initialize(options={})
logger.level = Logger::INFO if options[:verbose]
logger.level = Logger::DEBUG if options[:debug]
config.load_file(options[:config]) if options[:config]
self.publishers_directory = options[:publishers] if options[:publishers]
end
# Get the event publisher state
def state
@state ||= STARTING
end
# Return true if the event publisher is running
def running?
state == RUNNING
end
# The directory where publishers are found. Defaults to ./publishers
def publishers_directory
@publishers_directory ||= './publishers'
end
# A hash of all publisher classes mapped to last modified timestamps.
def publishers
@publishers ||= {}
end
# Run the specified publisher once and return
def run_once(key)
settings = config['processors'][key]
raise RuntimeError, "No processor configuration found for #{key}" unless settings
queue_name = settings['queue'] || key
logger.debug "Using queue #{queue_name}_waiting"
waiting_queue = queue("#{queue_name}_waiting")
load_publisher_class(key).new(waiting_queue).execute
end
# Run the event publisher
def run
@state = RUNNING
logger.info "Starting event publisher"
config['processors'].each do |key, settings|
run_publisher(key, settings)
end
begin
threads.each { |thread| thread.join }
rescue Interrupt => e
end
logger.info "Exiting event publisher"
end
private
# An array of threads, one for each publisher instance
def threads
@threads ||= []
end
# Run the publisher for the given key
def run_publisher(key, settings)
logger.info "Creating publisher for #{key}"
queue_name = settings['queue'] || key
logger.debug "Using queue #{queue_name}_waiting"
waiting_queue = queue("#{queue_name}_waiting")
threads << Thread.new(waiting_queue, settings) do |waiting_queue, settings|
while(running?)
begin
load_publisher_class(key).new(waiting_queue).execute
rescue Exception => e
logger.error e
raise e
end
delay = settings['publishers']['delay'] || 60
logger.debug "Sleeping #{delay} seconds"
sleep delay
end
end
end
def load_publisher_class(key)
source_file = "#{publishers_directory}/#{key}.rb"
if File.exist?(source_file)
modified_at = File.mtime(source_file)
if publishers[key] != modified_at
logger.debug "Loading #{source_file}"
load(source_file)
publishers[key] = modified_at
end
else
raise SourceFileNotFound, "Source file not found: #{source_file}"
end
Object.const_get(camelize("#{key}_publisher"))
end
end
end