Sha256: 6c007f9d0591948adfa14e722a6fdabe620bcc975056f01ee3aa1a2e72efbb27

Contents?: true

Size: 939 Bytes

Versions: 7

Compression:

Stored size: 939 Bytes

Contents

module Steep
  module Server
    class DelayQueue
      attr_reader :delay, :thread, :queue, :last_task

      def initialize(delay:)
        @delay = delay

        @queue = Thread::Queue.new

        @thread = Thread.new do
          while (scheduled_at, proc = queue.pop)
            diff = scheduled_at - Time.now
            case
            when diff > 0.1
              sleep diff
            when diff > 0
              while Time.now < scheduled_at
                # nop
                sleep 0
              end
            end

            if proc.equal?(last_task)
              unless @cancelled
                proc[]
              end
            end
          end
        end
      end

      def cancel
        @cancelled = true
        queue.clear()
      end

      def execute(&block)
        @last_task = block
        scheduled_at = Time.now + delay
        queue << [scheduled_at, block]
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 2 rubygems

Version Path
steep-activesupport-4-1.9.3 lib/steep/server/delay_queue.rb
steep-1.9.3 lib/steep/server/delay_queue.rb
steep-1.9.2 lib/steep/server/delay_queue.rb
steep-1.9.1 lib/steep/server/delay_queue.rb
steep-1.9.0 lib/steep/server/delay_queue.rb
steep-1.9.0.dev.2 lib/steep/server/delay_queue.rb
steep-1.9.0.dev.1 lib/steep/server/delay_queue.rb