# This file is part of CPEE. # # CPEE is free software: you can redistribute it and/or modify it under the terms # of the GNU General Public License as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # # CPEE is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A # PARTICULAR PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along with # CPEE (file COPYING in the main directory). If not, see # . require 'json' require_relative 'fail' module CPEE module Notifications def self::implementation(id,opts) Proc.new do if CPEE::Persistence::exists?(id,opts) on resource "notifications" do run CPEE::Notifications::Overview if get on resource "topics" do run CPEE::Notifications::Topics, opts if get end on resource "subscriptions" do run CPEE::Notifications::Subscriptions, id, opts if get run CPEE::Notifications::CreateSubscription, id, opts if post 'create_subscription' on resource do run CPEE::Notifications::Subscription, id, opts if get run CPEE::Notifications::UpdateSubscription, id, opts if put 'change_subscription' run CPEE::Notifications::DeleteSubscription, id, opts if delete on resource 'sse' do run CPEE::Notifications::SSE, id, opts if sse end end end end else run CPEE::FAIL end end end class Overview < Riddl::Implementation #{{{ def response Riddl::Parameter::Complex.new("overview","text/xml") do <<-END END end end end #}}} class Topics < Riddl::Implementation #{{{ def response opts = @a[0] Riddl::Parameter::Complex.new("overview","text/xml") do File.read(opts[:topics]) end end end #}}} class Subscriptions < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] Riddl::Parameter::Complex.new("subscriptions","text/xml") do ret = XML::Smart::string <<-END END CPEE::Persistence::extract_handlers(id,opts).each do |de| ret.root.add('subscription').tap do |n| n.attributes['id'] = de[0] n.attributes['url'] = de[1] if de[1] && !de[1].empty? end end ret.to_s end end end #}}} class Subscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = @r[-1] if CPEE::Persistence::exists_handler?(id,opts,key) Riddl::Parameter::Complex.new("subscriptions","text/xml") do ret = XML::Smart::string <<-END END url = CPEE::Persistence::extract_item(id,opts,File.join('handlers',key,'url')) ret.root.attributes['id'] = key ret.root.attributes['url'] = url if url && !url.empty? items = {} CPEE::Persistence::extract_handler(id,opts,key).each do |h| t, i, v = h.split('/') items[t] ||= [] items[t] << [i,v] end items.each do |k,v| ret.root.add('topic').tap do |n| n.attributes['id'] = k v.each do |e| n.add *e end end end ret.to_s end else @status = 404 end end end #}}} class CreateSubscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = @p[0].name == 'id' ? @p.shift.value : Digest::MD5.hexdigest(Kernel::rand().to_s) url = @p[0].name == 'url' ? @p.shift.value : nil values = [] while @p.length > 0 topic = @p.shift.value base = @p.shift type = base.name values += base.value.split(',').map { |i| File.join(topic,type[0..-2],i) } end @header = CPEE::Persistence::set_handler(id,opts,key,url,values) Riddl::Parameter::Simple.new('key',key) end end #}}} class UpdateSubscription < Riddl::Implementation #{{{ def response id = @a[0] opts = @a[1] key = @r.last if CPEE::Persistence::exists_handler?(id,opts,key) url = @p[0].name == 'url' ? @p.shift.value : nil values = [] while @p.length > 0 topic = @p.shift.value base = @p.shift type = base.name values += base.value.split(',').map { |i| File.join(topic,type[0..-2],i) } end @header = CPEE::Persistence::set_handler(id,opts,key,url,values,true) else @status = 404 end end end #}}} class DeleteSubscription < Riddl::Implementation #{{{ def self::set(id,opts,key) CPEE::Persistence::set_handler(id,opts,key,"",[],true) end def response id = @a[0] opts = @a[1] key = @r.last if CPEE::Persistence::exists_handler?(id,opts,key) DeleteSubscription::set(id,opts,key) else @status = 404 end nil end end #}}} def self::sse_distributor(opts) #{{{ conn = opts[:redis_dyn].call "Server SSE" conn.psubscribe('forward:*','event:state/change') do |on| on.pmessage do |pat, what, message| if pat == 'forward:*' id, key = what.match(/forward:([^\/]+)\/(.+)/).captures if sse = opts.dig(:sse_connections,id,key) sse.send message else DeleteSubscription::set(id,opts,key) end elsif pat == 'event:state/change' mess = JSON.parse(message[message.index(' ')+1..-1]) state = mess.dig('content','state') if state == 'finished' || state == 'abandoned' opts.dig(:sse_connections,mess.dig('instance').to_s)&.each do |key,sse| EM.add_timer(10) do # just to be sure that all messages arrived. 10 seconds should be enough ... we think ... therefore we are (not sure) sse.close end end end end end end conn.close end #}}} def self::sse_heartbeat(opts) #{{{ opts.dig(:sse_connections).each do |id,keys| keys.each do |key,sse| sse.send_with_id('heartbeat', '42') unless sse&.closed? end end end #}}} class SSE < Riddl::SSEImplementation #{{{ def onopen @opts = @a[1] @id = @a[0].to_s @key = @r[-2] if CPEE::Persistence::exists_handler?(@id,@opts,@key) @opts[:sse_connections][@id] ||= {} @opts[:sse_connections][@id][@key] = self true else false end end def onclose @opts.dig(:sse_connections,@id)&.delete(@key) @opts.dig(:sse_connections)&.delete(@id) if @opts.dig(:sse_connections,@id)&.length == 0 DeleteSubscription::set(@id,@opts,@key) end end #}}} end end