lib/cellect/client/connection.rb in cellect-client-0.1.3 vs lib/cellect/client/connection.rb in cellect-client-1.0.0
- old
+ new
@@ -5,74 +5,74 @@
module Client
class CellectServerError < StandardError; end
class Connection
include Celluloid
include Celluloid::IO
-
+
# Reload the data for a workflow on all servers
def reload_workflow(id)
broadcast :post, "/workflows/#{ id }/reload"
end
-
+
# Remove the workflow from all servers
def delete_workflow(id)
broadcast :delete, "/workflows/#{ id }"
end
-
+
# Adds or updates a subject on all servers
def add_subject(id, workflow_id:, group_id: nil, priority: nil)
broadcast :put, "/workflows/#{ workflow_id }/add", querystring(subject_id: id, group_id: group_id, priority: priority)
end
-
+
# Removes a subject on all servers
def remove_subject(id, workflow_id:, group_id: nil)
broadcast :put, "/workflows/#{ workflow_id }/remove", querystring(subject_id: id, group_id: group_id)
end
-
+
# Preload a user on a server
def load_user(user_id:, host:, workflow_id:)
send_http host, :post, "/workflows/#{ workflow_id }/users/#{ user_id }/load"
end
-
+
# Adds a subject to a users seen set
def add_seen(subject_id:, user_id:, host:, workflow_id:)
send_http host, :put, "/workflows/#{ workflow_id }/users/#{ user_id }/add_seen", querystring(subject_id: subject_id)
end
-
+
# Gets unseen subjects for a user
def get_subjects(user_id:, host:, workflow_id:, limit: nil, group_id: nil)
response = send_http host, :get, "/workflows/#{ workflow_id }", querystring(user_id: user_id, group_id: group_id, limit: limit)
ensure_valid_response response
MultiJson.load response.body
end
-
+
protected
-
+
# Broadcast by iterating over each server
def broadcast(action, path, query = '')
Cellect::Client.node_set.nodes.each_pair do |node, host|
send_http host, action, path, query
end
end
-
+
# Makes an API call through an evented Celluloid Socket
def send_http(host, action, path, query = '')
params = { host: host, path: path }
params[:query] = query if query && !query.empty?
uri = URI::HTTP.build params
HTTP.send action, uri.to_s, socket_class: Celluloid::IO::TCPSocket
end
-
+
# Builds a querystring from a hash
def querystring(hash = { })
[].tap do |list|
hash.each_pair do |key, value|
next unless value
list << "#{ key }=#{ value }"
end
end.join('&')
end
-
+
# Ensure the API response was OK
def ensure_valid_response(response)
unless response.code == 200
raise CellectServerError, "Server Responded #{ response.code }"
end