Sha256: 7763432e19f0d32dec605997d7f78e986409a20f5d558a7459a99aa2cacda4f2

Contents?: true

Size: 480 Bytes

Versions: 1

Compression:

Stored size: 480 Bytes

Contents

require 'thread'

class Proco
module Queue
# @private
class BatchQueue < Proco::Queue::Base
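  # size is passed to the base queue; batch_size is the maximum number of
  # items handed out by a single take.
  def initialize size, batch_size
    super size
    @futures = []
    @batch_size = batch_size
  end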

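  # Appends an item and returns the future shared by every item in its batch.
  def push_impl item
    @items << item
    # Start a new future when the item opens a new batch. Testing
    # `length % batch_size == 1` never fires when batch_size is 1.
    if (@items.length - 1) % @batch_size == 0
      @futures << Future.new
    end
    @futures.last
  end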

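  # Removes up to batch_size items and returns them with the batch's future.
  def take_impl
    items  = @items[0, @batch_size]
    # Slicing past the end yields nil, so fall back to an empty array.
    @items = @items[@batch_size..-1] || []

    [@futures.shift, items]
  end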
end
end
end
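
Usage sketch

The batching behaviour can be tried in isolation with the sketch below. Proco::Queue::Base and Future are not part of this file, so SketchBase, SketchFuture and SketchBatchQueue are hypothetical stand-ins: the sketch assumes the base class keeps items in an @items array and that push and take simply delegate to push_impl and take_impl. The real base class presumably also handles locking and capacity, which is left out here.

```ruby
# Hypothetical stand-ins; the real Proco::Queue::Base and Future are not shown
# in this file and may behave differently.
class SketchFuture; end

class SketchBase
  def initialize(size)
    @size  = size # capacity, not enforced in this sketch
    @items = []
  end

  # Assumed delegation; no locking or blocking here.
  def push(item)
    push_impl(item)
  end

  def take
    take_impl
  end
end

class SketchBatchQueue < SketchBase
  def initialize(size, batch_size)
    super(size)
    @futures    = []
    @batch_size = batch_size
  end

  def push_impl(item)
    @items << item
    # A new future is created whenever the item opens a new batch.
    @futures << SketchFuture.new if (@items.length - 1) % @batch_size == 0
    @futures.last
  end

  def take_impl
    items  = @items[0, @batch_size]
    @items = @items[@batch_size..-1] || []
    [@futures.shift, items]
  end
end

queue   = SketchBatchQueue.new(100, 3)
futures = (1..5).map { |i| queue.push(i) }

first_future,  first_batch  = queue.take
second_future, second_batch = queue.take
```

With a batch size of 3, the first three pushes share one future and the remaining two share another; each take returns a future together with the items that belong to it.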

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
proco-0.0.1 lib/proco/queue/batch_queue.rb