Sha256: bc03850f2adbfefb29331180a0515f2caa7a3473c1536385af185a1fb648536b
Contents?: true
Size: 1.43 KB
Versions: 1
Compression:
Stored size: 1.43 KB
Contents
# frozen_string_literal: true # This is a queue implementation for the queue service based on file systems module DispatchRider module QueueServices class FileSystem < Base class Queue def initialize(path) FileUtils.mkdir_p(path) @path = path end def add(item) name_base = "#{@path}/#{Time.now.to_f}" File.write("#{name_base}.inprogress", item) FileUtils.mv("#{name_base}.inprogress", "#{name_base}.ready") end def pop file_path = next_item(10) return nil unless file_path file_path_inflight = file_path.gsub(/\.ready$/, '.inflight') FileUtils.mv(file_path, file_path_inflight) File.new(file_path_inflight) end def put_back(item) add(item) remove(item) end def remove(item) item.close File.unlink(item.path) end delegate :size, to: :file_paths private # Long polling next item fetcher # allows to sleep between checks for a new file and not run the main loop as much def next_item(timeout = 10.seconds) Timeout.timeout(timeout) do sleep 1 until file_paths.first file_paths.first end rescue Timeout::Error nil end def file_paths Dir["#{@path}/*.ready"] end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
dispatch-rider-2.2.0 | lib/dispatch-rider/queue_services/file_system/queue.rb |