lib/mosql/tailer.rb in mosql-0.3.2 vs lib/mosql/tailer.rb in mosql-0.4.0

- old
+ new

@@ -1,39 +1,88 @@ module MoSQL class Tailer < Mongoriver::AbstractPersistentTailer def self.create_table(db, tablename) - db.create_table?(tablename) do - column :service, 'TEXT' - column :timestamp, 'INTEGER' - primary_key [:service] + if !db.table_exists?(tablename) + db.create_table(tablename) do + column :service, 'TEXT' + column :timestamp, 'INTEGER' + column :position, 'BYTEA' + primary_key [:service] + end + else + # Try to do seamless upgrades from before-tokumx times + # It will raise an exception in this in most cases, + # but there isn't a nice way I found to check if column + # exists. + begin + db.add_column(tablename, :position, 'BYTEA') + rescue Sequel::DatabaseError => e + raise unless MoSQL::SQLAdapter.duplicate_column_error?(e) + end end + db[tablename.to_sym] end def initialize(backends, type, table, opts) super(backends, type, opts) @table = table @service = opts[:service] || "mosql" end - def read_timestamp - row = @table.where(:service => @service).select([:timestamp]).first - if row - BSON::Timestamp.new(row[:timestamp], 0) + def read_state + row = @table.where(:service => @service).first + return nil unless row + # Again, try to do seamless upgrades - + # If latest operation before or at timestamp if no position + # exists, use timestamp in database to guess what it could be. + result = {} + result['time'] = Time.at(row.fetch(:timestamp)) + if row[:position] + result['position'] = from_blob(row[:position]) else - BSON::Timestamp.new(0, 0) + log.warn("Trying to seamlessly update from old version!") + result['position'] = most_recent_position(result['time']) + save_state(result) end + result end - def write_timestamp(ts) + def write_state(state) + data = { + :service => @service, + :timestamp => state['time'].to_i, + :position => to_blob(state['position']) + } + unless @did_insert begin - @table.insert({:service => @service, :timestamp => ts.seconds}) + @table.insert(data) rescue Sequel::DatabaseError => e raise unless MoSQL::SQLAdapter.duplicate_key_error?(e) end @did_insert = true end - @table.where(:service => @service).update(:timestamp => ts.seconds) + + @table.where(:service => @service).update(data) + end + + private + def to_blob(position) + case database_type + when :mongo + return Sequel::SQL::Blob.new(position.seconds.to_s) + when :toku + return Sequel::SQL::Blob.new(position.to_s) + end + end + + def from_blob(blob) + case database_type + when :mongo + return BSON::Timestamp.new(blob.to_i, 0) + when :toku + return BSON::Binary.new(blob) + end end end end