# curl-multi - Ruby bindings for the libcurl multi interface
# Copyright (C) 2007 Philotic, Inc.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

$:.unshift File.dirname(__FILE__)

require 'rubygems'
require 'inline'
require 'uri-extensions'
# NOTE(review): the inline C below is not built with -lcurl itself; curb is
# presumably required so that libcurl's symbols are already loaded. Confirm
# before removing this require.
require 'curb'

module Curl
  # Passed to a request's failure handler when libcurl reports a transfer
  # error. +code+ is the CURLcode value libcurl returned.
  class Error < RuntimeError
    attr_reader :code

    def initialize(code)
      @code = code
      super("Curl error #{code}")
    end
  end

  # Passed to a request's failure handler when a transfer finishes with an
  # HTTP status other than 200. +status+ is that HTTP status.
  class HTTPError < RuntimeError
    attr_reader :status

    def initialize(status)
      @status = status
      super("Curl HTTP error #{status}")
    end
  end

  # Raised by Multi#work when more than one success/failure handler raised
  # during a single call; the individual exceptions are in +errors+.
  class MultiError < RuntimeError
    attr_reader :errors

    def initialize(errors)
      @errors = errors
      super("Multiple errors: #{errors}")
    end
  end

  # An instance of this class collects the response for each curl easy handle.
  class Req
    CURLE_OK = 0

    attr_reader :url, :body

    # +success+ is called with the response body, +failure+ with an
    # Error or HTTPError instance.
    def initialize(url, success, failure)
      @url = url
      @success = success
      @failure = failure
      # A mutable (dup'ed) buffer, because add_chunk appends to it in place.
      @body = ''.dup
      @done = false
      @error = nil
    end

    # Called from C once libcurl has finished the transfer. +code+ is the
    # CURLcode result, +status+ the HTTP response code. Anything other than
    # CURLE_OK with status 200 marks the request as failed.
    def done(code, status)
      @done = true
      if code != CURLE_OK
        @error = Error.new(code)
      elsif status != 200
        @error = HTTPError.new(status)
      end
    end

    def done?
      @done
    end

    def failed?
      !!@error
    end

    def do_success
      @success.call(body)
    end

    def do_failure
      @failure.call(@error)
    end

    def inspect
      "{Curl::Req #{url}}"
    end
    alias_method :to_s, :inspect

    # Called from C for every chunk of response data libcurl delivers.
    # Appends in place; building a new string per chunk was quadratic.
    def add_chunk(chunk)
      @body << chunk
    end
  end

  class Multi
    def size
      @handles.size
    end

    def inspect
      '{Curl::Multi' + @handles.map { |h| ' ' + h.url }.join + '}'
    end
    alias_method :to_s, :inspect

    # Queue a GET of +url+. The default failure handler ignores the error.
    # It accepts one argument because Req#do_failure always passes the
    # error, and an argument-less lambda raises ArgumentError on Ruby >= 1.9.
    def get(url, success, failure = lambda { |_error| })
      add(url, nil, success, failure)
    end

    # Queue a POST of +params+ (encoded with URI.escape_params) to +url+.
    def post(url, params, success, failure = lambda { |_error| })
      add(url, URI.escape_params(params), success, failure)
    end

    # Wait until one of the given IOs or one of curl's sockets is ready,
    # running #work after every wakeup. Returns two arrays holding the ready
    # file descriptor numbers (not the IO objects). With no IOs given, it
    # returns [[], []] after a single wait.
    def select(rfds, wfds)
      loop do
        ready_rfds, ready_wfds = c_select(rfds.map { |s| s.fileno },
                                          wfds.map { |s| s.fileno })
        work # Curl may or may not have work to do, but we can't tell.
        return ready_rfds, ready_wfds unless ready_rfds.empty? && ready_wfds.empty?
        return [], [] if rfds.empty? && wfds.empty?
      end
    end

    # Do as much work as possible without blocking on the network. That is,
    # read as much data as is ready and write as much data as we have buffer
    # space for. If any complete responses arrive, call their handlers.
    # Exceptions raised by handlers are collected. A single exception is
    # re-raised as-is; several are wrapped in a MultiError.
    def work
      perform
      finished, @handles = @handles.partition { |h| h.done? }
      failed, succeeded = finished.partition { |h| h.failed? }

      errors = post_process(succeeded) { |h| h.do_success } +
               post_process(failed) { |h| h.do_failure }

      raise errors.first if errors.size == 1
      raise MultiError.new(errors) if errors.size > 1
      :ok
    end

    # Forget all pending requests.
    # NOTE(review): the easy handles are NOT removed from libcurl here. Their
    # transfers continue, and #perform still calls back into the dropped Req
    # objects. The C side holds them only as unmarked pointers, so they may
    # already have been garbage collected by then.
    def cleanup
      @handles = []
      :ok
    end

    def initialize
      @handles = []
    end

    # Add a URL to the queue of items to fetch. +body+ is the POST payload,
    # or nil for a GET. While add_to_curl refuses (too many open transfers),
    # this services existing transfers via #select; it then does one round
    # of #work.
    def add(url, body, success, failure = lambda { |_error| })
      req = Req.new(url, success, failure)
      select([], []) while add_to_curl(req, url, body).nil?
      @handles << req
      work
      :ok
    end

    # Yield each handle, collecting rather than propagating exceptions, so
    # one failing callback cannot prevent the others from running.
    def post_process(handles)
      errors = []
      handles.each do |h|
        begin
          yield(h) # This ought not to raise anything.
        rescue => ex
          errors << ex
        end
      end
      errors
    end

    inline do |builder|
      builder.include('<errno.h>')
      builder.include('<string.h>')
      builder.include('<sys/select.h>')
      if File.exist?('/usr/include/curl/curl.h')
        builder.include('"/usr/include/curl/curl.h"')
      else
        builder.include('"/usr/local/include/curl/curl.h"')
      end

      # Raw C shared by the methods below. The callbacks live here as static
      # functions so that RubyInline does not expose them as Ruby methods.
      builder.prefix <<-C
        #define GET_MULTI_HANDLE(name) \
          CURLM *(name);\
          Data_Get_Struct(self, CURLM, (name));\
          if (!(name)) rb_raise(rb_eTypeError,\
            "Expected initialized curl multi handle")

        #define CHECK(e) do {\
          if (!(e)) rb_raise(rb_eTypeError, "curl error");\
        } while (0)

        #define CHECKN(e) CHECK(!(e))

        ID id_initialize, id_done, id_add_chunk, id_size;

        /* Free function for the wrapped CURLM handle. */
        static void c_curl_multi_cleanup(void *x)
        {
          curl_multi_cleanup((CURLM *)x);
        }

        /* CURLOPT_WRITEFUNCTION callback. userdata is the Ruby Req set with
         * CURLOPT_WRITEDATA; each chunk is handed to Req#add_chunk. */
        static size_t c_add_chunk(char *chunk, size_t size, size_t nmemb,
                                  void *userdata)
        {
          VALUE rb_req = (VALUE)userdata;
          size_t bytes = size * nmemb; /* Number of bytes of data */
          if (bytes == 0) return 0;
          /* NOTE(review): an exception raised here unwinds through libcurl. */
          rb_funcall(rb_req, id_add_chunk, 1, rb_str_new(chunk, (long)bytes));
          return bytes;
        }

        /* Add every fd in the Ruby array fds to set, raising on values that
         * FD_SET cannot hold, and raise *max_fd to the largest one. */
        static void c_fill_fdset(VALUE fds, fd_set *set, int *max_fd)
        {
          long i;
          Check_Type(fds, T_ARRAY);
          for (i = 0; i < RARRAY_LEN(fds); i++) {
            int fd = NUM2INT(rb_ary_entry(fds, i));
            if (fd < 0 || fd >= FD_SETSIZE)
              rb_raise(rb_eArgError, "file descriptor %d out of range for select()", fd);
            FD_SET(fd, set);
            if (fd > *max_fd) *max_fd = fd;
          }
        }

        /* New Ruby array of the fds from fds that select() marked in set. */
        static VALUE c_ready_fds(VALUE fds, fd_set *set)
        {
          long i;
          VALUE ready = rb_ary_new();
          for (i = 0; i < RARRAY_LEN(fds); i++) {
            int fd = NUM2INT(rb_ary_entry(fds, i));
            if (FD_ISSET(fd, set)) rb_ary_push(ready, INT2FIX(fd));
          }
          return ready;
        }
      C

      # Creates a new CurlMulti handle. libcurl global init and method-ID
      # interning happen once per process instead of on every call.
      builder.c_singleton <<-C
        VALUE new() {
          static int initialized = 0;
          CURLM *multi;
          VALUE inst;

          if (!initialized) {
            id_initialize = rb_intern("initialize");
            id_done = rb_intern("done");
            id_add_chunk = rb_intern("add_chunk");
            id_size = rb_intern("size");

            /* must be called at least once before any other curl functions */
            CHECKN(curl_global_init(CURL_GLOBAL_ALL));
            initialized = 1;
          }

          multi = curl_multi_init();
          CHECK(multi);
          inst = Data_Wrap_Struct(self, 0, c_curl_multi_cleanup, multi);
          rb_funcall(inst, id_initialize, 0);
          return inst;
        }
      C

      # Adds an easy handle for rb_req to the multi handle's list. Returns
      # rb_req, or nil when more than 500 transfers are already open. body is
      # the POST payload, or nil for a GET.
      builder.c <<-C
        VALUE add_to_curl(VALUE rb_req, VALUE url, VALUE body) {
          CURLMcode r;
          CURL *easy_handle;
          char *c_url;
          GET_MULTI_HANDLE(multi_handle);

          /* We start getting errors if we have too many open connections at
             once, so make a hard limit. */
          if (NUM2INT(rb_funcall(self, id_size, 0)) > 500) return Qnil;

          /* Convert the Ruby arguments first, so a TypeError here cannot
             leak an easy handle. */
          c_url = StringValuePtr(url);
          if (!NIL_P(body)) StringValue(body);

          easy_handle = curl_easy_init();
          CHECK(easy_handle);

          /* Pass it the URL */
          curl_easy_setopt(easy_handle, CURLOPT_URL, c_url);

          /* GET or POST? Long-typed options must be passed as longs. */
          if (!NIL_P(body)) {
            curl_easy_setopt(easy_handle, CURLOPT_POST, 1L);
            curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE,
                             (long)RSTRING_LEN(body));
        #if LIBCURL_VERSION_NUM >= 0x071101
            /* libcurl keeps its own copy, so the Ruby string may be freed. */
            curl_easy_setopt(easy_handle, CURLOPT_COPYPOSTFIELDS, RSTRING_PTR(body));
        #else
            /* POSTFIELDS is not copied: keep the string reachable from the
               request object for as long as the transfer lives. */
            rb_iv_set(rb_req, "@post_body", body);
            curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDS, RSTRING_PTR(body));
        #endif
          }

          /* Tell curl to use our callbacks */
          curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, c_add_chunk);
          /* Make curl give us the Ruby request in the callbacks */
          curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)rb_req);
          curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)rb_req);

          /* Add it to the multi handle, releasing it if that fails. */
          r = curl_multi_add_handle(multi_handle, easy_handle);
          if (r != CURLM_OK) {
            curl_easy_cleanup(easy_handle);
            rb_raise(rb_eTypeError, "curl error");
          }
          return rb_req;
        }
      C

      # Wait until one of the given fds or one of curl's fds is ready, or
      # curl's timeout expires. Returns [ready_read_fds, ready_write_fds].
      builder.c <<-C
        VALUE c_select(VALUE rfda, VALUE wfda) {
          int r, curl_max_fd = -1, max_fd;
          long timeout;
          fd_set rfds, wfds, efds;
          struct timeval tv = {0, 0};
          VALUE ready_rfda, ready_wfda;
          GET_MULTI_HANDLE(multi_handle);

          FD_ZERO(&rfds);
          FD_ZERO(&wfds);
          FD_ZERO(&efds);

          /* Put curl's fds into the sets. */
          CHECKN(curl_multi_fdset(multi_handle, &rfds, &wfds, &efds, &curl_max_fd));
          max_fd = curl_max_fd;

          /* Put the given fds into the sets. */
          c_fill_fdset(rfda, &rfds, &max_fd);
          c_fill_fdset(wfda, &wfds, &max_fd);

          CHECKN(curl_multi_timeout(multi_handle, &timeout));

          /* With transfers pending but no curl fds, curl is busy with
             something select() cannot watch; wake after at most 100ms. */
          if (curl_max_fd == -1 && (timeout < 0 || timeout > 100) &&
              NUM2LONG(rb_funcall(self, id_size, 0)) > 0)
            timeout = 100;
          /* Nothing at all to wait on and no timeout would block forever. */
          if (max_fd == -1 && timeout < 0)
            timeout = 0;

          tv.tv_sec = timeout / 1000;
          tv.tv_usec = (timeout % 1000) * 1000;

          /* Wait */
          r = select(max_fd + 1, &rfds, &wfds, &efds, (timeout < 0) ? NULL : &tv);
          if (r < 0) rb_raise(rb_eRuntimeError, "select(): %s", strerror(errno));

          /* Collect the fds that are ready */
          ready_rfda = c_ready_fds(rfda, &rfds);
          ready_wfda = c_ready_fds(wfda, &wfds);
          return rb_ary_new3(2, ready_rfda, ready_wfda);
        }
      C

      # Basically just a wrapper for curl_multi_perform(). Afterwards, every
      # finished transfer is removed from the multi handle and freed, and its
      # Req is told the result via Req#done(curl_code, http_status).
      builder.c <<-C
        void perform() {
          CURLMsg *msg;
          CURLMcode r;
          int running, msgs_left;
          GET_MULTI_HANDLE(multi_handle);

          /* do some work */
          do {
            r = curl_multi_perform(multi_handle, &running);
          } while (r == CURLM_CALL_MULTI_PERFORM);
          CHECK(r == CURLM_OK);

          /* check which ones are done and mark them as done */
          while ((msg = curl_multi_info_read(multi_handle, &msgs_left))) {
            CURL *easy_handle;
            CURLcode result;
            char *priv = NULL;
            long status = 0; /* CURLINFO_RESPONSE_CODE writes a long */
            VALUE rb_req;

            if (msg->msg != CURLMSG_DONE) continue;

            /* Copy what we need out of msg first: it becomes invalid once the
             * handle is removed from the multi handle or cleaned up. */
            easy_handle = msg->easy_handle;
            result = msg->data.result;

            CHECKN(curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &priv));
            rb_req = (VALUE)priv;
            CHECKN(curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &status));

            /* Release the handle before calling into Ruby, so an exception
             * raised by Req#done cannot leak it. */
            CHECKN(curl_multi_remove_handle(multi_handle, easy_handle));
            curl_easy_cleanup(easy_handle);

            rb_funcall(rb_req, id_done, 2, INT2FIX(result), LONG2NUM(status));
          }
        }
      C
    end
  end
end