Sha256: 7669c5ebff35c669285769233d429630421419bfc2cde84d0045803a0e22c7aa
Contents?: true
Size: 1.86 KB
Versions: 2
Compression:
Stored size: 1.86 KB
Contents
require 'forwardable' require 'json' module Rocketman class JobQueue extend Forwardable QUEUE_KEY = "rocketman".freeze def_delegators :@jobs, :<<, :empty?, :size, :clear, :push, :pop def initialize @storage = Rocketman.configuration.storage @jobs = get_job_queue at_exit { persist_events } if @storage.class.to_s == "Redis" end def schedule(job) @jobs << job end private def get_job_queue case @storage.class.to_s when "Redis" rehydrate_events else Queue.new end end def rehydrate_events queue = Queue.new if raw_data = @storage.get(QUEUE_KEY) puts "Rehydrating Rocketman events from #{@storage.class}" if Rocketman.configuration.debug rehydrate = JSON.restore(raw_data) # For security measure to prevent remote code execution (only allow contents valid in JSON) jobs = Marshal.load(rehydrate) event_count = 0 until jobs.empty? queue << jobs.shift event_count += 1 end puts "Rehydrated #{event_count} events from #{@storage.class}" if Rocketman.configuration.debug @storage.del(QUEUE_KEY) # After rehydration, delete it off Redis end queue end def persist_events return if @jobs.empty? puts "Persisting Rocketman events to #{@storage.class}" if Rocketman.configuration.debug intermediary = [] event_count = 0 until @jobs.empty? intermediary << @jobs.pop event_count += 1 end @jobs.close marshalled_json = Marshal.dump(intermediary).to_json # For security measure to prevent remote code execution (only allow contents valid in JSON) @storage.set(QUEUE_KEY, marshalled_json) puts "Persisted #{event_count} events to #{@storage.class}" if Rocketman.configuration.debug end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
rocketman-0.3.0 | lib/rocketman/job_queue.rb |
rocketman-0.2.0 | lib/rocketman/job_queue.rb |