Sha256: e9d249a6e32832257e918331e8476ec65b2f3c9c16a6007addc583e604040ce3
Contents?: true
Size: 1.48 KB
Versions: 7
Compression:
Stored size: 1.48 KB
Contents
#!/usr/bin/env ruby

# sidekiqfail: repeatedly runs ./bin/sidekiqload as a child process and, after
# each run exits, compares the Redis counters 'total_enqueued' and 'done' to
# see how many jobs are still unaccounted for. A summary is logged at the end.
#
# NOTE(review): the meaning of the -f/-b/-j/-t flags is defined by
# ./bin/sidekiqload, which is not visible here; the first run presumably
# enqueues jobs and is expected to fail, later runs presumably only drain
# what is left -- confirm against that script.

require 'optparse'
require 'sidekiq'

# Remaining restarts that are allowed while no 'flight:*' keys exist.
idle_restarts_left = 10
# Remaining restarts overall, regardless of in-flight state.
total_restarts_left = 100

options = { atomic_fetch: false }

parser = OptionParser.new do |opts|
  opts.banner = "Usage: sidekiqfail [options]"

  opts.on("-a", "--atomic-fetch", "Run the Sidekiq worker with the atomic-fetch fetcher") do |flag|
    options[:atomic_fetch] = flag
  end
end
parser.parse!

Sidekiq.configure_client { |config| config.redis = { db: 13 } }

# Integer value of the Redis key 'done' (0 when the key is missing).
def processed
  raw = Sidekiq.redis { |redis| redis.get('done') }
  raw.to_i
end

# Integer value of the Redis key 'total_enqueued' (0 when the key is missing).
def total
  raw = Sidekiq.redis { |redis| redis.get('total_enqueued') }
  raw.to_i
end

# Number of keys returned for the pattern 'flight:*', gathered by following
# the SCAN cursor until the server hands back cursor 0.
def inflight
  cursor = 0
  matched = 0

  loop do
    next_cursor, keys = Sidekiq.redis { |redis| redis.scan(cursor, match: 'flight:*') }
    matched += keys.count
    cursor = next_cursor.to_i
    break if cursor.zero?
  end

  matched
end

# Jobs enqueued but not yet counted as done.
def pending
  total - processed
end

# Logs the final counters through the Sidekiq logger at error level.
def print_report
  log = Sidekiq.logger
  log.error "Queued: #{total}"
  log.error "Processed: #{processed}"
  log.error "Lost: #{pending}"
end

# Forks a child that execs ./bin/sidekiqload with the given arguments
# (plus '-a' when --atomic-fetch was requested) and returns the child's pid.
start_load = lambda do |base_args|
  argv = options[:atomic_fetch] ? base_args + ['-a'] : base_args
  Process.fork { Process.exec('./bin/sidekiqload', *argv) }
end

worker_pid = start_load.call(%w[-f true -b 10 -j 1000 -t 0.01])

loop do
  Process.wait(worker_pid)
  puts "Processed before failure #{processed} out of #{total}"

  # Stop once everything is processed or either restart budget is used up.
  break if pending.zero? || idle_restarts_left.zero? || total_restarts_left.zero?

  # The idle budget is only spent when nothing is marked as in flight.
  idle_restarts_left -= 1 if inflight.zero?
  total_restarts_left -= 1

  worker_pid = start_load.call(%w[-f false -b 0 -j 0 -t 0.01])
end

print_report
Version data entries
7 entries across 7 versions & 1 rubygems