lib/embulk/input/jira_api/client.rb in embulk-input-jira-0.2.4 vs lib/embulk/input/jira_api/client.rb in embulk-input-jira-0.2.5

- old
+ new

@@ -1,39 +1,77 @@ require "jiralicious" require "parallel" +require "limiter" require "embulk/input/jira_api/issue" require "timeout" module Embulk module Input module JiraApi class Client - PARALLEL_THREAD_COUNT = 50 - SEARCH_TIMEOUT_SECONDS = 5 - SEARCH_ISSUES_TIMEOUT_SECONDS = 60 + MAX_RATE_LIMIT = 50 + MIN_RATE_LIMIT = 2 + # Normal http request timeout is 300s + SEARCH_ISSUES_TIMEOUT_SECONDS = 300 DEFAULT_SEARCH_RETRY_TIMES = 10 + def initialize + @rate_limiter = Limiter::RateQueue.new(MAX_RATE_LIMIT, interval: 2) + end + def self.setup(&block) Jiralicious.configure(&block) new end def search_issues(jql, options={}) - timeout_and_retry(SEARCH_ISSUES_TIMEOUT_SECONDS) do - issues_raw = search(jql, options).issues_raw - - # TODO: below code has race-conditon. - Parallel.map(issues_raw, in_threads: PARALLEL_THREAD_COUNT) do |issue_raw| - # https://github.com/dorack/jiralicious/blob/v0.4.0/lib/jiralicious/search_result.rb#L32-34 - issue = Jiralicious::Issue.find(issue_raw["key"]) - JiraApi::Issue.new(issue) + issues_raw = search(jql, options).issues_raw + # Maximum number of issues to retrieve is 50 + rate_limit = MAX_RATE_LIMIT + success_items = [] + fail_items = [] + error_object = nil + timeout_and_retry(SEARCH_ISSUES_TIMEOUT_SECONDS * MAX_RATE_LIMIT ) do + retry_count = 0 + semaphore = Mutex.new + @rate_limiter = Limiter::RateQueue.new(rate_limit, interval: 2) + error_object = nil + while issues_raw.length > 0 && retry_count <= DEFAULT_SEARCH_RETRY_TIMES do + Parallel.each(issues_raw, in_threads: rate_limit) do |issue_raw| + # https://github.com/dorack/jiralicious/blob/v0.4.0/lib/jiralicious/search_result.rb#L32-34 + begin + issue = find_issue(issue_raw["key"]) + semaphore.synchronize { + success_items.push(JiraApi::Issue.new(issue)) + } + rescue MultiJson::ParseError => e + html = e.message + title = html[%r|<title>(.*?)</title>|, 1] + # 401 due to high number of concurrent requests with current account + # The number of concurrent requests is not fixed by every account + # Hence catch the error item and retry later + raise title if title != "Unauthorized (401)" + semaphore.synchronize { + fail_items.push(issue_raw) + error_object = e + } + end + end + retry_count += 1 + rate_limit = calculate_rate_limit(rate_limit, issues_raw.length, fail_items.length, retry_count) + issues_raw = fail_items + fail_items = [] + raise error_object if retry_count > DEFAULT_SEARCH_RETRY_TIMES && !error_object.nil? + # Sleep after some seconds for JIRA API perhaps under the overload + sleep retry_count if fail_items.length > 0 end + success_items end end def search(jql, options={}) - timeout_and_retry(SEARCH_TIMEOUT_SECONDS) do + timeout_and_retry(SEARCH_ISSUES_TIMEOUT_SECONDS) do Jiralicious.search(jql, options) end end def total_count(jql) @@ -51,14 +89,23 @@ html = e.message title = html[%r|<title>(.*?)</title>|, 1] #=> e.g. "Unauthorized (401)" raise ConfigError.new("Can not authorize with your credential.") if title == 'Unauthorized (401)' end + # Calculate rate limit based on previous run result + # Return 2 MIN_RATE_LIMIT in case turning from the 5th times or success_items is less than 2 + # Otherwise return the min number between fail_items, success_items and current_limit + def calculate_rate_limit(current_limit, all_items, fail_items, times) + success_items = all_items - fail_items + return MIN_RATE_LIMIT if times >= DEFAULT_SEARCH_RETRY_TIMES/2 || success_items < MIN_RATE_LIMIT + return [fail_items, success_items, current_limit].min + end + private def timeout_and_retry(wait, retry_times = DEFAULT_SEARCH_RETRY_TIMES, &block) - count = 1 + count = 0 begin Timeout.timeout(wait) do yield end rescue Jiralicious::JqlError, Jiralicious::AuthenticationError, Jiralicious::NotLoggedIn, Jiralicious::InvalidLogin => e @@ -70,31 +117,27 @@ # same as this Mailchimp plugin issue: https://github.com/treasure-data/embulk-output-mailchimp/issues/10 # (a) JIRA returns error as HTML, but HTTParty try to parse it as JSON. # And (b) `search_issues` method has race-condition bug. If it occurred, MultiJson::ParseError raised too. html = e.message title = html[%r|<title>(.*?)</title>|, 1] #=> e.g. "Unauthorized (401)" - if title - # (a) - case title - when "Atlassian Cloud Notifications - Page Unavailable" - # a.k.a. HTTP 503 - raise title - when "Unauthorized (401)" - Embulk.logger.warn "JIRA returns error: #{title}. Will go to retry" - count += 1 - retry - end - else - # (b) - count += 1 - retry - end + raise title if title == "Atlassian Cloud Notifications - Page Unavailable" + count += 1 + raise title.nil? ? "Unknown Error" : title if count > retry_times + Embulk.logger.warn "JIRA returns error: #{title == 'Unauthorized (401)' ? title + " due to overloading API requests. Retrying on failed items only" : title}." + sleep count + retry rescue Timeout::Error => e count += 1 - sleep count # retry after some seconds for JIRA API perhaps under the overload raise e if count > retry_times + Embulk.logger.warn "Time out error." + sleep count # retry after some seconds for JIRA API perhaps under the overload retry end + end + + def find_issue(issue_key) + @rate_limiter.shift + Jiralicious::Issue.find(issue_key) end end end end end