Sha256: a9fa9d20f2333c869e9596084a8091a8e40295384ec2b7015784ebdad93a6739

Contents?: true

Size: 1.78 KB

Versions: 3

Compression:

Stored size: 1.78 KB

Contents

#######################################################################
# Copyright (c) 2014 ENEO Tecnologia S.L.
# This file is part of redBorder.
# redBorder is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# redBorder is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with redBorder. If not, see <http://www.gnu.org/licenses/>.
#######################################################################

module Redborder
  module KafkaNotifier
    # Thread-safe FIFO buffer that hands messages to a consumer in batches
    # of at most +limit+ items.
    #
    # Producers append with {#<<}; a consumer drains with {#pop_all}, which
    # blocks until there is something to return. All access to the
    # underlying array is serialized through a single Mutex.
    class MessageQueue
      # @param limit [Integer] maximum number of messages returned by one
      #   {#pop_all} call, and the buffer size at which a waiting consumer is
      #   woken early. A non-positive limit is not meaningful: no batch could
      #   ever make progress.
      # @param timeout [Numeric] seconds a consumer waits for more messages
      #   before settling for whatever has been buffered so far.
      def initialize(limit = 500, timeout = 1)
        @mutex = Mutex.new
        @queue = []
        @limit = limit
        @timeout = timeout
        @received = ConditionVariable.new
      end

      # Appends a message to the tail of the buffer and wakes a waiting
      # consumer once a full batch (+limit+ items) is available.
      #
      # @param x [Object] the message to enqueue
      # @return [Object, nil] result of the synchronized block; callers
      #   should not rely on it
      def <<(x)
        @mutex.synchronize do
          @queue << x
          @received.signal if @queue.size >= @limit
        end
      end

      # Removes and returns the next batch of messages in FIFO order.
      #
      # Blocks while the buffer is empty. If fewer than +limit+ messages are
      # buffered, waits up to +timeout+ seconds for more before returning.
      # The returned batch never holds more than +limit+ messages; anything
      # beyond that stays buffered for the next call.
      #
      # @return [Array<Object>] between 1 and +limit+ messages
      def pop_all
        @mutex.synchronize do
          # `<<` only signals once a full batch exists, so an empty buffer is
          # re-checked every +timeout+ seconds rather than on each push.
          @received.wait(@mutex, @timeout) while @queue.empty?

          # Partial batch: give producers one more window to fill it up.
          @received.wait(@mutex, @timeout) if @queue.size < @limit

          # Producers may have enqueued more than +limit+ items while we were
          # waiting, so cap the batch instead of draining the whole buffer.
          @queue.shift(@limit)
        end
      end

      # @return [Integer] number of messages currently buffered
      def size
        @mutex.synchronize { @queue.size }
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygem

Version Path
kafka_notifier-1.0.0 lib/kafka_notifier/message_queue.rb
kafka_notifier-0.0.2 lib/kafka_notifier/message_queue.rb
kafka_notifier-0.0.1 lib/kafka_notifier/message_queue.rb