module ReactiveRecord
  class Broadcast

    def self.after_commit(operation, model)
      # Calling public_columns_hash once insures all policies are loaded
      # before the first broadcast.
      @public_columns_hash ||= ActiveRecord::Base.public_columns_hash
      Hyperstack::InternalPolicy.regulate_broadcast(model) do |data|
        puts "Broadcast aftercommit hook: #{data}" if Hyperstack::Connection.show_diagnostics

        if !Hyperstack.on_server? && Hyperstack::Connection.root_path
          send_to_server(operation, data, model.__synchromesh_update_time) rescue nil # fails if server no longer running so ignore
        else
          SendPacket.run(data, operation: operation, updated_at: model.__synchromesh_update_time)
        end
      end
    rescue ActiveRecord::StatementInvalid => e
      raise e unless e.message == "Could not find table 'hyperstack_connections'"
    end unless RUBY_ENGINE == 'opal'

    def self.send_to_server(operation, data, updated_at)
      salt = SecureRandom.hex
      authorization = Hyperstack.authorization(salt, data[:channel], data[:broadcast_id])
      raise 'no server running' unless Hyperstack::Connection.root_path
      Timeout::timeout(Hyperstack.send_to_server_timeout) do
        SendPacket.remote(
          Hyperstack::Connection.root_path,
          data,
          operation: operation,
          updated_at: updated_at,
          salt: salt,
          authorization: authorization
        ).tap { |p| raise p.error if p.rejected? }
      end
    rescue Timeout::Error
      puts "\n********* FAILED TO RECEIVE RESPONSE FROM SERVER WITHIN #{Hyperstack.send_to_server_timeout} SECONDS. CHANGES WILL NOT BE SYNCED ************\n"
      raise 'no server running'
    end unless RUBY_ENGINE == 'opal'

    class SendPacket < Hyperstack::ServerOp
      param authorization: nil, nils: true
      param salt: nil
      param :operation
      param :broadcast_id
      param :channel
      param :channels
      param :klass
      param :record
      param :operation
      param :previous_changes
      param :updated_at

      unless RUBY_ENGINE == 'opal'
        validate do
          params.authorization.nil? ||
          Hyperstack.authorization(
            params.salt, params.channel, params.broadcast_id
          ) == params.authorization
        end
        dispatch_to do
          # No need to broadcast if the changes are filtered out by a policy
          params.channel unless params.operation == :change && params.previous_changes.empty?
        end
      end
    end

    SendPacket.on_dispatch do |params|
      in_transit[params.broadcast_id].receive(params) do |broadcast|
        if params.operation == :destroy
          ReactiveRecord::Collection.sync_scopes broadcast
        else
          ReactiveRecord::Collection.sync_scopes broadcast.process_previous_changes
        end
      end
    end if RUBY_ENGINE == 'opal'

    def self.to_self(record, data = {})
      # simulate incoming packet after a local save
      operation = if record.new_record?
                    :create
                  elsif record.destroyed?
                    :destroy
                  else
                    :change
                  end
      dummy_broadcast = new.local(operation, record, data)
      record.backing_record.sync! data unless operation == :destroy
      ReactiveRecord::Collection.sync_scopes dummy_broadcast
    end

    def record_with_current_values
      ReactiveRecord::Base.load_data do
        backing_record = @backing_record || klass.find(record[klass.primary_key]).backing_record
        if destroyed?
          backing_record.ar_instance
        else
          merge_current_values(backing_record)
        end
      end
    end

    def record_with_new_values
      klass._react_param_conversion(record).tap do |ar_instance|
        if destroyed?
          ar_instance.backing_record.destroy_associations
        elsif new?
          ar_instance.backing_record.initialize_collections
        end
      end
    end

    def new?
      @is_new
    end

    def destroyed?
      @destroyed
    end

    def local?
      @is_local
    end

    def klass
      Object.const_get(@klass)
    end

    def to_s
      "klass: #{klass} record: #{record} new?: #{new?} destroyed?: #{destroyed?}"
    end

    # private

    attr_reader :record
    attr_reader :updated_at

    def self.open_channels
      @open_channels ||= Set.new
    end

    def self.in_transit
      @in_transit ||= Hash.new { |h, k| h[k] = new(k) }
    end

    def initialize(id = nil)
      @id = id
      @received = Set.new
      @record = {}
      @previous_changes = {}
    end

    def local(operation, record, data)
      @destroyed = operation == :destroy
      @is_local = true
      @is_new = operation == :create
      @klass = record.class.name
      @record = data
      record.backing_record.destroyed = false
      @record[record.primary_key] = record.id if record.id
      record.backing_record.destroyed = @destroyed
      @backing_record = record.backing_record
      @previous_changes = record.changes
      self
    end

    def receive(params)
      @destroyed = params.operation == :destroy
      @channels ||= Hyperstack::IncomingBroadcast.open_channels.intersection params.channels
      @received << params.channel
      @klass ||= params.klass
      @record.merge! params.record
      @previous_changes.merge! params.previous_changes
      @updated_at = params.updated_at
      ReactiveRecord::Base.when_not_saving(klass) do
        @backing_record = ReactiveRecord::Base.exists?(klass, params.record[klass.primary_key])

        # first check to see if we already destroyed it and if so exit the block
        break if @backing_record&.destroyed

        # We ignore whether the record is being created or not, and just check and see if in our
        # local copy we have ever loaded this id before.  If we have then its not new to us.
        # BUT if we are destroying a record then it can't be treated as new regardless.
        # This is because we might be just doing a count on a scope and so no actual records will
        # exist.  Treating a destroyed record as "new" would cause us to first increment the
        # scope counter and then decrement for the destroy, resulting in a nop instead of a -1 on
        # the scope count.
        @is_new = !@backing_record&.id_loaded? && !@destroyed

        # it is possible that we are recieving data on a record for which we are also waiting
        # on an an inital data load in which case we have not yet set the loaded id, so we
        # set if now.
        @backing_record&.loaded_id = params.record[klass.primary_key]

        # once we have received all the data from all the channels (applies to create and update only)
        # we yield and process the record

        # pusher fake can send duplicate records which will result in a nil broadcast
        # so we also check that before yielding
        if @channels == @received && (broadcast = complete!)
          yield broadcast
        end
      end
    end

    def complete!
      self.class.in_transit.delete @id
    end

    def value_changed?(attr, value)
      attrs = @backing_record.synced_attributes
      return true if attr == @backing_record.primary_key
      return attrs[attr] != @backing_record.convert(attr, value) if attrs.key?(attr)

      assoc = klass.reflect_on_association_by_foreign_key attr

      return value unless assoc
      child = attrs[assoc.attribute]
      return value != child.id if child
      value
    end

    def integrity_check
      @previous_changes.each do |attr, value|
        next if @record.key?(attr) && @record[attr] == value.last
        Hyperstack::Component::IsomorphicHelpers.log "Broadcast contained change to #{attr} -> #{value.last} "\
                                     "without corresponding value in attributes (#{@record}).\n",
                                     :error
        raise "Broadcast Integrity Error"
      end
    end

    def process_previous_changes
      return self unless @backing_record
      integrity_check
      return self if destroyed?
      @record.dup.each do |attr, value|
        next if value_changed?(attr, value)
        @record.delete(attr)
        @previous_changes.delete(attr)
      end
      self
    end

    def merge_current_values(br)
      current_values = Hash[*@previous_changes.collect do |attr, values|
        value = attr == klass.primary_key ? record[klass.primary_key] : values.first
        if br.attributes.key?(attr) &&
           br.attributes[attr] != br.convert(attr, value) &&
           br.attributes[attr] != br.convert(attr, values.last)
          Hyperstack::Component::IsomorphicHelpers.log "warning #{attr} has changed locally - will force a reload.\n"\
               "local value: #{br.attributes[attr]} remote value: #{br.convert(attr, value)}->#{br.convert(attr, values.last)}",
               :warning
          return nil
        end
        [attr, value]
      end.compact.flatten(1)]
      klass._react_param_conversion(br.attributes.merge(current_values))
    end
  end
end