lib/ruote/storage/base.rb in ruote-2.1.9 vs lib/ruote/storage/base.rb in ruote-2.1.10
- old
+ new
@@ -30,10 +30,28 @@
#
# Base methods for storage implementations.
#
module StorageBase
+ def context
+
+ @context ||= Ruote::Context.new(self)
+ end
+
+ def context= (c)
+
+ @context = c
+ end
+
+ # Attempts to delete a document, returns true if the deletion
+ # succeeded. This is used with msgs to reserve work on them.
+ #
+ def reserve (doc)
+
+ delete(doc).nil?
+ end
+
#--
# configurations
#++
def get_configuration (key)
@@ -45,32 +63,20 @@
# messages
#++
def put_msg (action, options)
- # merge! is way faster than merge (no object creation probably)
+ msg = prepare_msg_doc(action, options)
- @counter ||= 0
-
- t = Time.now.utc
- ts = "#{t.strftime('%Y-%m-%d')}!#{t.to_i}.#{'%06d' % t.usec}"
- _id = "#{$$}!#{Thread.current.object_id}!#{ts}!#{'%03d' % @counter}"
-
- @counter = (@counter + 1) % 1000
- # some platforms (windows) have shallow usecs, so adding that counter...
-
- msg = options.merge!('type' => 'msgs', '_id' => _id, 'action' => action)
-
- msg.delete('_rev')
- # in case of message replay
-
put(msg)
+
#put(msg, :update_rev => true)
- #(@local_msgs ||= []) << options
+ #(@local_msgs ||= []) << Ruote.fulldup(msg)
end
#def get_local_msgs
+ # p @local_msgs
# if @local_msgs
# r = @local_msgs
# @local_msgs = nil
# r
# else
@@ -85,10 +91,15 @@
).sort { |a, b|
a['put_at'] <=> b['put_at']
}
end
+ def empty? (type)
+
+ (get_many(type) == [])
+ end
+
#--
# expressions
#++
def find_root_expression (wfid)
@@ -114,10 +125,13 @@
# ats and crons
#++
def get_schedules (delta, now)
+ # TODO : bring that 'optimization' back in,
+ # maybe every minute, if min != last_min ...
+
#if delta < 1.0
# at = now.strftime('%Y%m%d%H%M%S')
# get_many('schedules', /-#{at}$/)
#elsif delta < 60.0
# at = now.strftime('%Y%m%d%H%M')
@@ -131,37 +145,16 @@
#end
end
def put_schedule (flavour, owner_fei, s, msg)
- at = if s.is_a?(Time) # at or every
- s
- elsif Ruote.is_cron_string(s) # cron
- Rufus::CronLine.new(s).next_time(Time.now + 1)
- else # at or every
- Ruote.s_to_at(s)
+ if doc = prepare_schedule_doc(flavour, owner_fei, s, msg)
+ put(doc)
+ return doc['_id']
end
- at = at.utc
- if at <= Time.now.utc && flavour == 'at'
- put_msg(msg.delete('action'), msg)
- return
- end
-
- sat = at.strftime('%Y%m%d%H%M%S')
- i = "#{flavour}-#{Ruote.to_storage_id(owner_fei)}-#{sat}"
-
- put(
- '_id' => i,
- 'type' => 'schedules',
- 'flavour' => flavour,
- 'original' => s,
- 'at' => Ruote.time_to_utc_s(at),
- 'owner' => owner_fei,
- 'msg' => msg)
-
- i
+ nil
end
def delete_schedule (schedule_id)
s = get('schedules', schedule_id)
@@ -183,10 +176,94 @@
vars['variables'][k] = v
put_engine_variable(k, v) unless put(vars).nil?
end
+ #--
+ # migrations
+ #++
+
+ # Copies the content of this storage into a target storage.
+ #
+ # Of course, the target storage may be a different implementation.
+ #
+ def copy_to (target, opts={})
+
+ counter = 0
+
+ %w[
+ configurations errors expressions msgs schedules variables workitems
+ ].each do |type|
+
+ get_many(type).each do |item|
+
+ item.delete('_rev')
+ target.put(item)
+
+ counter += 1
+ puts(" #{type}/#{item['_id']}") if opts[:verbose]
+ end
+ end
+
+ counter
+ end
+
protected
+
+ # Used by put_msg
+ #
+ def prepare_msg_doc (action, options)
+
+ # merge! is way faster than merge (no object creation probably)
+
+ @counter ||= 0
+
+ t = Time.now.utc
+ ts = "#{t.strftime('%Y-%m-%d')}!#{t.to_i}.#{'%06d' % t.usec}"
+ _id = "#{$$}!#{Thread.current.object_id}!#{ts}!#{'%03d' % @counter}"
+
+ @counter = (@counter + 1) % 1000
+ # some platforms (windows) have shallow usecs, so adding that counter...
+
+ msg = options.merge!('type' => 'msgs', '_id' => _id, 'action' => action)
+
+ msg.delete('_rev')
+ # in case of message replay
+
+ msg
+ end
+
+ # Used by put_schedule
+ #
+ def prepare_schedule_doc (flavour, owner_fei, s, msg)
+
+ at = if s.is_a?(Time) # at or every
+ s
+ elsif Ruote.is_cron_string(s) # cron
+ Rufus::CronLine.new(s).next_time(Time.now + 1)
+ else # at or every
+ Ruote.s_to_at(s)
+ end
+ at = at.utc
+
+ if at <= Time.now.utc && flavour == 'at'
+ put_msg(msg.delete('action'), msg)
+ return false
+ end
+
+ sat = at.strftime('%Y%m%d%H%M%S')
+ i = "#{flavour}-#{Ruote.to_storage_id(owner_fei)}-#{sat}"
+
+ {
+ '_id' => i,
+ 'type' => 'schedules',
+ 'flavour' => flavour,
+ 'original' => s,
+ 'at' => Ruote.time_to_utc_s(at),
+ 'owner' => owner_fei,
+ 'msg' => msg
+ }
+ end
def get_engine_variables
get('variables', 'variables') || {
'type' => 'variables', '_id' => 'variables', 'variables' => {} }