lib/hyper-operation/transport/connection.rb in hyper-operation-1.0.alpha1.5 vs lib/hyper-operation/transport/connection.rb in hyper-operation-1.0.alpha1.6

- old
+ new

@@ -1,174 +1,96 @@ +# frozen_string_literal: true + module Hyperstack - module AutoCreate - def table_exists? - # works with both rails 4 and 5 without deprecation warnings - if connection.respond_to?(:data_sources) - connection.data_sources.include?(table_name) - else - connection.tables.include?(table_name) - end - end + class Connection + class << self + attr_accessor :transport, :connection_adapter, :show_diagnostics - def needs_init? - Hyperstack.transport != :none && Hyperstack.on_server? && !table_exists? - end + def adapter + adapter_name = Hyperstack.connection[:adapter].to_s + adapter_path = "hyper-operation/transport/connection_adapter/#{adapter_name}" - def create_table(*args, &block) - connection.create_table(table_name, *args, &block) if needs_init? - end - end + begin + require adapter_path + rescue LoadError => e + if e.path == adapter_path + raise e.class, "Could not load the '#{adapter_name}' adapter. Make sure the adapter is spelled correctly in your Hyperstack config, and the necessary gems are in your Gemfile.", e.backtrace - class Connection < ActiveRecord::Base - class QueuedMessage < ActiveRecord::Base + # Bubbled up from the adapter require. Prefix the exception message + # with some guidance about how to address it and reraise. + else + raise e.class, "Error loading the '#{adapter_name}' adapter. Missing a gem it depends on? #{e.message}", e.backtrace + end + end - extend AutoCreate - - self.table_name = 'hyperstack_queued_messages' - - do_not_synchronize - - serialize :data - - belongs_to :hyperstack_connection, - class_name: 'Hyperstack::Connection', - foreign_key: 'connection_id' - - scope :for_session, - ->(session) { joins(:hyperstack_connection).where('session = ?', session) } - - # For simplicity we use QueuedMessage with connection_id 0 - # to store the current path which is used by consoles to - # communicate back to the server - - default_scope { where('connection_id IS NULL OR connection_id != 0') } - - def self.root_path=(path) - unscoped.find_or_create_by(connection_id: 0).update(data: path) + adapter_name = adapter_name.camelize + "Hyperstack::ConnectionAdapter::#{adapter_name}".constantize end - def self.root_path - unscoped.find_or_create_by(connection_id: 0).data + def build_tables + adapter.build_tables end - end - extend AutoCreate - - def self.build_tables - create_table(force: :cascade) do |t| - t.string :channel - t.string :session - t.datetime :created_at - t.datetime :expires_at - t.datetime :refresh_at + def build_tables? + adapter.respond_to?(:build_tables) end - QueuedMessage.create_table(force: :cascade) do |t| - t.text :data - t.integer :connection_id - end - end - do_not_synchronize - - self.table_name = 'hyperstack_connections' - - has_many :messages, - foreign_key: 'connection_id', - class_name: 'Hyperstack::Connection::QueuedMessage', - dependent: :destroy - scope :expired, - -> { where('expires_at IS NOT NULL AND expires_at < ?', Time.zone.now) } - scope :pending_for, - ->(channel) { where(channel: channel).where('session IS NOT NULL') } - scope :inactive, - -> { where('session IS NULL AND refresh_at < ?', Time.zone.now) } - - def self.needs_refresh? - exists?(['refresh_at IS NOT NULL AND refresh_at < ?', Time.zone.now]) - end - - def transport - self.class.transport - end - - before_create do - if session - self.expires_at = Time.now + transport.expire_new_connection_in - elsif transport.refresh_channels_every != :never - self.refresh_at = Time.now + transport.refresh_channels_every + def active + adapter.active end - end - class << self - attr_accessor :transport + def open(channel, session = nil, root_path = nil) + puts "open(#{channel}, #{session}, #{root_path})" if show_diagnostics - def active - # if table doesn't exist then we are either calling from within - # a migration or from a console before the server has ever started - # in these cases there are no channels so we return nothing - return [] unless table_exists? - if Hyperstack.on_server? - expired.delete_all - refresh_connections if needs_refresh? + adapter.open(channel, session, root_path).tap do |c| + puts " - open returning #{c}" if show_diagnostics end - all.pluck(:channel).uniq end - def open(channel, session = nil, root_path = nil) - self.root_path = root_path - find_or_create_by(channel: channel, session: session) - end - def send_to_channel(channel, data) - pending_for(channel).each do |connection| - QueuedMessage.create(data: data, hyperstack_connection: connection) - end - transport.send_data(channel, data) if exists?(channel: channel, session: nil) + puts "send_to_channel(#{channel}, #{data})" if show_diagnostics + + adapter.send_to_channel(channel, data) end def read(session, root_path) - self.root_path = root_path - where(session: session) - .update_all(expires_at: Time.now + transport.expire_polled_connection_in) - QueuedMessage.for_session(session).destroy_all.pluck(:data) + puts "read(#{session}, #{root_path})" if show_diagnostics + + adapter.read(session, root_path) end def connect_to_transport(channel, session, root_path) - self.root_path = root_path - if (connection = find_by(channel: channel, session: session)) - messages = connection.messages.pluck(:data) - connection.destroy - else - messages = [] - end - open(channel) - messages + puts "connect_to_transport(#{channel}, #{session}, #{root_path})" if show_diagnostics + + adapter.connect_to_transport(channel, session, root_path) end def disconnect(channel) - find_by(channel: channel, session: nil).destroy + adapter.disconnect(channel) end def root_path=(path) - QueuedMessage.root_path = path if path + adapter.root_path = path end def root_path - # if the QueuedMessage table doesn't exist then we are either calling from within - # a migration or from a console before the server has ever started - # in these cases there is no root path to the server - QueuedMessage.root_path if QueuedMessage.table_exists? + adapter.root_path end def refresh_connections - refresh_started_at = Time.zone.now - channels = transport.refresh_channels - next_refresh = refresh_started_at + transport.refresh_channels_every - channels.each do |channel| - connection = find_by(channel: channel, session: nil) - connection.update(refresh_at: next_refresh) if connection + adapter.refresh_connections + end + + def method_missing(method_name, *args, &block) + if adapter::Connection.respond_to?(method_name) + adapter::Connection.send(method_name, *args, &block) + else + super end - inactive.delete_all + end + + def respond_to_missing?(method_name, include_private = false) + adapter::Connection.respond_to?(method_name) end end end end