# #-- # Copyright (c) 2006-2007, Nicolas Modryzk and John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. #++ # # $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $ # # # "made in Japan" # # Nicolas Modrzyk at openwfe.org # John Mettraux at openwfe.org # require 'fileutils' require 'base64' require 'openwfe/service' require 'openwfe/workitem' require 'openwfe/flowexpressionid' require 'openwfe/expressions/expressionmap' module OpenWFE # # by default the journals are stored into "work/journal" # DEFAULT_STORAGE_PATH = "work/" # # this is the container class for storing entries in the the journal # class JournalEntry # reused for ENTRY_DELIM = ' -- ' attr_reader :fei, :action, :fe, :time def initialize (action, fei, fe, stime=nil) @action = action @fei = fei @fe = fe @time = if stime stime else Time.new end end # # parse the entry from a line read from a log file def JournalEntry.parse_from_string (line) # offset includes iso formatting and delim time_offset = line.index(ENTRY_DELIM) time = Time.iso8601(line.slice(0..time_offset)) line = line.slice(time_offset+ ENTRY_DELIM.length..line.length) action = line[0..2] index = line.index(ENTRY_DELIM,7) fei = OpenWFE::FlowExpressionId.to_fei(line[8..index-1]) if action == :put.to_s fe_64_yaml = line[index+ENTRY_DELIM.length,line.length].rstrip fe_yaml = Base64.decode64(fe_64_yaml.gsub("**","\n")) fe = YAML.load(fe_yaml) return JournalEntry.new(:put, fei, fe, time) else return JournalEntry.new(action, fei, nil, time) end end # # this is directly used to store the entry in the journal. # This is using base64 encoding to wrap the encoded object, and is not readable. Use to_human_s to decrypt def to_s if @action == :put base64 = Base64.encode64(YAML.dump(@fe)).gsub("\n","**") + "\n" @time.iso8601 + ENTRY_DELIM + @action.to_s + ENTRY_DELIM + @fei.to_s + ENTRY_DELIM + base64 else @time.iso8601 + ENTRY_DELIM + @action.to_s + ENTRY_DELIM + @fei.to_s + ENTRY_DELIM + "\n" end end # # same as above,except the encoded object is fully expanded. The return value cannot be processed. def to_human_s if @action == :put @time.iso8601 + ENTRY_DELIM + @action.to_s + ENTRY_DELIM + @fei.to_s + ENTRY_DELIM + "\n" + YAML.dump(@fe).to_s + "---\n" else to_s end end end # # the class responsible for handling replay of a log file class JournalReplay def initialize(storage=nil) if not storage @storage = @application_context[S_EXPRESSION_STORAGE] else @storage = storage end end # # from the content of path_to_journal, will replay all the entries one by one on the given storage def replay(path_to_journal) datalines = IO.readlines(path_to_journal) datalines.each { |line_read| entry = (JournalEntry.parse_from_string(line_read)) if(entry.action == :put) @storage[entry.fei]=entry.fe else #TODO: implement delete #@storage.delete(entry.fei) @storage.remove(entry.fei) end } end end # # this is the main class for journalized storage class JournalizedExpressionStorage include ServiceMixin attr_accessor :basepath def initialize (serviceName, applicationContext) service_init(serviceName, applicationContext) path = if (@application_context) @application_context[:file_expression_storage_path] else DEFAULT_STORAGE_PATH end @basepath = path + "/journal" FileUtils.makedirs @basepath end # # remove the file corresponding to the journal of this workflow instance def clean (workflow_id) fei_path = compute_file_path_id(workflow_id) if (File.exist?(fei_path)) File.delete(fei_path) end end # # backup the journal file corresponding to the give workflow instance # by default, deletes the original journal file. def backup (workflow_id, delete_original = true, backup_path = nil) path_to_journal = compute_file_path_id(workflow_id) temporary_journal = if not backup_path path_to_journal+".0" else backup_path end FileUtils.copy_file(path_to_journal, temporary_journal) File.delete(path_to_journal) if delete_original return temporary_journal end # # truncate the content of the log file: # - number of lines # - before a given date def truncate! (workflow_id, number_of_lines=nil, after_date=nil) return 0 if not number_of_lines and not after_date fei_path = compute_file_path_id(workflow_id) datalines = IO.readlines(fei_path) entries = Array.new datalines.each { |line_read| entry = JournalEntry.parse_from_string(line_read) break if after_date and entry.time >= after_date break if number_of_lines and entries.length >= number_of_lines entries.push entry } backup(workflow_id) fd = IO.sysopen(fei_path, File::WRONLY|File::CREAT, 0666) io = IO.open(fd , "w") entries.each{ |entry| io.write(entry.to_s) } io.close return entries.length end # # create a human readable log file for the given workflow instance. # Note that this will override any existing previously created file def to_human_s(workflow_id) fei_path = compute_file_path_id(workflow_id) if (File.exist?(fei_path)) fei_new_path = fei_path[0,fei_path.length-5] + ".txt" File.delete(fei_new_path) if (File.exist?(fei_new_path)) fd = IO.sysopen(fei_new_path, File::WRONLY|File::CREAT, 0666) io = IO.open(fd , "w") datalines = IO.readlines(fei_path) datalines.each { |line_read| human = (JournalEntry.parse_from_string(line_read)).to_human_s ; io.write(human) } io.close else raise "workflow journal does not exist for given id:"+workflow_id end end # # interface method for the persistence. add a flow expression to the storage def []= (fei, flow_expression) write_to_journal(JournalEntry.new(:put, fei, flow_expression)) end # # interface method: remove the expressionid and persist the related workitem def remove (fei) write_to_journal(JournalEntry.new(:del, fei, nil)) end # # compute the size of the journal (number of entries) in the journal for the # given workflow id def size (workflow_instance_id) fei_path = compute_file_path_id(workflow_instance_id) return 0 if not File.exist?(fei_path) datalines = IO.readlines(fei_path) return datalines.length end # # alias for the protected compute_file_path with a better name. # Return the path of the log for the given workflow instance def where_is_journal_for (workflow_instance_id) compute_file_path(workflow_instance_id) end # # write an entry to the journal. This call the to_s method of the entry # and append the resulting line to the log file def write_to_journal (journal_entry) fei_path = compute_file_path(journal_entry.fei) fei_parent_path = File.dirname(fei_path) FileUtils.makedirs fei_parent_path if not (File.exist?(fei_parent_path)) fd = IO.sysopen(fei_path, File::WRONLY|File::APPEND|File::CREAT, 0666) io = IO.open(fd , "w") io.write(journal_entry.to_s) io.close end protected def compute_file_path (fei) compute_file_path_id(fei.parent_workflow_instance_id) end def compute_file_path_id (parent_workflow_instance_id) @basepath + "/" + parent_workflow_instance_id.to_s + ".yaml" end end end