Sha256: 6d6a200af9f0dbd85abf46d7ca2d6e7b5615ee7da344b307fc80e9cb5c9b42a4

Contents?: true

Size: 1.95 KB

Versions: 1

Compression:

Stored size: 1.95 KB

Contents

# frozen_string_literal: true

require "json"
require "fileutils"
require_relative "filesystem_queue/version"

module FilesystemQueue
  class Error < StandardError; end

  # A persistent queue system based on the local filesystem
  # Handles
  class Queue
    def initialize(queue_dir)
      @queue_dir = queue_dir
      @jobs_dir = File.join(@queue_dir, "jobs")
      @completed_dir = File.join(@queue_dir, "completed")
      @failed_dir = File.join(@queue_dir, "failed")
      @index_file = File.join(@queue_dir, "index.txt")

      [@jobs_dir, @completed_dir, @failed_dir].each do |dir|
        FileUtils.mkdir_p(dir) unless Dir.exist?(dir)
      end
      FileUtils.touch(@index_file) unless File.exist?(@index_file)
    end

    def enqueue(job)
      timestamp = Time.now.to_f.to_s
      job_file = File.join(@jobs_dir, "job_#{timestamp}.json")
      File.write(job_file, job.to_json)
      File.open(@index_file, "a") { |f| f.puts(job_file) }
    end

    def dequeue
      job_file = extract_job_file_from_index
      return nil unless job_file && File.exist?(job_file)

      job_data = JSON.parse(File.read(job_file), symbolize_names: true)
      [job_file, job_data]
    end

    def complete(job_file)
      move_job(job_file, @completed_dir)
    end

    def fail(job_file)
      move_job(job_file, @failed_dir)
    end

    def size
      File.readlines(@index_file).size
    end

    def failed_size
      Dir[File.join(@failed_dir, "*")].count { |file| File.file?(file) }
    end

    private

    def move_job(job_file, target_dir)
      FileUtils.mv(job_file, target_dir)
    rescue StandardError => e
      puts "Failed to move job file: #{e.message}"
    end

    def extract_job_file_from_index
      job_file = nil
      File.open(@index_file, "r+") do |f|
        lines = f.each_line.to_a
        return nil if lines.empty?

        job_file = lines.shift.strip
        f.rewind
        f.write(lines.join)
        f.truncate(f.pos)
      end
      job_file
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
filesystem_queue-0.1.1 lib/filesystem_queue.rb