require 'json'
require 'flydata/api/base'

module Flydata
  module Api
    # API wrapper for the "data_entry" resource.
    #
    # All requests are issued through @client.post. Requests that take a list
    # of tables are split into batches of at most NUM_TABLES_PER_REQUEST table
    # names, and the per-batch responses are merged into a single hash.
    class DataEntry < Base
      # Maximum number of table names sent in a single request.
      NUM_TABLES_PER_REQUEST = 150

      # api_client is forwarded unchanged to Base#initialize by the bare
      # `super` call.
      def initialize(api_client)
        @model_name = 'data_entry'
        @url_path = "/data_ports/:data_port_id/#{@model_name.pluralize}"
        super
      end

      # Fetches buffer/queue statistics for a data entry and merges the
      # per-batch responses into one result.
      #
      # options[:tables] - array of table names. nil or an empty array results
      #                    in a single request with an empty table list.
      # options[:mode]   - appended to the request path as-is.
      #
      # Returns a hash with "complete", "state", "stat", "message" and
      # "success" keys. If any batch response has no "state", the remaining
      # batches are skipped and a reduced hash ("complete" => false, the
      # response's "message", "success" => true) is returned instead.
      #
      # Response example (/buffer_stat.json for a Sync data entry)
      # {
      #   "complete": true,
      #   "state": "complete",
      #   "stat": {
      #     "buffer": {
      #       "proxy": {
      #         "buffer_size": 0,
      #         "queue_length": 0
      #       },
      #       "redshift": {
      #         "buffer_size": 0,
      #         "queue_length": 0
      #       },
      #       "sync_redshift": {
      #         "buffer_size": 0,
      #         "queue_length": 0
      #       }
      #     },
      #     "queue": {
      #       "copy_queue": {
      #         "num_items": 0
      #       },
      #       "query_queue": {
      #         "num_items": 0
      #       }
      #     }
      #   },
      #   "message": "",
      #   "success": true
      # }
      def buffer_stat(data_entry_id, options = {})
        table_array = options[:tables] || ['']
        table_array = [''] if table_array.empty?

        results = []
        # The failure flag is tracked separately from the message so that an
        # error response without a "message" is still treated as an error.
        failed = false
        error = nil
        table_array.each_slice(NUM_TABLES_PER_REQUEST) do |ta|
          tables = ta.join(',')
          result = @client.post("/#{@model_name.pluralize}/#{data_entry_id}/buffer_stat/#{options[:mode]}", nil, {tables: tables})
          unless result["state"]
            failed = true
            error = result["message"]
            break
          end
          results << result
        end

        if failed
          # mimicking the error value of /buffer_stat API
          return {
            "complete" => false,
            "message" => error,
            "success" => true,
          }
        end

        stat = aggregate_stat(results)

        # TODO Refactor. The logic below is copied from web data_entries_controller
        buffer_empty = stat["buffer"].all? { |_name, v| v["buffer_size"] == 0 }
        queue_empty = stat["queue"].all? { |_name, v| v["num_items"] == 0 }
        complete = (buffer_empty && queue_empty)
        state = if complete
                  'complete'
                elsif !buffer_empty
                  'processing'
                else
                  'uploading'
                end
        # A single response already carries a message; a message is only
        # rebuilt when several batch responses were merged.
        message = results.count == 1 ? results.first["message"] : build_message(stat, complete)

        {
          "complete" => complete,
          "state" => state,
          "stat" => stat,
          "message" => message,
          "success" => true,
        }
      end

      # Fetches the status of tables for a data entry, concatenating the
      # "table_status" arrays of all batch responses.
      #
      # options[:tables] - array of table names. nil results in a single
      #                    request with an empty table list. Note that, unlike
      #                    buffer_stat, an explicitly empty array sends no
      #                    request and yields an empty "table_status" list.
      #
      # Response example (/table_status.json)
      # {
      #   "table_status": [
      #     {
      #       "updated_at": 1462586673,
      #       "status": "ready",
      #       "table_name": "items",
      #       "seq": 0,
      #       "created_at": 1462586673,
      #       "src_pos": "-",
      #       "records_copied": "0",
      #       "num_items": 0,
      #       "next_item": false
      #     },
      #     {
      #       "updated_at": 1462586706,
      #       "status": "ready",
      #       "table_name": "users",
      #       "seq": 0,
      #       "created_at": 1462586674,
      #       "src_pos": "-",
      #       "records_copied": "0",
      #       "records_sent": "8",
      #       "num_items": 0,
      #       "next_item": false
      #     }
      #   ]
      # }
      def table_status(data_entry_id, options = {})
        table_array = options[:tables] || ['']
        results = []
        table_array.each_slice(NUM_TABLES_PER_REQUEST) do |ta|
          tables = ta.join(',')
          result = @client.post("/#{@model_name.pluralize}/#{data_entry_id}/table_status", nil, {tables: tables})
          results << result
        end

        results.inject({"table_status" => [], "success" => true}) do |h, result|
          h["table_status"] += result["table_status"]
          h
        end
      end

      # Posts a cleanup_sync request. `tables` is joined with commas and
      # merged into `options` (overriding any :tables key given there).
      def cleanup_sync(data_entry_id, tables, options = {})
        params = options.merge({tables: tables.join(',')})
        @client.post("/#{@model_name.pluralize}/#{data_entry_id}/cleanup_sync", nil, params)
      end

      # Update validity of tables
      # -- Params ( Agent 0.7.4 and older )
      #   params = {
      #     updated_tables: { "bad_table"=>"error reason", "good_table"=>nil}
      #   }
      # -- Params ( Agent 0.7.5 and later )
      #   table_update_hash = {
      #     updated_tables: {
      #       "<table name>" => {
      #         "invalid_table_reason" => "<error reason>",
      #         "uk_as_pk" => ['xx', ...],
      #       }
      #     }
      #   }
      #   - If value is nil, the setting will be deleted.
      #   - If table's attribute hash doesn't have an attribute key, the setting
      #     for the attribute for the table will not be changed
      #
      # The hash is serialized with to_json and posted with a JSON content type.
      def update_table_validity(data_entry_id, table_update_hash)
        @client.post("/#{@model_name.pluralize}/#{data_entry_id}/update_table_validity", {:headers => {:content_type => :json}}, table_update_hash.to_json)
      end

      # Tells the server that an initial sync has completed
      # stats_hash: {"init_sync_stats":{"Table1": 100, "Table2": 12345}}
      #   Sent 100 records for Table1, Sent 12345 records for Table2
      def complete_init_sync(data_entry_id, stats_hash)
        @client.post("/#{@model_name.pluralize}/#{data_entry_id}/complete_init_sync", {:headers => {:content_type => :json}}, stats_hash.to_json)
      end

      private

      # Sums the per-table buffer and queue counters of all batch responses
      # into a single {"buffer" => {...}, "queue" => {...}} hash. Counters that
      # are absent from every response are left out of the result.
      def aggregate_stat(results)
        totals = {"buffer" => {}, "queue" => {}}

        results.each do |result|
          buffers = result["stat"]["buffer"]
          %w[redshift sync_redshift].each do |fluent_process|
            %w[buffer_size queue_length].each do |counter|
              value = buffers[fluent_process] && buffers[fluent_process][counter]
              next unless value
              totals["buffer"][fluent_process] ||= {}
              totals["buffer"][fluent_process][counter] ||= 0
              totals["buffer"][fluent_process][counter] += value
            end
          end

          queues = result["stat"]["queue"]
          %w[copy_queue query_queue].each do |queue|
            value = queues[queue] && queues[queue]["num_items"]
            next unless value
            totals["queue"][queue] ||= {}
            totals["queue"][queue]["num_items"] ||= 0
            totals["queue"][queue]["num_items"] += value
          end
        end

        proxy = results.first["stat"]["buffer"]["proxy"]
        if proxy
          # proxy values are not per-table, so they are copied from the first
          # response instead of being summed.
          totals["buffer"]["proxy"] ||= {}
          totals["buffer"]["proxy"]["buffer_size"] = proxy["buffer_size"]
          totals["buffer"]["proxy"]["queue_length"] = proxy["queue_length"]
        end

        totals
      end

      # Builds the progress message for a merged stat hash. Returns "" when
      # complete is truthy.
      #
      # TODO Refactor. This code is copied from web data_entries_controller
      def build_message(stat, complete)
        return "" if complete

        # to_i guards against entries that lack the counter (nil).
        buffer_size = stat["buffer"].inject(0) { |sum, (_name, v)| sum + v["buffer_size"].to_i }
        chunk_num = stat["queue"].inject(0) { |sum, (_name, v)| sum + v["num_items"].to_i }

        # Named references are required here: a positional %s would render
        # the whole argument hash instead of the value.
        if buffer_size > 0
          " -> %<buffer_size>s bytes remaining..." % {buffer_size: with_delimiter(buffer_size)}
        elsif chunk_num > 0
          " -> %<chunk_num>s data chunk(s) remaining..." % {chunk_num: with_delimiter(chunk_num)}
        else
          " -> Calculating remaining bytes..."
        end
      end

      # Formats a non-negative integer with a comma after every third digit
      # counted from the right, e.g. 1234567 -> "1,234,567".
      def with_delimiter(number)
        number.to_s.reverse.gsub(/...(?=.)/, '\&,').reverse
      end
    end
  end
end