Sha256: 2af067f2736a827c1d2cc64926fc95844f0ee07c09cab89c519e43a2baa997d9

Contents?: true

Size: 1.34 KB

Versions: 6

Compression:

Stored size: 1.34 KB

Contents

module Asynchronic
  module QueueEngine
    # Queue engine whose queues run every pushed message inline (see
    # Queue#push) rather than storing it for later processing. Individual job
    # types can be replaced by blocks through #stub.
    class Synchronic

      # Hash mapping a job type to the block registered for it via #stub.
      attr_reader :stubs

      # @param options [Hash] only +:environment+ is read here; when it is
      #   absent, #environment falls back to +Asynchronic.environment+.
      def initialize(options={})
        @stubs = {}
        @environment = options[:environment]
      end

      # @return the library-wide default queue, as reported by
      #   +Asynchronic.default_queue+.
      def default_queue
        Asynchronic.default_queue
      end

      # @return the environment given at construction or, lazily on first
      #   use, +Asynchronic.environment+ (the result is then cached).
      def environment
        @environment = Asynchronic.environment unless @environment
        @environment
      end

      # Builds a fresh Queue bound to this engine on every call.
      #
      # @param name queue name; accepted but not used by this engine.
      # @return [Queue]
      def [](name)
        Queue.new(self)
      end

      # Registers +block+ as the replacement for +job+. Queue#push looks the
      # block up by +process.type+ and calls it with the process.
      #
      # @param job key the block is stored under in #stubs
      # @return [Proc, nil] the stored block
      def stub(job, &block)
        @stubs.store(job, block)
      end

      # @return [false] always
      def asynchronic?
        false
      end

      # @return [Array] one-element array holding
      #   +Asynchronic.connection_name+.
      def active_connections
        connection = Asynchronic.connection_name
        [connection]
      end


      # Queue handed out by Synchronic#[]; pushing executes the message's
      # process right away.
      class Queue

        # @param engine [Synchronic] engine supplying the environment and stubs
        def initialize(engine)
          @engine = engine
        end

        # Loads the process for +message+ from the engine's environment and
        # executes it. When a stub is registered for the process type, the
        # process' +job+ is first overridden to return a MockJob.
        #
        # @param message passed as-is to +environment.load_process+
        # @return whatever +process.execute+ returns
        def push(message)
          process = @engine.environment.load_process(message)
          replacement = @engine.stubs[process.type]

          if replacement
            # Capture the real job before +job+ is overridden below.
            real_job = process.job
            process.define_singleton_method(:job) do
              MockJob.new(real_job, process, &replacement)
            end
          end

          process.execute
        end

      end


      # Stand-in for a stubbed job: +call+ runs the stub block instead of the
      # job's own logic. Anything not overridden here is handled by
      # TransparentProxy (defined elsewhere; presumably delegates to the
      # wrapped job -- confirm).
      class MockJob < TransparentProxy

        # @param job the real job, handed to TransparentProxy
        # @param process the process passed to the stub block on #call
        def initialize(job, process, &block)
          super(job)
          @block = block
          @process = process
        end

        # Invokes the stub block with the process.
        #
        # @return the block's result
        def call
          @block.call(@process)
        end

        # Intentionally empty: a MockJob does nothing in this hook.
        def before_finalize
        end

      end

    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygem

Version Path
asynchronic-3.0.3 lib/asynchronic/queue_engine/synchronic.rb
asynchronic-3.0.2 lib/asynchronic/queue_engine/synchronic.rb
asynchronic-3.0.1 lib/asynchronic/queue_engine/synchronic.rb
asynchronic-3.0.0 lib/asynchronic/queue_engine/synchronic.rb
asynchronic-2.0.1 lib/asynchronic/queue_engine/synchronic.rb
asynchronic-2.0.0 lib/asynchronic/queue_engine/synchronic.rb