log/elasticsearch_logging.rb in cpee-1.4.31 vs log/elasticsearch_logging.rb in cpee-1.4.32

- old
+ new

@@ -7,12 +7,20 @@ require 'riddl/server' require 'faraday' require 'elasticsearch' class Logging < Riddl::Implementation - - def doc(topic,event_name,esc,template,instancenr,notification) + def doc(topic, event_name, esc, template, instancenr, notification) + # DEBUG: del pp + pp "---" + pp "---" + pp "#{topic}/#{event_name}" + pp instancenr + pp "---" + pp notification.to_yaml + pp "---" + pp "---" uuid = notification['instance_uuid'] return unless uuid # activity = notification['activity'] # parameters = notification['parameters'] @@ -24,20 +32,20 @@ "mappings" => { "entry" => { "properties" => { "group" => { "type" => "keyword" - }, "name" => { "type" => "keyword" } } } } } end #}}} + unless esc.indices.exists? index: 'instances' #{{{ esc.indices.create index: 'instances', body: { "mappings" => { "entry" => { "properties" => { @@ -60,10 +68,11 @@ } } } } end #}}} + unless esc.indices.exists? index: 'spawned' #{{{ esc.indices.create index: 'spawned', body: { "mappings" => { "entry" => { "properties" => { @@ -74,16 +83,17 @@ "type" => "text" }, "date": { "type": "date", "format": "date_time_no_millis" - }, + } } } } } end #}}} + unless esc.indices.exists? index: 'sensors' #{{{ esc.indices.create index: 'sensors', body: { "mappings" => { "entry" => { "properties" => { @@ -105,10 +115,11 @@ } } } } end #}}} + unless esc.indices.exists? index: 'values' #{{{ esc.indices.create index: 'values', body: { "mappings" => { "entry" => { "properties" => { @@ -142,10 +153,37 @@ } } } end #}}} + unless esc.indices.exists? index: 'test_values' #{{{ + esc.indices.create index: 'test_values', body: { + "mappings" => { + "entry" => { + "properties" => { + "uuid" => { + "type" => "keyword" + }, + "sensor" => { + "type" => "keyword" + }, + "task" => { + "type" => "keyword" + }, + "timestamp": { + "type": "date", + "format": "date_time" + }, + "value" => { + "type" => "object" + } + } + } + } + } + end #}}} + uuid = notification.dig('instance_uuid') if notification.dig('attributes','artefacts') #{{{ artefacts = JSON.parse(notification.dig('attributes','artefacts')) @@ -158,61 +196,162 @@ 'group': a['group'], 'name': a['name'], 'date': Time.now.iso8601, 'info': notification.dig('attributes','info') } - end end #}}} - pp notification + # DEBUG: + # pp notification case "#{topic}/#{event_name}" - when "dataelements/change", "endpoints/change" - sensors = JSON.parse(notification.dig('attributes','sensors') || '[]') - sensors.each do |s| - sid = Digest::MD5.hexdigest(uuid + '_' + s['name']) - esc.index index: 'sensors', type: 'entry', id: sid, body: { - 'uuid': uuid, - 'sensor': s['name'], - 'visualizer_url': s['visualizer_url'], - 'visualizer_params': [s['visualizer_params']] - } - esc.index index: 'values', type: 'entry', body: { - 'uuid': uuid, - 'sensor': s['name'], - 'timestamp': notification.dig('timestamp'), - 'value': s['value'] - } - end - when "activity/receiving" - sensors = JSON.parse(notification.dig('sensors') || '[]') - tdoc = notification.dig('received') - sensors.each do |s| - sid = Digest::MD5.hexdigest(uuid + '_' + s['name']) - esc.index index: 'sensors', type: 'entry', id: sid, body: { - 'uuid': uuid, - 'sensor': s['name'], - 'task': notification.dig('activity'), - 'visualizer_url': s['visualizer_url'], - 'visualizer_params': (s['visualizer_params'].nil? || s['visualizer_params'].empty? ? [] : [s['visualizer_params']]) - } - status, result = Riddl::Client.new(s['extractor_url']).post [ - Riddl::Parameter::Simple.new('data',JSON.pretty_generate(tdoc)), - Riddl::Parameter::Simple.new('what',s['extractor_arg']) - ] - if status >= 200 && status < 300 - ret = JSON::parse(result[0]&.value.read) rescue [] - ret.each do |v,t| - esc.index index: 'values', type: 'entry', body: { - 'uuid': uuid, - 'sensor': s['name'], - 'timestamp': t, - 'value': v - } - end - end + when "dataelements/change", "endpoints/change" + sensors = JSON.parse(notification.dig('attributes', 'sensors') || '[]') + sensors.each do |s| + sid = Digest::MD5.hexdigest(uuid + '_' + s['name']) + esc.index index: 'sensors', type: 'entry', id: sid, body: { + 'uuid': uuid, + 'sensor': s['name'], + 'visualizer_url': s['visualizer_url'], + 'visualizer_params': [s['visualizer_params']] + } + esc.index index: 'values', type: 'entry', body: { + 'uuid': uuid, + 'sensor': s['name'], + 'timestamp': notification.dig('timestamp'), + 'value': s['value'] + } + end + when "activity/receiving" + sensors = JSON.parse(notification.dig('sensors') || '[]') + # tdoc = notification.dig('received') + tdoc = notification + # DEBUG: + # pp notification + + sensors.each do |s| + sid = Digest::MD5.hexdigest(uuid + '_' + s['name']) + esc.index index: 'sensors', type: 'entry', id: sid, body: { + 'uuid': uuid, + 'sensor': s['name'], + 'task': notification.dig('activity'), + 'visualizer_url': s['visualizer_url'], + 'visualizer_params': (s['visualizer_params'].nil? || s['visualizer_params'].empty? ? [] : [s['visualizer_params']]) + } + status, result = Riddl::Client.new(s['extractor_url']).post [ + Riddl::Parameter::Simple.new('data', JSON.pretty_generate(tdoc)), + Riddl::Parameter::Simple.new('what', s['extractor_arg']) + ] + + if status >= 200 && status < 300 + # ret = JSON::parse(result[0]&.value.read) rescue [] + ret = JSON.parse(result[0]&.value.read) rescue [] + ret.each do |v, t| + esc.index index: 'values', type: 'entry', body: { + 'uuid': uuid, + 'sensor': s['name'], + # REVIEW: Why generate the timestamp in the PHP extractor? + # 'timestamp': "#{t}", + # 'timestamp': notification.dig('timestamp'), + 'timestamp': t, + 'value': v + } + end end + end + + when "activity/calling" + sid = Digest::MD5.hexdigest(uuid + '_' + "start_task") + esc.index index: 'sensors', type: 'entry', id: sid, body: { + 'uuid': uuid, + 'sensor': "start_task", + 'task': "#{notification.dig('activity')}/#{notification.dig('label')}", + # 'visualizer_url': '', + # 'visualizer_params': '' + } + esc.index index: 'test_values', type: 'entry', body: { + 'uuid': uuid, + 'sensor': "start_task", + 'timestamp': notification.dig('timestamp'), + 'value': { + 'id': notification.dig('activity'), + 'name': notification.dig('label'), + 'uuid': uuid, + 'instance': instancenr + } + } + + when "activity/done" + sid = Digest::MD5.hexdigest(uuid + '_' + "end_task") + esc.index index: 'sensors', type: 'entry', id: sid, body: { + 'uuid': uuid, + 'sensor': "end_task", + 'task': "#{notification.dig('activity')}/#{notification.dig('label')}", + # 'visualizer_url': '', + # 'visualizer_params': '' + } + esc.index index: 'test_values', type: 'entry', body: { + 'uuid': uuid, + 'sensor': "end_task", + 'timestamp': notification.dig('timestamp'), + 'value': { + 'id': notification.dig('activity'), + 'name': notification.dig('label'), + 'uuid': uuid, + 'instance': instancenr + } + } + + when "task/instantiation" + sid = Digest::MD5.hexdigest(uuid + '_' + "sub_task") + esc.index index: 'sensors', type: 'entry', id: sid, body: { + 'uuid': uuid, + 'sensor': 'sub_task', + 'task': "#{notification.dig('activity')}/#{notification.dig('label')}" + # 'visualizer_url': '', + # 'visualizer_params': '' + } + esc.index index: 'test_values', type: 'entry', body: { + # 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'), + 'uuid': uuid, + 'sensor': 'sub_task', + 'timestamp': notification.dig('timestamp'), + 'value': { + # 'child_uuid': notification.dig('received', 'data', 'CPEE-INSTANCE-UUID'), + # 'child_instance': notification.dig('received', 'data', 'CPEE-INSTANCE') + 'id': notification.dig('activity'), + 'name': notification.dig('label'), + 'uuid': uuid, + 'instance': instancenr, + 'child_uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'), + 'child_instance': notification.dig('received', 'CPEE-INSTANCE') + } + } + sid = Digest::MD5.hexdigest(uuid + '_' + "sub_task") + esc.index index: 'sensors', type: 'entry', id: sid, body: { + 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'), + 'sensor': 'sub_task', + 'task': "#{notification.dig('activity')}/#{notification.dig('label')}" + # 'visualizer_url': '', + # 'visualizer_params': '' + } + esc.index index: 'test_values', type: 'entry', body: { + # 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'), + 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'), + 'sensor': 'sub_task', + 'timestamp': notification.dig('timestamp'), + 'value': { + # 'child_uuid': notification.dig('received', 'data', 'CPEE-INSTANCE-UUID'), + # 'child_instance': notification.dig('received', 'data', 'CPEE-INSTANCE') + 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'), + 'instance': notification.dig('received', 'CPEE-INSTANCE'), + 'parent_id': notification.dig('activity'), + 'parent_name': notification.dig('label'), + 'parent_uuid': uuid, + 'parent_instance': instancenr + } + } end nil end def response #{{{