require 'json' require 'flydata-core/postgresql/snapshot' module FlydataCore module Postgresql class SourcePos include Comparable # Source Position data for PostgreSQL # # Format: # # # from-snapshot: Store txid_snapshot guaranteeing the changes up to this snapshot are sent # pk_values: Store primary key values of the last record on the last request/transaction # # # ex1: No resume position # # 1031:1031: # # -> Changes up to 1031:1031: were sent. # Agent will fetch the change between the latest snapshot and 1031:1031: # # ex2: Resume position with single primary key # # 1031:1031:1032:1032:[{"id":10000}] # # -> Changes up to 1031:1031: and partial changes (id <= 10000) between 1032:1032: and 1031:1031: were sent. # Agent will fetch the change between 1032:1032: and 1031:1031: with a primary key condisions(id > 10000) # # ex3: Resume position with multiple primary keys # # 1031:1031:1033:1033:[{"group":"apple"},{"name":"TOM"}] # # -> Changes up to 1031:1031: and partial changes (group<='apple' and name<='TOM') between 1033:1033: and 1031:1031: were sent. # Agent will fetch the change between 1033:1033: and 1031:1031: with a primary key condisions(group >= 'apple' and name >= 'TOM') # def initialize(snapshot_or_obj, to_snapshot = nil, pk_values = nil) if snapshot_or_obj.kind_of?(self.class) snapshot_or_obj.tap do |s| @snapshot = s.snapshot @to_snapshot = s.to_snapshot @pk_values = s.pk_values end else @snapshot = Snapshot.new(snapshot_or_obj) @to_snapshot = if to_snapshot.to_s.empty? nil else Snapshot.new(to_snapshot) end @pk_values = pk_values # must be array or nil end end attr_reader :snapshot attr_reader :to_snapshot attr_reader :pk_values def empty? @snapshot.empty? end def to_s pk_values = @pk_values ? @pk_values.to_json : '' "#{@snapshot}\t#{@to_snapshot}\t#{pk_values}" end #TODO: Need to revisit the logic along with the spec # http://www.postgresql.org/docs/9.5/static/functions-info.html#FUNCTIONS-TXID-SNAPSHOT-PARTS def <=>(other) if @snapshot != other.snapshot return @snapshot <=> other.snapshot elsif @pk_values.nil? && !other.pk_values.nil? 1 elsif !@pk_values.nil? && other.pk_values.nil? -1 elsif @pk_values == other.pk_values 0 else @pk_values.to_s <=> other.pk_values.to_s end end def self.load(str) snapshot, to_snapshot, pk_values = str.split("\t").collect{|v| v.strip} pk_values = if pk_values.to_s.empty? nil else JSON.parse(pk_values) end self.new(snapshot, to_snapshot, pk_values) end end end end