Sha256: 6d6a200af9f0dbd85abf46d7ca2d6e7b5615ee7da344b307fc80e9cb5c9b42a4
Contents?: true
Size: 1.95 KB
Versions: 1
Compression:
Stored size: 1.95 KB
Contents
# frozen_string_literal: true

require "json"
require "fileutils"
require_relative "filesystem_queue/version"

module FilesystemQueue
  class Error < StandardError; end

  # A persistent FIFO job queue backed by the local filesystem.
  #
  # Layout created under +queue_dir+:
  # * +jobs/+      - one JSON file per enqueued job
  # * +completed/+ - job files moved here by #complete
  # * +failed/+    - job files moved here by #fail
  # * +index.txt+  - paths of pending job files, one per line, oldest first
  #
  # NOTE(review): the index file is read and rewritten without any file
  # locking, so concurrent producers/consumers may race. Confirm the intended
  # usage is single-process, or add locking.
  class Queue
    # Prepares the on-disk layout for the queue, creating whatever is missing.
    #
    # @param queue_dir [String] root directory of the queue
    def initialize(queue_dir)
      @queue_dir = queue_dir
      @jobs_dir = File.join(@queue_dir, "jobs")
      @completed_dir = File.join(@queue_dir, "completed")
      @failed_dir = File.join(@queue_dir, "failed")
      @index_file = File.join(@queue_dir, "index.txt")

      # mkdir_p is a no-op for directories that already exist.
      FileUtils.mkdir_p([@jobs_dir, @completed_dir, @failed_dir])
      # Guarded so an existing index does not get its mtime bumped.
      FileUtils.touch(@index_file) unless File.exist?(@index_file)
    end

    # Serializes +job+ to a new JSON file and appends its path to the index.
    #
    # NOTE(review): the file name is derived solely from Time.now.to_f, so two
    # enqueues that resolve to the same timestamp would target the same file.
    #
    # @param job [#to_json] payload to persist
    # @return [nil]
    def enqueue(job)
      timestamp = Time.now.to_f.to_s
      job_file = File.join(@jobs_dir, "job_#{timestamp}.json")
      File.write(job_file, job.to_json)
      File.open(@index_file, "a") { |f| f.puts(job_file) }
    end

    # Pops the oldest pending job from the index and loads its payload.
    #
    # Index entries that are blank or point at a file that no longer exists
    # are discarded and the next entry is tried, so +nil+ is returned only
    # when the index has been exhausted.
    #
    # The job file itself is left in place; call #complete or #fail with the
    # returned path to move it.
    #
    # @return [Array(String, Object), nil] +[job_file, job_data]+ with hash
    #   keys symbolized, or +nil+ when no pending job is available
    # @raise [JSON::ParserError] if the job file does not contain valid JSON
    #   (its entry has already been removed from the index at that point)
    def dequeue
      while (job_file = extract_job_file_from_index)
        next unless File.exist?(job_file)

        job_data = JSON.parse(File.read(job_file), symbolize_names: true)
        return [job_file, job_data]
      end

      nil
    end

    # Moves a job file into the completed directory.
    #
    # @param job_file [String] path previously returned by #dequeue
    def complete(job_file)
      move_job(job_file, @completed_dir)
    end

    # Moves a job file into the failed directory.
    #
    # NOTE: this shadows Kernel#fail for Queue instances; use +raise+ when
    # raising exceptions from inside this class.
    #
    # @param job_file [String] path previously returned by #dequeue
    def fail(job_file)
      move_job(job_file, @failed_dir)
    end

    # @return [Integer] number of lines (pending entries) in the index file
    def size
      # Stream the file instead of loading every line into memory.
      File.foreach(@index_file).count
    end

    # @return [Integer] number of regular files in the failed directory
    #   (dotfiles are not matched by the "*" pattern)
    def failed_size
      # `base:` keeps glob metacharacters in the directory path from being
      # interpreted as part of the pattern.
      Dir.glob("*", base: @failed_dir).count do |name|
        File.file?(File.join(@failed_dir, name))
      end
    end

    private

    # Best-effort move of +job_file+ into +target_dir+. Failures are reported
    # on stderr rather than raised.
    def move_job(job_file, target_dir)
      FileUtils.mv(job_file, target_dir)
    rescue StandardError => e
      warn "Failed to move job file: #{e.message}"
    end

    # Removes the first line of the index file and returns it stripped.
    #
    # The remaining lines are written back from the start of the file and the
    # file is truncated to the new length.
    #
    # @return [String, nil] the first entry, or +nil+ when the index is empty
    def extract_job_file_from_index
      File.open(@index_file, "r+") do |f|
        head, *rest = f.readlines
        return nil unless head

        f.rewind
        f.write(rest.join)
        f.truncate(f.pos)
        head.strip
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
filesystem_queue-0.1.1 | lib/filesystem_queue.rb |