Sha256: 2796739c2555f9cb04d6723fc3e536d74557f7508009bbb5d988d5d342bd5c2e

Contents?: true

Size: 1.18 KB

Versions: 1

Compression:

Stored size: 1.18 KB

Contents

require "thread"

require "tennis/backend/abstract"
require "tennis/backend/task"

module Tennis
  module Backend
    class Memory < Abstract

      attr_reader :queue, :acked_tasks

      def initialize(**kwargs)
        super
        @mutex = Mutex.new
        @task_id = 0
        @queue = []
        @acked_tasks = []
        @acked_history_size = kwargs.fetch(:acked_history_size, 10)
      end

      def enqueue(job:, method:, args:, delay: nil)
        @mutex.synchronize do
          @task_id += 1
          queue << Task.new(self, @task_id, job, method, args)
        end
      end

      def receive(job_classes:, timeout: 1.0)
        @mutex.synchronize do
          task = queue.find { |task| job_classes.include?(task.job.class) }

          if task.nil?
            sleep(timeout)
            nil
          else
            queue.delete(task)
            task
          end
        end
      end

      def ack(task)
        @mutex.synchronize do
          acked_tasks.unshift task
          acked_tasks.pop if acked_tasks.size > @acked_history_size
        end
      end

      def requeue(task)
        @mutex.synchronize do
          queue << task
        end
      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
tennis-jobs-0.4.0 lib/tennis/backend/memory.rb