log/elasticsearch_logging.rb in cpee-1.4.31 vs log/elasticsearch_logging.rb in cpee-1.4.32
- old
+ new
@@ -7,12 +7,20 @@
require 'riddl/server'
require 'faraday'
require 'elasticsearch'
class Logging < Riddl::Implementation
-
- def doc(topic,event_name,esc,template,instancenr,notification)
+ def doc(topic, event_name, esc, template, instancenr, notification)
+ # DEBUG: del pp
+ pp "---"
+ pp "---"
+ pp "#{topic}/#{event_name}"
+ pp instancenr
+ pp "---"
+ pp notification.to_yaml
+ pp "---"
+ pp "---"
uuid = notification['instance_uuid']
return unless uuid
# activity = notification['activity']
# parameters = notification['parameters']
@@ -24,20 +32,20 @@
"mappings" => {
"entry" => {
"properties" => {
"group" => {
"type" => "keyword"
-
},
"name" => {
"type" => "keyword"
}
}
}
}
}
end #}}}
+
unless esc.indices.exists? index: 'instances' #{{{
esc.indices.create index: 'instances', body: {
"mappings" => {
"entry" => {
"properties" => {
@@ -60,10 +68,11 @@
}
}
}
}
end #}}}
+
unless esc.indices.exists? index: 'spawned' #{{{
esc.indices.create index: 'spawned', body: {
"mappings" => {
"entry" => {
"properties" => {
@@ -74,16 +83,17 @@
"type" => "text"
},
"date": {
"type": "date",
"format": "date_time_no_millis"
- },
+ }
}
}
}
}
end #}}}
+
unless esc.indices.exists? index: 'sensors' #{{{
esc.indices.create index: 'sensors', body: {
"mappings" => {
"entry" => {
"properties" => {
@@ -105,10 +115,11 @@
}
}
}
}
end #}}}
+
unless esc.indices.exists? index: 'values' #{{{
esc.indices.create index: 'values', body: {
"mappings" => {
"entry" => {
"properties" => {
@@ -142,10 +153,37 @@
}
}
}
end #}}}
+ unless esc.indices.exists? index: 'test_values' #{{{
+ esc.indices.create index: 'test_values', body: {
+ "mappings" => {
+ "entry" => {
+ "properties" => {
+ "uuid" => {
+ "type" => "keyword"
+ },
+ "sensor" => {
+ "type" => "keyword"
+ },
+ "task" => {
+ "type" => "keyword"
+ },
+ "timestamp": {
+ "type": "date",
+ "format": "date_time"
+ },
+ "value" => {
+ "type" => "object"
+ }
+ }
+ }
+ }
+ }
+ end #}}}
+
uuid = notification.dig('instance_uuid')
if notification.dig('attributes','artefacts') #{{{
artefacts = JSON.parse(notification.dig('attributes','artefacts'))
@@ -158,61 +196,162 @@
'group': a['group'],
'name': a['name'],
'date': Time.now.iso8601,
'info': notification.dig('attributes','info')
}
-
end
end #}}}
- pp notification
+ # DEBUG:
+ # pp notification
case "#{topic}/#{event_name}"
- when "dataelements/change", "endpoints/change"
- sensors = JSON.parse(notification.dig('attributes','sensors') || '[]')
- sensors.each do |s|
- sid = Digest::MD5.hexdigest(uuid + '_' + s['name'])
- esc.index index: 'sensors', type: 'entry', id: sid, body: {
- 'uuid': uuid,
- 'sensor': s['name'],
- 'visualizer_url': s['visualizer_url'],
- 'visualizer_params': [s['visualizer_params']]
- }
- esc.index index: 'values', type: 'entry', body: {
- 'uuid': uuid,
- 'sensor': s['name'],
- 'timestamp': notification.dig('timestamp'),
- 'value': s['value']
- }
- end
- when "activity/receiving"
- sensors = JSON.parse(notification.dig('sensors') || '[]')
- tdoc = notification.dig('received')
- sensors.each do |s|
- sid = Digest::MD5.hexdigest(uuid + '_' + s['name'])
- esc.index index: 'sensors', type: 'entry', id: sid, body: {
- 'uuid': uuid,
- 'sensor': s['name'],
- 'task': notification.dig('activity'),
- 'visualizer_url': s['visualizer_url'],
- 'visualizer_params': (s['visualizer_params'].nil? || s['visualizer_params'].empty? ? [] : [s['visualizer_params']])
- }
- status, result = Riddl::Client.new(s['extractor_url']).post [
- Riddl::Parameter::Simple.new('data',JSON.pretty_generate(tdoc)),
- Riddl::Parameter::Simple.new('what',s['extractor_arg'])
- ]
- if status >= 200 && status < 300
- ret = JSON::parse(result[0]&.value.read) rescue []
- ret.each do |v,t|
- esc.index index: 'values', type: 'entry', body: {
- 'uuid': uuid,
- 'sensor': s['name'],
- 'timestamp': t,
- 'value': v
- }
- end
- end
+ when "dataelements/change", "endpoints/change"
+ sensors = JSON.parse(notification.dig('attributes', 'sensors') || '[]')
+ sensors.each do |s|
+ sid = Digest::MD5.hexdigest(uuid + '_' + s['name'])
+ esc.index index: 'sensors', type: 'entry', id: sid, body: {
+ 'uuid': uuid,
+ 'sensor': s['name'],
+ 'visualizer_url': s['visualizer_url'],
+ 'visualizer_params': [s['visualizer_params']]
+ }
+ esc.index index: 'values', type: 'entry', body: {
+ 'uuid': uuid,
+ 'sensor': s['name'],
+ 'timestamp': notification.dig('timestamp'),
+ 'value': s['value']
+ }
+ end
+ when "activity/receiving"
+ sensors = JSON.parse(notification.dig('sensors') || '[]')
+ # tdoc = notification.dig('received')
+ tdoc = notification
+ # DEBUG:
+ # pp notification
+
+ sensors.each do |s|
+ sid = Digest::MD5.hexdigest(uuid + '_' + s['name'])
+ esc.index index: 'sensors', type: 'entry', id: sid, body: {
+ 'uuid': uuid,
+ 'sensor': s['name'],
+ 'task': notification.dig('activity'),
+ 'visualizer_url': s['visualizer_url'],
+ 'visualizer_params': (s['visualizer_params'].nil? || s['visualizer_params'].empty? ? [] : [s['visualizer_params']])
+ }
+ status, result = Riddl::Client.new(s['extractor_url']).post [
+ Riddl::Parameter::Simple.new('data', JSON.pretty_generate(tdoc)),
+ Riddl::Parameter::Simple.new('what', s['extractor_arg'])
+ ]
+
+ if status >= 200 && status < 300
+ # ret = JSON::parse(result[0]&.value.read) rescue []
+ ret = JSON.parse(result[0]&.value.read) rescue []
+ ret.each do |v, t|
+ esc.index index: 'values', type: 'entry', body: {
+ 'uuid': uuid,
+ 'sensor': s['name'],
+ # REVIEW: Why generate the timestamp in the PHP extractor?
+ # 'timestamp': "#{t}",
+ # 'timestamp': notification.dig('timestamp'),
+ 'timestamp': t,
+ 'value': v
+ }
+ end
end
+ end
+
+ when "activity/calling"
+ sid = Digest::MD5.hexdigest(uuid + '_' + "start_task")
+ esc.index index: 'sensors', type: 'entry', id: sid, body: {
+ 'uuid': uuid,
+ 'sensor': "start_task",
+ 'task': "#{notification.dig('activity')}/#{notification.dig('label')}",
+ # 'visualizer_url': '',
+ # 'visualizer_params': ''
+ }
+ esc.index index: 'test_values', type: 'entry', body: {
+ 'uuid': uuid,
+ 'sensor': "start_task",
+ 'timestamp': notification.dig('timestamp'),
+ 'value': {
+ 'id': notification.dig('activity'),
+ 'name': notification.dig('label'),
+ 'uuid': uuid,
+ 'instance': instancenr
+ }
+ }
+
+ when "activity/done"
+ sid = Digest::MD5.hexdigest(uuid + '_' + "end_task")
+ esc.index index: 'sensors', type: 'entry', id: sid, body: {
+ 'uuid': uuid,
+ 'sensor': "end_task",
+ 'task': "#{notification.dig('activity')}/#{notification.dig('label')}",
+ # 'visualizer_url': '',
+ # 'visualizer_params': ''
+ }
+ esc.index index: 'test_values', type: 'entry', body: {
+ 'uuid': uuid,
+ 'sensor': "end_task",
+ 'timestamp': notification.dig('timestamp'),
+ 'value': {
+ 'id': notification.dig('activity'),
+ 'name': notification.dig('label'),
+ 'uuid': uuid,
+ 'instance': instancenr
+ }
+ }
+
+ when "task/instantiation"
+ sid = Digest::MD5.hexdigest(uuid + '_' + "sub_task")
+ esc.index index: 'sensors', type: 'entry', id: sid, body: {
+ 'uuid': uuid,
+ 'sensor': 'sub_task',
+ 'task': "#{notification.dig('activity')}/#{notification.dig('label')}"
+ # 'visualizer_url': '',
+ # 'visualizer_params': ''
+ }
+ esc.index index: 'test_values', type: 'entry', body: {
+ # 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'),
+ 'uuid': uuid,
+ 'sensor': 'sub_task',
+ 'timestamp': notification.dig('timestamp'),
+ 'value': {
+ # 'child_uuid': notification.dig('received', 'data', 'CPEE-INSTANCE-UUID'),
+ # 'child_instance': notification.dig('received', 'data', 'CPEE-INSTANCE')
+ 'id': notification.dig('activity'),
+ 'name': notification.dig('label'),
+ 'uuid': uuid,
+ 'instance': instancenr,
+ 'child_uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'),
+ 'child_instance': notification.dig('received', 'CPEE-INSTANCE')
+ }
+ }
+ sid = Digest::MD5.hexdigest(uuid + '_' + "sub_task")
+ esc.index index: 'sensors', type: 'entry', id: sid, body: {
+ 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'),
+ 'sensor': 'sub_task',
+ 'task': "#{notification.dig('activity')}/#{notification.dig('label')}"
+ # 'visualizer_url': '',
+ # 'visualizer_params': ''
+ }
+ esc.index index: 'test_values', type: 'entry', body: {
+ # 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'),
+ 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'),
+ 'sensor': 'sub_task',
+ 'timestamp': notification.dig('timestamp'),
+ 'value': {
+ # 'child_uuid': notification.dig('received', 'data', 'CPEE-INSTANCE-UUID'),
+ # 'child_instance': notification.dig('received', 'data', 'CPEE-INSTANCE')
+ 'uuid': notification.dig('received', 'CPEE-INSTANCE-UUID'),
+ 'instance': notification.dig('received', 'CPEE-INSTANCE'),
+ 'parent_id': notification.dig('activity'),
+ 'parent_name': notification.dig('label'),
+ 'parent_uuid': uuid,
+ 'parent_instance': instancenr
+ }
+ }
end
nil
end
def response #{{{