Sha256: 895cf40fdc88874e69b1da82ee86d8484a1d45e47b0100e66de8a96893f19f5a

Contents?: true

Size: 1 KB

Versions: 6

Compression:

Stored size: 1 KB

Contents

module Steep
  module Server
    class DelayQueue
      attr_reader :delay, :thread, :queue, :last_task

      def initialize(delay:)
        @delay = delay

        @queue = Thread::Queue.new

        @thread = Thread.new do
          while (scheduled_at, proc = queue.pop)
            # @type var scheduled_at: Time
            # @type var proc: ^() -> void

            diff = scheduled_at - Time.now
            case
            when diff > 0.1
              sleep diff
            when diff > 0
              while Time.now < scheduled_at
                # nop
                sleep 0
              end
            end

            if proc.equal?(last_task)
              unless @cancelled
                proc[]
              end
            end
          end
        end
      end

      def cancel
        @cancelled = true
        queue.clear()
      end

      def execute(&block)
        @last_task = block
        scheduled_at = Time.now + delay
        queue << [scheduled_at, block]
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
steep-1.8.3 lib/steep/server/delay_queue.rb
steep-1.8.2 lib/steep/server/delay_queue.rb
steep-1.8.1 lib/steep/server/delay_queue.rb
steep-1.8.0 lib/steep/server/delay_queue.rb
steep-1.8.0.pre.2 lib/steep/server/delay_queue.rb
steep-1.8.0.pre.1 lib/steep/server/delay_queue.rb