# This file is part of CPEE-LOGGING-XES-YAML.
#
# CPEE-LOGGING-XES-YAML is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# CPEE-LOGGING-XES-YAML is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
# more details.
#
# You should have received a copy of the GNU Lesser General Public License along with
# CPEE-LOGGING-XES-YAML (file LICENSE in the main directory). If not, see
# .
require 'weel'
require 'digest/sha1'
class StreamPoint
attr_accessor :value, :timestamp, :source, :meta, :id
def initialize(id=nil)
@id = id
@value = nil
@timestamp = Time.now
@source = nil
@meta = nil
end
def to_h
tp = { }
tp['stream:id'] = @id
tp['stream:value'] = @value
tp['stream:timestamp'] = @timestamp
tp['stream:source'] = @source unless @source.nil?
tp['stream:meta'] = @meta unless @meta.nil?
tp
end
end
class Stream
attr_accessor :id, :source, :meta
attr_reader :name
def initialize(name)
@name = name
@id = nil
@source = nil
@meta = nil
@values = []
end
def <<(val)
@values << val
end
def to_list
tp = []
tp << {'stream:name' => @name}
tp << {'stream:id' => @id} unless @id.nil?
tp << {'stream:source' => @source} unless @source.nil?
tp << {'stream:meta' => @meta} unless @meta.nil?
@values.each do |e|
if e.is_a? Stream
e.source = @source if e.source.nil? && !@source.nil?
tp << { 'stream:datastream' => e.to_list }
elsif e.is_a? StreamPoint
e.source = @source if e.source.nil? && !@source.nil?
tp << { 'stream:point' => e.to_h }
end
end
tp
end
end
module CPEE
module Logging
def self::notify(opts,topic,event_name,payload)
opts[:subscriptions].each do |e,urls|
if e == topic + '/' + event_name
urls.each do |url|
client = Riddl::Client.new(url)
client.post [
Riddl::Parameter::Simple::new('type','event'),
Riddl::Parameter::Simple::new('topic',topic),
Riddl::Parameter::Simple::new('event',event_name),
Riddl::Parameter::Complex::new('notification','application/json',payload)
]
end
end
end
end
def self::val_merge(target,val,tid,tso)
if val.is_a? Stream
val.source = tso if val.source.nil?
target.push *val.to_list
else
tp = nil
if val.is_a? StreamPoint
tp = val
tp.id = tid if tp.id.nil?
tp.source = tso if tp.source.nil?
else
tp = StreamPoint.new(tid)
tp.source = tso
tp.value = val
end
target << { 'stream:point' => tp.to_h }
end
end
def self::extract_probes(where,xml)
XML::Smart::string(xml) do |doc|
doc.register_namespace 'd', 'http://cpee.org/ns/description/1.0'
doc.find('//d:call').each do |c|
File.unlink(where + '_' + c.attributes['id'] + '.probe') rescue nil
c.find('d:annotations/d:_context_data_analysis/d:probes[d:probe]').each do |p|
File.write(where + '_' + c.attributes['id'] + '.probe', p.dump)
end
end
end
end
def self::extract_annotations(where,xml)
ret = {}
XML::Smart::string(xml) do |doc|
doc.register_namespace 'd', 'http://cpee.org/ns/description/1.0'
doc.find('/d:description | //d:call').each do |c|
tid = c.attributes['id'] || 'start'
fname = where + '_' + tid + '.anno'
nset = if tid == 'start'
c.find('d:*[starts-with(name(),"_")]')
else
c.find('d:annotations')
end
nset.each do |p|
anno = p.dump
ret[tid] ||= []
ret[tid] << anno
end
if ret[tid]
if ret[tid].length > 1
ret[tid] = "\n" +
ret[tid].join("\n") + "\n" +
""
else
ret[tid] = ret[tid][0]
end
hash = Digest::SHA1.hexdigest(ret[tid])
if !File.exist?(fname) || (File.exist?(fname) && File.read(fname) != hash)
File.write(fname,hash)
end
end
end
end
ret
end
def self::extract_result(result)
ret = result.map do |res|
if res['mimetype'].nil?
res['value']
elsif res['mimetype'] == 'application/json'
JSON::parse(res['data'])
elsif res['mimetype'] == 'application/xml' || res['mimetype'] == 'text/xml'
XML::Smart::string(res['data']) rescue nil
elsif res['mimetype'] == 'text/yaml'
YAML::load(res['data']) rescue nil
elsif res['mimetype'] == 'text/plain'
t = res['data']
if t.start_with?(" e
e.backtrace[0].gsub(/(\w+):(\d+):in.*/,'Probe ' + pid + ' Line \2: ') + e.message
end
def self::persist_values(where,values)
unless File.exist?(where)
File.write(where,'{}')
end
f = File.open(where,'r+')
f.flock(File::LOCK_EX)
json = JSON::load(f) || {}
json.merge!(values)
f.rewind
f.truncate(0)
f.write(JSON.generate(json))
f.close
end
def self::forward(opts,topic,event_name,payload)
if topic == 'state' && event_name == 'change'
self::notify(opts,topic,event_name,payload)
elsif topic == 'state' && event_name == 'change'
self::notify(opts,topic,event_name,payload)
elsif topic == 'gateway' && event_name == 'join'
self::notify(opts,topic,event_name,payload)
end
end
def self::doc(opts,topic,event_name,payload)
notification = JSON.parse(payload)
instance = notification['instance-uuid']
return unless instance
log_dir = opts[:log_dir]
template = opts[:template]
instancenr = notification['instance']
content = notification['content']
activity = content['activity']
parameters = content['parameters']
receiving = content['received']
if content['dslx']
CPEE::Logging::extract_probes(File.join(log_dir,instance),content['dslx'])
CPEE::Logging::extract_annotations(File.join(log_dir,instance),content['dslx']).each do |k,v|
so = Marshal.load(Marshal.dump(notification))
so['content'].delete('dslx')
so['content'].delete('dsl')
so['content'].delete('description')
so['content']['annotation'] = v
so['content']['activity'] = k
so['topic'] = 'annotation'
so['name'] = 'change'
EM.defer do
self::notify(opts,'annotation','change',so.to_json)
end
end
end
if topic == 'dataelements' && event_name == 'change'
if content['changed']&.any?
CPEE::Logging::persist_values(File.join(log_dir,instance + '.data.json'),content['values'])
end
end
log = YAML::load(File.read(template))
log["log"]["trace"]["concept:name"] ||= instancenr
log["log"]["trace"]["cpee:name"] ||= notification['instance-name'] if notification['instance-name']
log["log"]["trace"]["cpee:instance"] ||= instance
File.open(File.join(log_dir,instance+'.xes.yaml'),'w'){|f| f.puts log.to_yaml} unless File.exist? File.join(log_dir,instance+'.xes.yaml')
event = {}
event["concept:instance"] = instancenr
event["concept:name"] = content["label"] if content["label"]
if content["endpoint"]
event["concept:endpoint"] = content["endpoint"]
end
event["id:id"] = (activity.nil? || activity == "") ? 'external' : activity
event["cpee:activity"] = event["id:id"]
event["cpee:activity_uuid"] = content['activity-uuid'] if content['activity-uuid']
event["cpee:instance"] = instance
case event_name
when 'receiving', 'change', 'instantiation'
event["lifecycle:transition"] = "unknown"
when 'done'
event["lifecycle:transition"] = "complete"
else
event["lifecycle:transition"] = "start"
end
event["cpee:lifecycle:transition"] = "#{topic}/#{event_name}"
event["cpee:state"] = content['state'] if content['state']
event["cpee:description"] = content['dslx'] if content['dslx']
unless parameters["arguments"]&.nil?
event["data"] = parameters["arguments"]
end if parameters
if content['changed']&.any?
event["data"] = content['values'].map do |k,v|
{ 'name' => k, 'value' => v }
end
fname = File.join(log_dir,instance + '_' + event["id:id"] + '.probe')
dname = File.join(log_dir,instance + '.data.json')
if File.exist?(fname)
rs = WEEL::ReadStructure.new(File.exist?(dname) ? JSON::load(File::open(dname)) : {},{},{},{})
XML::Smart::open_unprotected(fname) do |doc|
doc.register_namespace 'd', 'http://cpee.org/ns/description/1.0'
doc.find('//d:probe[d:extractor_type="intrinsic"]').each do |p|
pid = p.find('string(d:id)')
event['stream:datastream'] ||= []
val = CPEE::Logging::extract_sensor(rs,p.find('string(d:extractor_code)'),pid,nil) rescue nil
CPEE::Logging::val_merge(event['stream:datastream'],val,pid,p.find('string(d:source)'))
end
end
notification['datastream'] = event['stream:datastream']
EM.defer do
notification['topic'] = 'stream'
notification['name'] = 'extraction'
self::notify(opts,'stream','extraction',notification.to_json)
end
end
end
if topic == 'activity' && event_name == 'receiving' && receiving && !receiving.empty?
fname = File.join(log_dir,instance + '_' + event["id:id"] + '.probe')
dname = File.join(log_dir,instance + '.data.json')
if File.exist?(fname)
te = event.dup
rs = WEEL::ReadStructure.new(File.exist?(dname) ? JSON::load(File::open(dname)) : {},{},{},{})
XML::Smart::open_unprotected(fname) do |doc|
doc.register_namespace 'd', 'http://cpee.org/ns/description/1.0'
if doc.find('//d:probe/d:extractor_type[.="extrinsic"]').any?
rc = CPEE::Logging::extract_result(receiving)
doc.find('//d:probe[d:extractor_type="extrinsic"]').each do |p|
pid = p.find('string(d:id)')
te['stream:datastream'] ||= []
val = CPEE::Logging::extract_sensor(rs,p.find('string(d:extractor_code)'),pid,rc) rescue nil
CPEE::Logging::val_merge(te['stream:datastream'],val,pid,p.find('string(d:source)'))
end
end
end
if te['stream:datastream']
te["cpee:lifecycle:transition"] = "stream/data"
File.open(File.join(log_dir,instance+'.xes.yaml'),'a') do |f|
f << {'event' => te}.to_yaml
end
notification['datastream'] = te['stream:datastream']
EM.defer do
notification['topic'] = 'stream'
notification['name'] = 'extraction'
self::notify(opts,'stream','extraction',notification.to_json)
end
end
end
end
if receiving && !receiving.empty?
event["data"] = receiving
end
event["time:timestamp"]= notification['timestamp'] || Time.now.strftime("%Y-%m-%dT%H:%M:%S.%L%:z")
File.open(File.join(log_dir,instance+'.xes.yaml'),'a') do |f|
f << {'event' => event}.to_yaml
end
end
end
end