# -*- encoding: binary -*- begin require 'json' rescue LoadError raise LoadError, "either json or json-pure is required" end require 'rack' module Upr # JSON protocol based on Lighttpd's mod_uploadprogress # http://trac.lighttpd.net/trac/wiki/Docs:ModUploadProgress class JSON < Struct.new(:frequency, :backend, :upload_id) include Params # We use this in case length is nil when clients send chunked uploads INT_MAX = 0x7fffffff SLEEP_CLASS = defined?(Actor) ? Actor : Kernel # our default response headers, we need to set no-transform to # prevent deflaters from compressing our already-small small input # and also to prevent buffering/corking of the response inside # deflater buffers. RESPONSE_HEADERS = { 'Content-Type' => 'application/json', 'Cache-Control' => 'no-cache, no-transform', } def initialize(options = {}) super(options[:frequency] || 1, options[:backend], options[:upload_id]) # support :drb for compatibility with mongrel_upload_progress if options[:drb] backend and raise ArgumentError, ":backend and :drb are incompatible" require 'drb' DRb.start_service self.backend = DRbObject.new(nil, options[:drb]) elsif String === backend # allow people to use strings in case their backend gets # lazy-loaded (like an ActiveRecord model) self.backend = eval(backend) elsif backend.nil? raise ArgumentError, "backend MUST be specified" end # only for use with rails_proc upload_id.nil? and self.upload_id = options[:env] end def rails_render_options env = upload_id self.upload_id = extract_upload_id(env) text = if Rack::Request.new(env).GET.include?("long") Proc.new { |response,output| each { |line| output.write(line) } } else _once end { :content_type => 'application/json', :text => text } end def _once if status = backend.read(upload_id) if status.done? _json_object(:state => 'done') elsif status.seen == 0 _json_object(:state => 'starting') elsif status.error? _error_msg("upload failed") else _update_msg(status) end else timeout = Time.now + 2 until status = backend.read(upload_id) SLEEP_CLASS.sleep(0.1) return _error_msg("couldn't get status") if Time.now > timeout end _json_object(:state => 'starting') end end # Rack interface reservced for future use with streaming AJAX def call(env) if uid = extract_upload_id(env) _wrap(env, uid) else [ 400, RESPONSE_HEADERS.dup, [ _error_msg("upload_id not given") ] ] end end # Rack interface reservced for future use with streaming AJAX def each(&block) sleeper = defined?(Actor) ? Actor : Kernel timeout = Time.now + 2 eol = ";\n" yield _json_object(:state => 'starting') << eol begin until status = backend.read(upload_id) sleeper.sleep(0.1) break if Time.now > timeout end if status begin yield _update_msg(status) << eol break if status.done? sleeper.sleep(frequency) end while status = backend.read(upload_id) yield _json_object(:state => 'done') << eol else yield _error_msg("couldn't get status") << eol end rescue => e yield _error_msg(e.message) << eol end end # Rack interface reservced for future use with streaming AJAX def _wrap(env, uid) _self = dup _self.upload_id = uid [ 200, RESPONSE_HEADERS.dup, _self ] end def _error_msg(msg) _json_object(:state => 'error', :status => 400, :message => msg) end def _json_object(options) # $stderr.syswrite "#{options.inspect} #{$upr.inspect}\n" options.to_json end def _update_msg(status) raise "client error" if status.error? received = status.seen size = status.length || INT_MAX _json_object(:state => 'uploading', :size => size, :received => received) end end end