module PGJob class Job # Initialize with AR::connection or attributes for PG.connect def initialize(*args, &block) if args.first.respond_to?(:exec) @conn = args.first @can_finish = false else @conn = PG.connect(*args) @can_finish = true end if block_given? block.call(self) finish! end end # Creates jobs table def migrate! unless tables.include?('jobs') @conn.exec <<-SQL CREATE TABLE jobs ( id SERIAL, name character varying(255), params text, mode character varying(20), status character varying(255), log text ); SQL end end # Destroys jobs table def rollback! @conn.exec <<-SQL DROP TABLE IF EXISTS jobs; SQL end # Closes connection if possible def finish! if @can_finish @conn.finish end end # Creates new job def create(name, params = {}) assert_symbol_keys!(params) sql = <<-SQL INSERT INTO jobs(name, params, mode, status) VALUES('#{@conn.escape(name)}', '#{@conn.escape JSON.dump(params)}', 'wait', 'Start') RETURNING id; SQL @conn.exec(sql).to_a.first['id'].to_i end # Returns name def name(id) sql = <<-SQL SELECT name FROM jobs WHERE id = #{id}; SQL @conn.exec(sql).to_a.first['name'] end # Sets/Returns status def status(id, mode = nil, value = nil ) if mode raise "wrong mode #{mode}" unless [:running, :success, :failed].include?(mode.to_sym) raise "set value" unless value sql = <<-SQL UPDATE jobs SET mode = '#{mode}', status = '#{@conn.escape value}' WHERE id = #{id}; SQL @conn.exec(sql) add_log(id, "[mode] #{mode} - #{value}") else sql = <<-SQL SELECT status FROM jobs WHERE id = #{id}; SQL @conn.exec(sql).to_a.first['status'] end end # Returns params def params(id) sql = <<-SQL SELECT params FROM jobs WHERE id = #{id}; SQL symbolize_keys JSON.load(@conn.exec(sql).to_a.first['params']) end # Returns full log def log(id) sql = <<-SQL SELECT log FROM jobs WHERE id = #{id}; SQL @conn.exec(sql).to_a.first['log'] end # Adds log message def add_log(id, msg) sql = <<-SQL UPDATE jobs SET log = '#{@conn.escape [log(id), msg].compact.join("\n")}' WHERE id = #{id}; SQL @conn.exec(sql) end def running?(id) [:wait, :running].include? mode(id) end def finished?(id) [:success, :failed].include? mode(id) end def success?(id) mode(id) == :success end def failed?(id) mode(id) == :failed end private def assert_symbol_keys!(hash) hash.each do |key, value| unless key.is_a?(Symbol) raise "#{key} should be symbol" end if value.is_a?(Hash) assert_symbol_keys!(value) end end end def symbolize_keys(hash) out = {} hash.each do |key, value| if value.is_a?(Hash) out[key.to_sym] = symbolize_keys(value) else out[key.to_sym] = value end end out end def mode(id) sql = <<-SQL SELECT mode FROM jobs WHERE id = #{id}; SQL @conn.exec(sql).to_a.first['mode'].to_sym end def tables sql = <<-SQL SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'; SQL @conn.exec(sql).map { |r| r['table_name'] } end end end