lib/ruote/storage/base.rb in ruote-2.1.9 vs lib/ruote/storage/base.rb in ruote-2.1.10

- old
+ new

@@ -30,10 +30,28 @@ # # Base methods for storage implementations. # module StorageBase + def context + + @context ||= Ruote::Context.new(self) + end + + def context= (c) + + @context = c + end + + # Attempts to delete a document, returns true if the deletion + # succeeded. This is used with msgs to reserve work on them. + # + def reserve (doc) + + delete(doc).nil? + end + #-- # configurations #++ def get_configuration (key) @@ -45,32 +63,20 @@ # messages #++ def put_msg (action, options) - # merge! is way faster than merge (no object creation probably) + msg = prepare_msg_doc(action, options) - @counter ||= 0 - - t = Time.now.utc - ts = "#{t.strftime('%Y-%m-%d')}!#{t.to_i}.#{'%06d' % t.usec}" - _id = "#{$$}!#{Thread.current.object_id}!#{ts}!#{'%03d' % @counter}" - - @counter = (@counter + 1) % 1000 - # some platforms (windows) have shallow usecs, so adding that counter... - - msg = options.merge!('type' => 'msgs', '_id' => _id, 'action' => action) - - msg.delete('_rev') - # in case of message replay - put(msg) + #put(msg, :update_rev => true) - #(@local_msgs ||= []) << options + #(@local_msgs ||= []) << Ruote.fulldup(msg) end #def get_local_msgs + # p @local_msgs # if @local_msgs # r = @local_msgs # @local_msgs = nil # r # else @@ -85,10 +91,15 @@ ).sort { |a, b| a['put_at'] <=> b['put_at'] } end + def empty? (type) + + (get_many(type) == []) + end + #-- # expressions #++ def find_root_expression (wfid) @@ -114,10 +125,13 @@ # ats and crons #++ def get_schedules (delta, now) + # TODO : bring that 'optimization' back in, + # maybe every minute, if min != last_min ... + #if delta < 1.0 # at = now.strftime('%Y%m%d%H%M%S') # get_many('schedules', /-#{at}$/) #elsif delta < 60.0 # at = now.strftime('%Y%m%d%H%M') @@ -131,37 +145,16 @@ #end end def put_schedule (flavour, owner_fei, s, msg) - at = if s.is_a?(Time) # at or every - s - elsif Ruote.is_cron_string(s) # cron - Rufus::CronLine.new(s).next_time(Time.now + 1) - else # at or every - Ruote.s_to_at(s) + if doc = prepare_schedule_doc(flavour, owner_fei, s, msg) + put(doc) + return doc['_id'] end - at = at.utc - if at <= Time.now.utc && flavour == 'at' - put_msg(msg.delete('action'), msg) - return - end - - sat = at.strftime('%Y%m%d%H%M%S') - i = "#{flavour}-#{Ruote.to_storage_id(owner_fei)}-#{sat}" - - put( - '_id' => i, - 'type' => 'schedules', - 'flavour' => flavour, - 'original' => s, - 'at' => Ruote.time_to_utc_s(at), - 'owner' => owner_fei, - 'msg' => msg) - - i + nil end def delete_schedule (schedule_id) s = get('schedules', schedule_id) @@ -183,10 +176,94 @@ vars['variables'][k] = v put_engine_variable(k, v) unless put(vars).nil? end + #-- + # migrations + #++ + + # Copies the content of this storage into a target storage. + # + # Of course, the target storage may be a different implementation. + # + def copy_to (target, opts={}) + + counter = 0 + + %w[ + configurations errors expressions msgs schedules variables workitems + ].each do |type| + + get_many(type).each do |item| + + item.delete('_rev') + target.put(item) + + counter += 1 + puts(" #{type}/#{item['_id']}") if opts[:verbose] + end + end + + counter + end + protected + + # Used by put_msg + # + def prepare_msg_doc (action, options) + + # merge! is way faster than merge (no object creation probably) + + @counter ||= 0 + + t = Time.now.utc + ts = "#{t.strftime('%Y-%m-%d')}!#{t.to_i}.#{'%06d' % t.usec}" + _id = "#{$$}!#{Thread.current.object_id}!#{ts}!#{'%03d' % @counter}" + + @counter = (@counter + 1) % 1000 + # some platforms (windows) have shallow usecs, so adding that counter... + + msg = options.merge!('type' => 'msgs', '_id' => _id, 'action' => action) + + msg.delete('_rev') + # in case of message replay + + msg + end + + # Used by put_schedule + # + def prepare_schedule_doc (flavour, owner_fei, s, msg) + + at = if s.is_a?(Time) # at or every + s + elsif Ruote.is_cron_string(s) # cron + Rufus::CronLine.new(s).next_time(Time.now + 1) + else # at or every + Ruote.s_to_at(s) + end + at = at.utc + + if at <= Time.now.utc && flavour == 'at' + put_msg(msg.delete('action'), msg) + return false + end + + sat = at.strftime('%Y%m%d%H%M%S') + i = "#{flavour}-#{Ruote.to_storage_id(owner_fei)}-#{sat}" + + { + '_id' => i, + 'type' => 'schedules', + 'flavour' => flavour, + 'original' => s, + 'at' => Ruote.time_to_utc_s(at), + 'owner' => owner_fei, + 'msg' => msg + } + end def get_engine_variables get('variables', 'variables') || { 'type' => 'variables', '_id' => 'variables', 'variables' => {} }