lib/mosql/tailer.rb in mosql-0.3.2 vs lib/mosql/tailer.rb in mosql-0.4.0
- old
+ new
@@ -1,39 +1,88 @@
module MoSQL
class Tailer < Mongoriver::AbstractPersistentTailer
def self.create_table(db, tablename)
- db.create_table?(tablename) do
- column :service, 'TEXT'
- column :timestamp, 'INTEGER'
- primary_key [:service]
+ if !db.table_exists?(tablename)
+ db.create_table(tablename) do
+ column :service, 'TEXT'
+ column :timestamp, 'INTEGER'
+ column :position, 'BYTEA'
+ primary_key [:service]
+ end
+ else
+ # Try to do seamless upgrades from before-tokumx times
+ # It will raise an exception in this in most cases,
+ # but there isn't a nice way I found to check if column
+ # exists.
+ begin
+ db.add_column(tablename, :position, 'BYTEA')
+ rescue Sequel::DatabaseError => e
+ raise unless MoSQL::SQLAdapter.duplicate_column_error?(e)
+ end
end
+
db[tablename.to_sym]
end
def initialize(backends, type, table, opts)
super(backends, type, opts)
@table = table
@service = opts[:service] || "mosql"
end
- def read_timestamp
- row = @table.where(:service => @service).select([:timestamp]).first
- if row
- BSON::Timestamp.new(row[:timestamp], 0)
+ def read_state
+ row = @table.where(:service => @service).first
+ return nil unless row
+ # Again, try to do seamless upgrades -
+ # If latest operation before or at timestamp if no position
+ # exists, use timestamp in database to guess what it could be.
+ result = {}
+ result['time'] = Time.at(row.fetch(:timestamp))
+ if row[:position]
+ result['position'] = from_blob(row[:position])
else
- BSON::Timestamp.new(0, 0)
+ log.warn("Trying to seamlessly update from old version!")
+ result['position'] = most_recent_position(result['time'])
+ save_state(result)
end
+ result
end
- def write_timestamp(ts)
+ def write_state(state)
+ data = {
+ :service => @service,
+ :timestamp => state['time'].to_i,
+ :position => to_blob(state['position'])
+ }
+
unless @did_insert
begin
- @table.insert({:service => @service, :timestamp => ts.seconds})
+ @table.insert(data)
rescue Sequel::DatabaseError => e
raise unless MoSQL::SQLAdapter.duplicate_key_error?(e)
end
@did_insert = true
end
- @table.where(:service => @service).update(:timestamp => ts.seconds)
+
+ @table.where(:service => @service).update(data)
+ end
+
+ private
+ def to_blob(position)
+ case database_type
+ when :mongo
+ return Sequel::SQL::Blob.new(position.seconds.to_s)
+ when :toku
+ return Sequel::SQL::Blob.new(position.to_s)
+ end
+ end
+
+ def from_blob(blob)
+ case database_type
+ when :mongo
+ return BSON::Timestamp.new(blob.to_i, 0)
+ when :toku
+ return BSON::Binary.new(blob)
+ end
end
end
end