module PGJob class Job # Initialize with AR::connection or attributes for PG.connect def initialize(*args, &block) if args.first.respond_to?(:exec) @conn = args.first @can_finish = false else @conn = PG.connect(*args) @can_finish = true end if block_given? block.call(self) finish! end end # Creates jobs table def migrate! unless tables.include?('jobs') @conn.exec <<-SQL CREATE TABLE jobs ( id SERIAL, name character varying(255), params text, status character varying(20), log text ); SQL end end # Destroys jobs table def rollback! @conn.exec <<-SQL DROP TABLE IF EXISTS jobs; SQL end # Closes connection if possible def finish! if @can_finish @conn.finish end end # Creates new job def create(name, params = {}) assert_symbol_keys!(params) sql = <<-SQL INSERT INTO jobs(name, params, status) VALUES('#{@conn.escape(name)}', '#{@conn.escape JSON.dump(params)}', 'wait') RETURNING id; SQL @conn.exec(sql).to_a.first['id'].to_i end # Returns name def name(id) sql = <<-SQL SELECT name FROM jobs WHERE id = #{id}; SQL @conn.exec(sql).to_a.first['name'] end # Sets/Returns status def status(id, value = nil) if value raise "wrong value" unless [:wait, :running, :success, :failed].include?(value.to_sym) sql = <<-SQL UPDATE jobs SET status = '#{@conn.escape value.to_s}' WHERE id = #{id}; SQL @conn.exec(sql) else sql = <<-SQL SELECT status FROM jobs WHERE id = #{id}; SQL @conn.exec(sql).to_a.first['status'].to_sym end end # Returns params def params(id) sql = <<-SQL SELECT params FROM jobs WHERE id = #{id}; SQL symbolize_keys JSON.load(@conn.exec(sql).to_a.first['params']) end # Returns full log def log(id) sql = <<-SQL SELECT log FROM jobs WHERE id = #{id}; SQL @conn.exec(sql).to_a.first['log'] end # Adds log message def add_log(id, msg) sql = <<-SQL UPDATE jobs SET log = '#{@conn.escape [log(id), msg].compact.join("\n")}' WHERE id = #{id}; SQL @conn.exec(sql) end private def assert_symbol_keys!(hash) hash.each do |key, value| unless key.is_a?(Symbol) raise "#{key} should be symbol" end if value.is_a?(Hash) assert_symbol_keys!(value) end end end def symbolize_keys(hash) out = {} hash.each do |key, value| if value.is_a?(Hash) out[key.to_sym] = symbolize_keys(value) else out[key.to_sym] = value end end out end def tables sql = <<-SQL SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'; SQL @conn.exec(sql).map { |r| r['table_name'] } end end end