Sha256: 3d84e0d3d28659e5ab0f49d0072b92240fb24869ab28d301198fc9b4a47ccecf

Contents?: true

Size: 494 Bytes

Versions: 1

Compression:

Stored size: 494 Bytes

Contents

require 'thread'

class Proco
module Queue
# @private
class BatchQueue < Proco::Queue::Base
  def initialize size, batch_size, delay
    super size, delay
    @futures = []
    @batch_size = batch_size
  end

  def push_impl item
    @items << item
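    # Open a new future for the first item of each batch. Testing
    # (length - 1) % batch_size == 0 also covers @batch_size == 1,
    # where length % 1 == 1 can never be true.
    if (@items.length - 1) % @batch_size == 0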
      @futures << Future.new
    end
    @futures.last
  end

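  # Removes up to @batch_size items from the front of the queue and
  # returns them together with the future that covers that batch.
  def take_impl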
    items  = @items[0, @batch_size]
    @items = @items[@batch_size..-1] || []

    [@futures.shift, items]
  end
end
end
end
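
The batching rule above can be tried in isolation. The following is a minimal sketch, not Proco's actual API: SketchBatchQueue and SketchFuture are hypothetical stand-ins, and the locking, size limit, and delay handled by Proco::Queue::Base are assumed away. It only shows that items pushed into the same batch share one future, and that each take returns one future with its items.

```ruby
# Hypothetical stand-in for Proco's Future; it only carries an id so
# that shared futures are easy to recognize.
SketchFuture = Struct.new(:id)

# Hypothetical, single-threaded stand-in for BatchQueue (no Base class).
class SketchBatchQueue
  def initialize(batch_size)
    @batch_size = batch_size
    @items      = []
    @futures    = []
    @next_id    = 0
  end

  # Appends an item and returns the future of the batch it belongs to.
  # A new future is opened whenever the item is the first of a batch.
  def push(item)
    @items << item
    if (@items.length - 1) % @batch_size == 0
      @futures << SketchFuture.new(@next_id += 1)
    end
    @futures.last
  end

  # Removes up to batch_size items and the future that covers them.
  def take
    items = @items.shift(@batch_size)
    [@futures.shift, items]
  end
end

queue = SketchBatchQueue.new(2)
pushed_ids = [:a, :b, :c].map { |item| queue.push(item).id }

first_future,  first_batch  = queue.take
second_future, second_batch = queue.take
```

With a batch size of 2, the first two pushes return the same future and the third push opens a new one; the first take then yields the full batch and the second take yields the partial batch.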

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
proco-0.0.2 lib/proco/queue/batch_queue.rb