Sha256: 396af161ed7552c8feec4e31231cc0759d9e4ca61b42d40fb667f5d1eebcc9c5

Contents?: true

Size: 1.48 KB

Versions: 5

Compression:

Stored size: 1.48 KB

Contents

module RorVsWild
  # Buffers job and request payloads in memory and posts them to the client in
  # batches: from a background thread that flushes every SLEEP_TIME seconds,
  # and once more from an at_exit hook registered by the constructor.
  class Queue
    # Seconds the background thread sleeps between two flushes.
    SLEEP_TIME = 10
    # Buffer size from which a push wakes the background thread immediately.
    # (The spelling is kept as is: the constant is part of the public interface.)
    FLUSH_TRESHOLD = 10

    attr_reader :mutex, :thread, :client
    attr_reader :requests, :jobs

    # @param client [#post] object that receives `post(path, payload)` calls
    def initialize(client)
      @jobs = []
      @requests = []
      @client = client
      @mutex = Mutex.new
      # Send whatever is still buffered when the process terminates.
      Kernel.at_exit { flush }
    end

    # Buffers a job payload.
    #
    # The current jobs buffer is looked up while holding the mutex. Looking it
    # up before locking would let a concurrent #pull_jobs swap the buffer in
    # between, and the payload would land in the batch already handed out for
    # posting instead of the live buffer.
    #
    # @param data [Object] payload to buffer
    def push_job(data)
      mutex.synchronize { append(jobs, data) }
    end

    # Buffers a request payload. The requests buffer is resolved under the
    # mutex for the same reason as in #push_job.
    #
    # @param data [Object] payload to buffer
    def push_request(data)
      mutex.synchronize { append(requests, data) }
    end

    # Appends data to the given array while holding the mutex, and wakes or
    # starts the background thread when needed.
    #
    # The array is whatever the caller resolved before the lock was taken, so
    # prefer #push_job / #push_request, which resolve the buffer under the lock.
    #
    # @param array [Array] buffer to append to
    # @param data [Object] payload to buffer
    def push_to(array, data)
      mutex.synchronize { append(array, data) }
    end

    # Detaches and returns the buffered jobs, leaving a fresh empty buffer.
    #
    # @return [Array, nil] the buffered jobs, or nil when there are none
    def pull_jobs
      mutex.synchronize do
        next if jobs.empty?

        pulled = jobs
        @jobs = []
        pulled
      end
    end

    # Detaches and returns the buffered requests, leaving a fresh empty buffer.
    #
    # @return [Array, nil] the buffered requests, or nil when there are none
    def pull_requests
      mutex.synchronize do
        next if requests.empty?

        pulled = requests
        @requests = []
        pulled
      end
    end

    # Body of the background thread: sleeps SLEEP_TIME seconds (or until woken
    # up), flushes, and starts over.
    def flush_indefinetely
      loop do
        sleep(SLEEP_TIME)
        flush
      end
    rescue Exception => ex
      # Deliberately broad: whatever ends the loop is logged, then re-raised
      # unchanged so nothing is swallowed here.
      RorVsWild.logger.error(ex)
      raise
    end

    # Posts the buffered jobs, then the buffered requests. Nothing is posted
    # for a buffer that is empty.
    def flush
      pending_jobs = pull_jobs
      client.post("/jobs", jobs: pending_jobs) if pending_jobs

      pending_requests = pull_requests
      client.post("/requests", requests: pending_requests) if pending_requests
    end

    # Starts the background flushing thread and remembers it in @thread.
    def start_thread
      RorVsWild.logger.debug("RorVsWild::Queue#start_thread".freeze)
      @thread = Thread.new { flush_indefinetely }
    end

    # Wakes the background thread up when it is alive, otherwise starts a new one.
    def wakeup_thread
      (thread && thread.alive?) ? thread.wakeup : start_thread
    end

    private

    # Appends data to array and triggers the background thread when the array
    # reached FLUSH_TRESHOLD items or when no thread was ever started.
    # Callers must already hold the mutex (Mutex is not reentrant).
    def append(array, data)
      array << data
      wakeup_thread if array.size >= FLUSH_TRESHOLD || !thread
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
rorvswild-1.5.14 lib/rorvswild/queue.rb
rorvswild-1.5.13 lib/rorvswild/queue.rb
rorvswild-1.5.12 lib/rorvswild/queue.rb
rorvswild-1.5.11 lib/rorvswild/queue.rb
rorvswild-1.5.10 lib/rorvswild/queue.rb