Sha256: cf577dbf56427d12a2324c06c51cfd36a6bc0bf434590fda0c46c08e07ec4335

Contents?: true

Size: 1.6 KB

Versions: 1

Compression:

Stored size: 1.6 KB

Contents

require 'concurrent/utility/monotonic_time'
require 'concurrent/channel/tick'
require 'concurrent/channel/buffer/base'

module Concurrent
  class Channel
    module Buffer

      # A channel buffer that manufactures a +Concurrent::Channel::Tick+ each
      # time its interval has elapsed, rather than storing items handed to it
      # by callers. It always reports itself as full and non-empty, and both
      # +put+ and +offer+ refuse every item.
      #
      # NOTE(review): +synchronize+, +ns_closed?+ and +NO_VALUE+ are not
      # defined in this file; they are presumed to be provided by +Base+ --
      # confirm against that class.
      class Ticker < Base

        # @param interval [#to_f] spacing between ticks, in the units of
        #   +Concurrent.monotonic_time+ (presumably seconds -- confirm)
        def initialize(interval)
          super()
          synchronize do
            @interval = interval.to_f
            # Schedule from the coerced value so the first tick uses the same
            # arithmetic as every subsequent one.
            @next_tick = Concurrent.monotonic_time + @interval
          end
        end

        # @return [Integer] always 1
        def size() 1; end

        # @return [false] always
        def empty?() false; end

        # @return [true] always
        def full?() true; end

        # Items cannot be placed into a ticker.
        #
        # @param item [Object] ignored
        # @return [false] always
        def put(item)
          false
        end

        # Items cannot be placed into a ticker.
        #
        # @param item [Object] ignored
        # @return [false] always
        def offer(item)
          false
        end

        # Waits until the next tick is due and returns it.
        #
        # @return [Concurrent::Channel::Tick, Object] the tick, or +NO_VALUE+
        #   when the buffer is closed
        def take
          loop do
            result, _ = do_poll
            return NO_VALUE if result.nil?
            return result if result != NO_VALUE
            # Nothing is due yet: let other threads run instead of spinning
            # flat out. The lock is not held at this point.
            Thread.pass
          end
        end

        # Waits until the next tick is due and returns it together with a
        # flag telling whether a value was produced.
        #
        # @return [Array] +[tick, true]+ when a tick is produced, or
        #   +[NO_VALUE, false]+ when the buffer is closed
        def next
          loop do
            result, _ = do_poll
            return NO_VALUE, false if result.nil?
            return result, true if result != NO_VALUE
            # See #take: yield the processor between polls.
            Thread.pass
          end
        end

        # Non-blocking check for a due tick.
        #
        # @return [Concurrent::Channel::Tick, Object] the tick if one is due,
        #   otherwise +NO_VALUE+ (also returned when the buffer is closed)
        def poll
          result, _ = do_poll
          if result.nil? || result == NO_VALUE
            NO_VALUE
          else
            result
          end
        end

        private

        # Performs a single poll under the buffer lock. The closed check and
        # the read-then-reschedule of +@next_tick+ must happen atomically,
        # otherwise two concurrent consumers could both observe the same tick
        # as due and each receive it.
        #
        # @return [Array] +[nil, false]+ when closed, +[tick, true]+ when a
        #   tick is due, +[NO_VALUE, true]+ when nothing is due yet
        def do_poll
          synchronize do
            if ns_closed?
              return nil, false
            elsif (now = Concurrent.monotonic_time) >= @next_tick
              # The tick carries the time it was scheduled for; the next one
              # is scheduled relative to when this one was actually consumed.
              tick = Concurrent::Channel::Tick.new(@next_tick)
              @next_tick = now + @interval
              return tick, true
            else
              return NO_VALUE, true
            end
          end
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
concurrent-ruby-edge-0.2.0.pre3 lib/concurrent/channel/buffer/ticker.rb