Sha256: 2fef3e180077a660e15f8ec8848c387ec6d24a1109b53a50ac605f1be963b4d0
Contents?: true
Size: 1.68 KB
Versions: 5
Compression:
Stored size: 1.68 KB
Contents
module FaaStRuby
  # In-process event hub: payloads pushed onto a shared queue are popped by a
  # background thread and delivered to every function subscribed to the
  # payload's channel.
  #
  # A payload is a String of the form "<encoded_channel>,<encoded_data>".
  # The channel part is URL-safe Base64; the data part is forwarded to the
  # subscribers untouched (the log labels it as a base64 payload).
  class EventHub
    @@queue = Queue.new
    # Initialised up front so `thread` returns nil, rather than raising
    # NameError, when called before `listen_for_events!`.
    @@thread = nil

    # @return [Queue] the queue holding events waiting to be dispatched.
    def self.queue
      @@queue
    end

    # Enqueues an event for asynchronous delivery.
    #
    # @param payload [String] "<encoded_channel>,<encoded_data>"
    # @return [Queue] the queue itself (result of Queue#<<)
    def self.push(payload)
      @@queue << payload
    end

    # @return [Thread, nil] the dispatcher thread, or nil if it was never started.
    def self.thread
      @@thread
    end

    # Scans '<workspace>/<function>/faastruby.yml' files relative to the
    # current directory and subscribes each function to the channels it lists
    # under the 'channels' key. Files that are empty, are not a mapping, or
    # have a non-Array 'channels' value are skipped.
    def self.load_subscribers
      Dir.glob('*/*/faastruby.yml').each do |file|
        workspace_name, function_name, _ = file.split('/')
        path = "#{workspace_name}/#{function_name}"
        # NOTE(review): YAML.load on project-local config; consider
        # YAML.safe_load if these files can ever come from an untrusted source.
        config = YAML.load(File.read(file))
        # An empty file parses to false/nil and a scalar file to a String;
        # neither can carry a channel list.
        next unless config.is_a?(Hash) && config['channels'].is_a?(Array)

        config['channels'].compact.each do |c|
          EventChannel.new(c).subscribe(path)
        end
      end
      puts "[EventHub] Channel subscriptions: #{EventChannel.channels}".yellow
      puts "[EventHub] If you modify 'faastruby.yml' in any function, you will need to restart the server to apply the changes.".yellow
    end

    # Loads the subscriptions and starts the background thread that drains
    # the queue forever. Failures while handling one event are logged and do
    # not stop the loop.
    def self.listen_for_events!
      load_subscribers
      @@thread = Thread.new do
        loop { dispatch(@@queue.pop) }
      end
      puts "[EventHub] Events thread started.".yellow
    end

    # Decodes one queued payload and triggers every subscriber of its channel.
    # Any StandardError (malformed payload, bad Base64, ...) is logged so the
    # dispatcher thread keeps running.
    def self.dispatch(payload)
      encoded_channel, encoded_data = payload.split(',')
      channel = EventChannel.new(Base64.urlsafe_decode64(encoded_channel))
      puts "[EventHub] Event channel=#{channel.name.inspect}".yellow
      channel.subscribers.each { |s| trigger(s, encoded_data) }
    rescue StandardError => e
      puts "[EventHub] Failed to process event #{payload.inspect}: #{e.class}: #{e.message}".red
    end
    private_class_method :dispatch

    # Calls a single subscribed function with the event data and logs its
    # response. Errors are rescued per subscriber so one failing function
    # does not prevent the remaining subscribers from being called.
    def self.trigger(subscriber_path, encoded_data)
      subscriber = Subscriber.new(subscriber_path)
      puts "[EventHub] Trigger function=#{subscriber.path.inspect} base64_payload=#{encoded_data.inspect}".yellow
      response = subscriber.call(encoded_data)
      puts "[#{subscriber.path}] #=> status=#{response.status} body=#{response.body.inspect} headers=#{Oj.dump response.headers}".light_blue
    rescue StandardError => e
      puts "[EventHub] Function #{subscriber_path.inspect} failed: #{e.class}: #{e.message}".red
    end
    private_class_method :trigger
  end
end
Version data entries
5 entries across 5 versions & 1 rubygems