require 'socket' require 'bloom_filter/protocol' class BloomFilter class Client PACK_N = "N" def initialize(host, port, options = {}) @host, @port = host, port @timeout = options[:timeout] reconnect end def add(el) el = el.to_s @socket.write("#{[el.size + 1].pack(PACK_N)}#{Protocol::ADD}#{el}") timeout_or_default(false) do @socket.read(@socket.read(4).unpack(PACK_N).first) == Protocol::TRUE end end alias_method :<<, :add def include?(el) el = el.to_s @socket.write("#{[el.size + 1].pack(PACK_N)}#{Protocol::INCLUDE}#{el}") timeout_or_default(true) do @socket.read(@socket.read(4).unpack(PACK_N).first) == Protocol::TRUE end end def &(els) if els.size == 1 el = els.first self.include?(el) ? [el] : [] else elements = els.collect { |el| el.to_s }.join(Protocol::DEFAULT_SEPARATOR) @socket.write("#{[elements.size + 1].pack(PACK_N)}#{Protocol::INCLUDE_MANY}#{elements}") timeout_or_default(els) do response = @socket.read(@socket.read(4).unpack(PACK_N).first) result = [] els.size.times do |i| result << els[i] if response[i,1] == Protocol::TRUE end result end end end def dump(path, timeout = nil) @socket.write("#{[path.size + 1].pack(PACK_N)}#{Protocol::DUMP}#{path}") timeout_or_default(false, timeout) do @socket.read(@socket.read(4).unpack(PACK_N).first) == Protocol::TRUE end end def load(path, timeout = nil) @socket.write("#{[path.size + 1].pack(PACK_N)}#{Protocol::LOAD}#{path}") timeout_or_default(false, timeout) do @socket.read(@socket.read(4).unpack(PACK_N).first) == Protocol::TRUE end end def reconnect @socket.close if @socket && !@socket.closed? @socket = begin TCPSocket.new(@host, @port) rescue Exception => e nil end end def connected? !!(@socket && !@socket.closed?) end private def timeout_or_default(default, timeout = nil, &block) ready = IO.select([@socket], nil, nil, timeout || @timeout) if ready block.call else default end end end end