/* WebROaR - Ruby Application Server - http://webroar.in/ * Copyright (C) 2009 Goonj LLC * * This file is part of WebROaR. * * WebROaR is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * WebROaR is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with WebROaR. If not, see . */ #include #include #include #include extern int is_alive; wkr_ctl_t* wkr_ctl_new() { LOG_FUNCTION wkr_ctl_t *ctl = wr_malloc(wkr_ctl_t); if(ctl == NULL) { return NULL; } ctl->fd = -1; ctl->msg_size = ctl->bytes_read = 0; ctl->scgi = NULL; return ctl; } void wkr_ctl_free(wkr_ctl_t **c) { LOG_FUNCTION wkr_ctl_t *ctl = *c; if(ctl) { if(ctl->fd > 0) close(ctl->fd); if(ctl->scgi) scgi_free(ctl->scgi); free(ctl); } *c = NULL; } /** Process control message */ static inline void ctl_msg_process(wkr_t* w) { LOG_FUNCTION wkr_ctl_t *ctl = w->ctl; char *value; scgi_t* ctl_req = ctl->scgi; ctl->scgi = NULL; value = (char*) scgi_header_value_get(ctl_req, "METHOD"); if(value) { if(strcmp(value,"ADD")==0) { // Response of ADD method value = (char*) scgi_header_value_get(ctl_req, "STATUS"); if(value && strcmp(value, "OK")==0) { LOG_INFO("Worker connected with Head."); } else { LOG_ERROR(SEVERE,"Unable to connect with Head."); sigproc(); } } else if(strcmp(value,"REMOVE")==0) { //Request to REMOVE worker //TODO: need to send acknowledgement for clossing or not /*scgi_t* ctl_resp = scgi_new(); if(ctl_resp){ scgi_header_add(ctl_resp, "METHOD", strlen("METHOD"), "REMOVE", strlen("REMOVE")); scgi_header_add(ctl_resp, "STATUS", strlen("STATUS"), "OK", strlen("OK")); scgi_build(ctl_resp); send(control.fd, ctl_resp->request_buffer, ctl_resp->request_length, 0); scgi_free(ctl_resp); }*/ sigproc(); } else if(strcmp(value,"PING") == 0) { LOG_INFO("Worker got PING message"); scgi_t* ctl_resp = scgi_new(); if(ctl_resp) { scgi_header_add(ctl_resp, "COMPONENT", strlen("COMPONENT"), "WORKER", strlen("WORKER")); scgi_header_add(ctl_resp, "METHOD", strlen("METHOD"), "PING", strlen("PING")); scgi_header_add(ctl_resp, "STATUS", strlen("STATUS"), "OK", strlen("OK")); scgi_build(ctl_resp); ctl->scgi = ctl_resp; ev_io_start(w->loop, &ctl->w_write); } } } else { LOG_ERROR(SEVERE,"METHOD is missing"); } } /** Send SCGI control message */ void ctl_write_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) { LOG_FUNCTION wkr_t* w = (wkr_t*) watcher->data; wkr_ctl_t *ctl = w->ctl; if(revents & EV_ERROR) { ev_io_stop(loop, watcher); LOG_ERROR(SEVERE,"Error writing control message :%s",strerror(errno)); sigproc(); return; } if(scgi_send(ctl->scgi, watcher->fd) <= 0){ ev_io_stop(loop, watcher); LOG_ERROR(SEVERE,"Error writing control message :%s",strerror(errno)); sigproc(); return; } // Check message length if(ctl->scgi->bytes_sent >= ctl->scgi->length) { ev_io_stop(loop, watcher); LOG_DEBUG(DEBUG, "ctl_write_cb() message sent successfully"); scgi_free(ctl->scgi); ctl->scgi = NULL; } } /** Read control message */ void ctl_read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) { LOG_FUNCTION wkr_t* w = (wkr_t*) watcher->data; wkr_ctl_t *ctl = w->ctl; if(revents & EV_ERROR) { ev_io_stop(loop, watcher); is_alive = 0; LOG_ERROR(SEVERE,"Error reading control message :%s",strerror(errno)); return; } int bytesRead = recv(watcher->fd, ctl->msg + ctl->bytes_read, WR_MSG_SIZE - ctl->bytes_read, 0); if(bytesRead <= 0) { LOG_ERROR(SEVERE,"Error reading control message :%s",strerror(errno)); ev_io_stop(loop, watcher); is_alive = 0; return; } ctl->bytes_read += bytesRead; int i; for(i = 0 ; i < ctl->bytes_read ; i++) { if(ctl->msg[i] == ':') { break; } } if(i >= ctl->bytes_read) return; ctl->msg_size = atoi(ctl->msg); ctl->msg_size += (i+2); scgi_t* ctl_req = NULL; if(ctl->bytes_read >= ctl->msg_size) { ctl_req = scgi_parse(ctl->msg, ctl->msg_size); if(ctl_req == NULL ) { LOG_ERROR(SEVERE,"Cannot parse control message."); ev_io_stop(loop,watcher); is_alive = 0; return; } ctl->msg_size += atoi(scgi_header_value_get(ctl_req, SCGI_CONTENT_LENGTH)); if(ctl->bytes_read < ctl->msg_size) { scgi_free(ctl_req); return; } ctl->scgi = ctl_req; ctl_msg_process(w); scgi_free(ctl_req); } else { return; } ctl->bytes_read = ctl->msg_size = 0; } /** Send connect acknowledgement on internet socket */ int send_ack_on_internet_socket(wkr_t* w) { LOG_FUNCTION LOG_DEBUG(DEBUG,"send_ack_on_internet_socket() port = %i", w->listen_port); wkr_ctl_t *ctl = w->ctl; struct sockaddr_in addr; if ((ctl->fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { LOG_ERROR(WARN,"socket():%s",strerror(errno)); return -1; } setsocketoption(ctl->fd); LOG_DEBUG(DEBUG,"send_ack_on_internet_socket() FD for control.fd is %d",ctl->fd); int iport = atoi(w->tmp->ctl_path.str); addr.sin_family = AF_INET; // host byte order addr.sin_port = htons(iport); // short, network byte order addr.sin_addr.s_addr = htonl(INADDR_ANY); // auto-fill with my IP memset(addr.sin_zero, '\0', sizeof addr.sin_zero); if(connect(ctl->fd, (struct sockaddr *)&addr, sizeof addr) < 0) { LOG_ERROR(SEVERE,"Connection with controller failed:%s",strerror(errno)); return -1; } char buf[WR_SHORT_STR_LEN]; int len; pid_t pid = getpid(); scgi_t* add_request = scgi_new(); if(add_request == NULL) { LOG_ERROR(SEVERE,"Cannot create SCGI Request"); return -1; } // Construct SCGI request scgi_header_add(add_request, "COMPONENT", strlen("COMPONENT"), "WORKER", strlen("WORKER")); scgi_header_add(add_request, "METHOD", strlen("METHOD"), "ADD", strlen("ADD")); scgi_header_add(add_request, "APPLICATION", strlen("APPLICATION"), w->tmp->name.str, w->tmp->name.len); scgi_header_add(add_request, "UDS", strlen("UDS"), "NO", strlen("NO")); len = sprintf(buf, "%d", w->listen_port); scgi_header_add(add_request, "PORT", strlen("PORT"), buf, len); len = sprintf(buf, "%d", pid); scgi_header_add(add_request, "PID", strlen("PID"), buf, len); scgi_build(add_request); ctl->w_read.data = ctl->w_write.data = w; ctl->bytes_read = 0; // ctl->bytes_write = 0; ctl->scgi = add_request; ev_io_init(&(ctl->w_read),ctl_read_cb, ctl->fd,EV_READ); ev_io_start(w->loop,&(ctl->w_read)); ev_io_init(&(ctl->w_write),ctl_write_cb, ctl->fd,EV_WRITE); ev_io_start(w->loop,&(ctl->w_write)); return 0; } /** Send connect acknowledgement on unix domain socket */ int send_ack_on_unix_socket(wkr_t* w) { LOG_FUNCTION LOG_DEBUG(DEBUG,"send_ack_on_unix_socket() path = %s", w->sock_path.str); wkr_ctl_t *ctl = w->ctl; size_t len; char buf[WR_SHORT_STR_LEN]; pid_t pid = getpid(); scgi_t* add_request = scgi_new(); if(add_request == NULL) { LOG_ERROR(SEVERE,"Cannot create SCGI Request"); return -1; } // Construct SCGI request scgi_header_add(add_request, "COMPONENT", strlen("COMPONENT"), "WORKER", strlen("WORKER")); scgi_header_add(add_request, "METHOD", strlen("METHOD"), "ADD", strlen("ADD")); scgi_header_add(add_request, "APPLICATION", strlen("APPLICATION"), w->tmp->name.str, w->tmp->name.len); scgi_header_add(add_request, "UDS", strlen("UDS"), "YES", strlen("YES")); scgi_header_add(add_request, "SOCK_PATH", strlen("SOCK_PATH"), w->sock_path.str, w->sock_path.len); len = sprintf(buf, "%d", pid); scgi_header_add(add_request, "PID", strlen("PID"), buf, len); scgi_build(add_request); ctl->w_read.data = ctl->w_write.data = w; ctl->bytes_read = 0; //ctl->bytes_write = 0; ctl->scgi = add_request; ev_io_init(&(ctl->w_read),ctl_read_cb, ctl->fd,EV_READ); ev_io_start(w->loop,&(ctl->w_read)); ev_io_init(&(ctl->w_write),ctl_write_cb, ctl->fd,EV_WRITE); ev_io_start(w->loop,&(ctl->w_write)); return 0; }