Sha256: 9533166147bfb63b29330412738d82883ade759fb615da08005ee481e5b4d3b9

Contents?: true

Size: 1.25 KB

Versions: 1

Compression:

Stored size: 1.25 KB

Contents

module Asynchronic
  # Ties together a queue engine and a data store and offers helpers to
  # build, persist, enqueue and look up jobs/processes on top of them.
  #
  # The collaborators are duck-typed. From the calls made in this class:
  # * +queue_engine+ must respond to +[]+ (queue lookup by name) and
  #   +default_queue+ (name of the default queue).
  # * +data_store+ must respond to +get+, +set+ and +keys+.
  class Environment

    attr_reader :queue_engine
    attr_reader :data_store

    # @param queue_engine [#[], #default_queue] provider of named queues
    # @param data_store [#get, #set, #keys] key/value storage backend
    def initialize(queue_engine, data_store)
      @queue_engine = queue_engine
      @data_store = data_store
    end

    # Reads the value stored under +key+ in the data store.
    def [](key)
      data_store.get key
    end

    # Stores +value+ under +key+ in the data store.
    def []=(key, value)
      data_store.set key, value
    end

    # Returns the queue registered under +name+ in the queue engine.
    def queue(name)
      queue_engine[name]
    end

    # Returns the queue whose name is the queue engine's default.
    def default_queue
      queue(queue_engine.default_queue)
    end

    # Pushes +msg+ onto the named queue, or onto the default queue when
    # +queue+ is nil (or omitted).
    #
    # @param msg [Object] message to push
    # @param queue [Object, nil] queue name; nil selects the default queue
    def enqueue(msg, queue=nil)
      queue_name = queue || queue_engine.default_queue
      # The +queue+ parameter shadows the #queue method; the explicit
      # receiver makes it unambiguous that the method is being called.
      self.queue(queue_name).push msg
    end

    # Instantiates +job_class+ with +options+ and persists it: the job is
    # stored under +job.lookup.id+ and the current time under
    # +job.lookup.created_at+.
    #
    # @param job_class [Class] class whose instances respond to +lookup+
    # @param options [Hash] passed as-is to +job_class.new+
    # @return [Object] the newly built job
    def build_job(job_class, options={})
      Asynchronic.logger.debug('Asynchronic') { "Building job #{job_class} - #{options}" }
      job_class.new(options).tap do |job|
        self[job.lookup.id] = job
        self[job.lookup.created_at] = Time.now
      end
    end

    # Builds and persists a job (see #build_job) and wraps it in a Process
    # bound to this environment.
    # NOTE(review): +Process+ is presumably Asynchronic::Process, defined
    # elsewhere in the gem — confirm it is loaded before this is called.
    def build_process(job_class, options={})
      Process.new build_job(job_class, options), self
    end

    # Wraps the value stored under +pid+ in a Process bound to this
    # environment.
    def load_process(pid)
      Process.new self[pid], self
    end

    # Loads a Process for every data store key ending in
    # "job:<uuid>:created_at", ordered by the stored created_at value,
    # highest (most recent) first.
    #
    # @return [Array] processes, newest first; empty when no key matches
    def processes
      created_at_suffix = ':created_at'
      # Built once per call instead of once per key. It is not a class-level
      # constant because Asynchronic::UUID_REGEXP may not be defined yet
      # when this file is loaded. \z anchors at the real end of the string.
      created_at_key = Regexp.new("job:#{Asynchronic::UUID_REGEXP}#{created_at_suffix}\\z")

      data_store.keys.
        select { |k| k =~ created_at_key }.
        sort_by { |k| data_store.get k }.
        reverse.
        # Strip only the trailing suffix: the remainder is the process id.
        map { |k| load_process k.chomp(created_at_suffix) }
    end

  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
asynchronic-0.1.0 lib/asynchronic/environment.rb