completed socket initialization refactoring

This commit is contained in:
roberto@quantal64
2012-08-31 10:47:33 +02:00
parent bb54b90640
commit 56eaa88dd7
9 changed files with 351 additions and 338 deletions
+1 -1
View File
@@ -19,7 +19,7 @@ ldap = auto
pcre = auto
routing = auto
ipv6 = false
debug = false
debug = true
unbit = false
xml_implementation = libxml2
yaml_implementation = auto
+23
View File
@@ -2,6 +2,29 @@
extern struct uwsgi_server uwsgi;
void uwsgi_async_init() {
int i;
uwsgi.async_queue = event_queue_init();
if (uwsgi.async_queue < 0) {
exit(1);
}
uwsgi_add_sockets_to_queue(uwsgi.async_queue, -1);
uwsgi.rb_async_timeouts = uwsgi_init_rb_timer();
uwsgi.async_queue_unused = uwsgi_malloc(sizeof(struct wsgi_request *) * uwsgi.async);
for (i = 0; i < uwsgi.async; i++) {
uwsgi.async_queue_unused[i] = &uwsgi.workers[uwsgi.mywid].cores[i].req;
}
uwsgi.async_queue_unused_ptr = uwsgi.async - 1;
}
struct wsgi_request *find_wsgi_req_proto_by_fd(int fd) {
return uwsgi.async_proto_fd_table[fd];
}
+43
View File
@@ -1295,3 +1295,46 @@ end:
free(us);
close(client_fd);
}
void uwsgi_emperor_start() {
if (!uwsgi.sockets && !ushared->gateways_cnt && !uwsgi.master_process) {
uwsgi_notify_ready();
emperor_loop();
// never here
exit(1);
}
uwsgi.emperor_pid = uwsgi_fork("uWSGI Emperor");
if (uwsgi.emperor_pid < 0) {
uwsgi_error("pid()");
exit(1);
}
else if (uwsgi.emperor_pid == 0) {
#ifdef __linux__
if (prctl(PR_SET_PDEATHSIG, SIGKILL, 0, 0, 0)) {
uwsgi_error("prctl()");
}
#endif
emperor_loop();
// never here
exit(1);
}
}
void uwsgi_check_emperor() {
char *emperor_env = getenv("UWSGI_EMPEROR_FD");
if (emperor_env) {
uwsgi.has_emperor = 1;
uwsgi.emperor_fd = atoi(emperor_env);
uwsgi.master_process = 1;
uwsgi.no_orphans = 1;
uwsgi_log("*** has_emperor mode detected (fd: %d) ***\n", uwsgi.emperor_fd);
if (getenv("UWSGI_EMPEROR_FD_CONFIG")) {
uwsgi.emperor_fd_config = atoi(getenv("UWSGI_EMPEROR_FD_CONFIG"));
}
}
}
+1 -1
View File
@@ -165,7 +165,7 @@ void uwsgi_commandline_config() {
#ifdef UWSGI_DEBUG
uwsgi_log("optind:%d argc:%d\n", optind, argc);
uwsgi_log("optind:%d argc:%d\n", optind, uwsgi.argc);
#endif
if (optind < uwsgi.argc) {
+35
View File
@@ -533,6 +533,41 @@ void uwsgi_setup_locking() {
uwsgi.lock_ops.rwunlock = uwsgi_rwunlock_fast;
uwsgi.lock_size = UWSGI_LOCK_SIZE;
uwsgi.rwlock_size = UWSGI_RWLOCK_SIZE;
// application generic lock
int i;
uwsgi.user_lock = uwsgi_malloc(sizeof(void *) * (uwsgi.locks + 1));
for (i = 0; i < uwsgi.locks + 1; i++) {
uwsgi.user_lock[i] = uwsgi_lock_init(uwsgi_concat2("user ", uwsgi_num2str(i)));
}
// event queue lock (mitigate same event on multiple queues)
if (uwsgi.threads > 1) {
pthread_mutex_init(&uwsgi.thunder_mutex, NULL);
}
if (uwsgi.master_process) {
// signal table lock
uwsgi.signal_table_lock = uwsgi_lock_init("signal");
// fmon table lock
uwsgi.fmon_table_lock = uwsgi_lock_init("filemon");
// timer table lock
uwsgi.timer_table_lock = uwsgi_lock_init("timer");
// probe table lock
uwsgi.probe_table_lock = uwsgi_lock_init("probe");
// rb_timer table lock
uwsgi.rb_timer_table_lock = uwsgi_lock_init("rbtimer");
// cron table lock
uwsgi.cron_table_lock = uwsgi_lock_init("cron");
}
uwsgi.rpc_table_lock = uwsgi_lock_init("rpc");
}
+193
View File
@@ -1580,3 +1580,196 @@ void uwsgi_map_sockets() {
}
}
void uwsgi_bind_sockets() {
socklen_t socket_type_len;
union uwsgi_sockaddr usa;
union uwsgi_sockaddr_ptr gsa;
struct uwsgi_socket *uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
if (!uwsgi_sock->bound && !uwsgi_socket_is_already_bound(uwsgi_sock->name)) {
char *tcp_port = strrchr(uwsgi_sock->name, ':');
if (tcp_port == NULL) {
uwsgi_sock->fd = bind_to_unix(uwsgi_sock->name, uwsgi.listen_queue, uwsgi.chmod_socket, uwsgi.abstract_socket);
uwsgi_sock->family = AF_UNIX;
if (uwsgi.chown_socket) {
uwsgi_chown(uwsgi_sock->name, uwsgi.chown_socket);
}
uwsgi_log("uwsgi socket %d bound to UNIX address %s fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
}
else {
#ifdef UWSGI_IPV6
if (uwsgi_sock->name[0] == '[' && tcp_port[-1] == ']') {
uwsgi_sock->fd = bind_to_tcp6(uwsgi_sock->name, uwsgi.listen_queue, tcp_port);
uwsgi_log("uwsgi socket %d bound to TCP6 address %s fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
uwsgi_sock->family = AF_INET6;
}
else {
#endif
uwsgi_sock->fd = bind_to_tcp(uwsgi_sock->name, uwsgi.listen_queue, tcp_port);
uwsgi_log("uwsgi socket %d bound to TCP address %s fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
uwsgi_sock->family = AF_INET;
#ifdef UWSGI_IPV6
}
#endif
}
if (uwsgi_sock->fd < 0 && !uwsgi_sock->per_core) {
uwsgi_log("unable to create server socket on: %s\n", uwsgi_sock->name);
exit(1);
}
}
uwsgi_sock->bound = 1;
uwsgi_sock = uwsgi_sock->next;
}
if (uwsgi.chown_socket) {
if (!uwsgi.master_as_root) {
uwsgi_as_root();
}
}
int zero_used = 0;
uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
if (uwsgi_sock->bound && uwsgi_sock->fd == 0) {
zero_used = 1;
break;
}
uwsgi_sock = uwsgi_sock->next;
}
if (!zero_used) {
socket_type_len = sizeof(struct sockaddr_un);
gsa.sa = (struct sockaddr *) &usa;
if (!uwsgi.skip_zero && !getsockname(0, gsa.sa, &socket_type_len)) {
if (gsa.sa->sa_family == AF_UNIX) {
uwsgi_sock = uwsgi_new_socket(usa.sa_un.sun_path);
uwsgi_sock->family = AF_UNIX;
uwsgi_sock->fd = 0;
uwsgi_sock->bound = 1;
uwsgi_log("uwsgi socket %d inherited UNIX address %s fd 0\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name);
}
else {
uwsgi_sock = uwsgi_new_socket(uwsgi_concat2("::", ""));
uwsgi_sock->family = AF_INET;
uwsgi_sock->fd = 0;
uwsgi_sock->bound = 1;
uwsgi_log("uwsgi socket %d inherited INET address %s fd 0\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name);
}
}
else if (!uwsgi.honour_stdin) {
int fd = open("/dev/null", O_RDONLY);
if (fd < 0) {
uwsgi_error_open("/dev/null");
exit(1);
}
if (fd != 0) {
if (dup2(fd, 0)) {
uwsgi_error("dup2()");
exit(1);
}
close(fd);
}
}
else if (uwsgi.honour_stdin) {
if (!tcgetattr(0, &uwsgi.termios)) {
uwsgi.restore_tc = 1;
}
}
}
// check for auto_port socket
uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
if (uwsgi_sock->auto_port) {
#ifdef UWSGI_IPV6
if (uwsgi_sock->family == AF_INET6) {
uwsgi_log("uwsgi socket %d bound to TCP6 address %s (port auto-assigned) fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
}
else {
#endif
uwsgi_log("uwsgi socket %d bound to TCP address %s (port auto-assigned) fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
#ifdef UWSGI_IPV6
}
#endif
}
uwsgi_sock = uwsgi_sock->next;
}
}
void uwsgi_set_sockets_protocols() {
struct uwsgi_socket *uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
char *requested_protocol = uwsgi_sock->proto_name;
if (uwsgi_sock->lazy) goto setup_proto;
if (!uwsgi_sock->bound || uwsgi_sock->fd == -1)
goto nextsock;
if (!uwsgi_sock->per_core) {
uwsgi_sock->arg = fcntl(uwsgi_sock->fd, F_GETFL, NULL);
if (uwsgi_sock->arg < 0) {
uwsgi_error("fcntl()");
exit(1);
}
uwsgi_sock->arg |= O_NONBLOCK;
if (fcntl(uwsgi_sock->fd, F_SETFL, uwsgi_sock->arg) < 0) {
uwsgi_error("fcntl()");
exit(1);
}
}
setup_proto:
if (!requested_protocol) {
requested_protocol = uwsgi.protocol;
}
if (requested_protocol && !strcmp("http", requested_protocol)) {
uwsgi_sock->proto = uwsgi_proto_http_parser;
uwsgi_sock->proto_accept = uwsgi_proto_base_accept;
uwsgi_sock->proto_write = uwsgi_proto_uwsgi_write;
uwsgi_sock->proto_writev = uwsgi_proto_uwsgi_writev;
uwsgi_sock->proto_write_header = uwsgi_proto_uwsgi_write_header;
uwsgi_sock->proto_writev_header = uwsgi_proto_uwsgi_writev_header;
uwsgi_sock->proto_sendfile = NULL;
uwsgi_sock->proto_close = uwsgi_proto_base_close;
}
else if (requested_protocol && (!strcmp("fastcgi", requested_protocol) || !strcmp("fcgi", requested_protocol))) {
if (!strcmp(uwsgi.protocol, "fastcgi") || !strcmp(uwsgi.protocol, "fcgi")) {
uwsgi.shared->options[UWSGI_OPTION_CGI_MODE] = 1;
}
uwsgi_sock->proto = uwsgi_proto_fastcgi_parser;
uwsgi_sock->proto_accept = uwsgi_proto_base_accept;
uwsgi_sock->proto_write = uwsgi_proto_fastcgi_write;
uwsgi_sock->proto_writev = uwsgi_proto_fastcgi_writev;
uwsgi_sock->proto_write_header = uwsgi_proto_fastcgi_write_header;
uwsgi_sock->proto_writev_header = uwsgi_proto_fastcgi_writev_header;
uwsgi_sock->proto_sendfile = uwsgi_proto_fastcgi_sendfile;
uwsgi_sock->proto_close = uwsgi_proto_fastcgi_close;
}
#ifdef UWSGI_ZEROMQ
else if (requested_protocol && !strcmp("zmq", requested_protocol)) {
uwsgi.zeromq = 1;
}
#endif
else {
uwsgi_sock->proto = uwsgi_proto_uwsgi_parser;
uwsgi_sock->proto_accept = uwsgi_proto_base_accept;
uwsgi_sock->proto_write = uwsgi_proto_uwsgi_write;
uwsgi_sock->proto_writev = uwsgi_proto_uwsgi_writev;
uwsgi_sock->proto_write_header = uwsgi_proto_uwsgi_write_header;
uwsgi_sock->proto_writev_header = uwsgi_proto_uwsgi_writev_header;
uwsgi_sock->proto_sendfile = NULL;
uwsgi_sock->proto_close = uwsgi_proto_base_close;
}
nextsock:
uwsgi_sock = uwsgi_sock->next;
}
}
+27 -335
View File
@@ -1547,9 +1547,6 @@ int main(int argc, char *argv[], char *envp[]) {
struct utsname uuts;
char *emperor_env;
//char *optname;
signal(SIGSEGV, uwsgi_segfault);
signal(SIGFPE, uwsgi_fpe);
signal(SIGHUP, SIG_IGN);
@@ -1606,7 +1603,7 @@ int main(int argc, char *argv[], char *envp[]) {
}
#endif
// get startup time
gettimeofday(&uwsgi.start_tv, NULL);
setlinebuf(stdout);
@@ -1622,18 +1619,8 @@ int main(int argc, char *argv[], char *envp[]) {
uwsgi_notify("initializing uWSGI");
emperor_env = getenv("UWSGI_EMPEROR_FD");
if (emperor_env) {
uwsgi.has_emperor = 1;
uwsgi.emperor_fd = atoi(emperor_env);
uwsgi.master_process = 1;
uwsgi.no_orphans = 1;
uwsgi_log("*** has_emperor mode detected (fd: %d) ***\n", uwsgi.emperor_fd);
if (getenv("UWSGI_EMPEROR_FD_CONFIG")) {
uwsgi.emperor_fd_config = atoi(getenv("UWSGI_EMPEROR_FD_CONFIG"));
}
}
// check if we are under the Emperor
uwsgi_check_emperor();
char *screen_env = getenv("TERM");
if (screen_env) {
@@ -1798,7 +1785,7 @@ int main(int argc, char *argv[], char *envp[]) {
uwsgi_log_initial("compiled with version: %s on %s\n", __VERSION__, UWSGI_BUILD_DATE);
#ifdef __sun__
#ifdef __sun__
if (uname(&uuts) < 0) {
#else
if (uname(&uuts)) {
@@ -1813,7 +1800,6 @@ int main(int argc, char *argv[], char *envp[]) {
uwsgi_log_initial("clock source: %s\n", uwsgi.clock->name);
#ifdef __BIG_ENDIAN__
uwsgi_log_initial("*** big endian arch detected ***\n");
#endif
@@ -1849,29 +1835,7 @@ int main(int argc, char *argv[], char *envp[]) {
// start the Emperor if needed
if (uwsgi.early_emperor && uwsgi.emperor) {
if (!uwsgi.sockets && !ushared->gateways_cnt && !uwsgi.master_process) {
uwsgi_notify_ready();
emperor_loop();
// never here
exit(1);
}
uwsgi.emperor_pid = uwsgi_fork("uWSGI Emperor");
if (uwsgi.emperor_pid < 0) {
uwsgi_error("pid()");
exit(1);
}
else if (uwsgi.emperor_pid == 0) {
#ifdef __linux__
if (prctl(PR_SET_PDEATHSIG, SIGKILL, 0, 0, 0)) {
uwsgi_error("prctl()");
}
#endif
emperor_loop();
// never here
exit(1);
}
uwsgi_emperor_start();
}
// run the pre-jail scripts
@@ -1899,7 +1863,6 @@ int main(int argc, char *argv[], char *envp[]) {
}
// call jail systems
for (i = 0; i < uwsgi.gp_cnt; i++) {
if (uwsgi.gp[i]->jail) {
@@ -1929,20 +1892,8 @@ int main(int argc, char *argv[], char *envp[]) {
int uwsgi_start(void *v_argv) {
#ifdef UWSGI_DEBUG
int so_bufsize;
socklen_t so_bufsize_len;
#endif
int i, j;
union uwsgi_sockaddr usa;
union uwsgi_sockaddr_ptr gsa;
socklen_t socket_type_len;
struct uwsgi_socket *uwsgi_sock;
#ifdef __linux__
uwsgi_set_cgroup();
@@ -2024,11 +1975,18 @@ int uwsgi_start(void *v_argv) {
exit(1);
}
// automatically fix options
sanitize_args();
// initialize workers
// start the Emperor if needed
if (!uwsgi.early_emperor && uwsgi.emperor) {
uwsgi_emperor_start();
}
// end of generic initialization
// build mime.types dictionary
if (uwsgi.build_mime_dict) {
if (!uwsgi.mime_file)
uwsgi_string_new_list(&uwsgi.mime_file, "/etc/mime.types");
@@ -2044,6 +2002,7 @@ int uwsgi_start(void *v_argv) {
}
}
// prepare offload threads
if (uwsgi.static_offload_to_thread) {
pthread_attr_init(&uwsgi.static_offload_thread_attr);
pthread_attr_setdetachstate(&uwsgi.static_offload_thread_attr, PTHREAD_CREATE_DETACHED);
@@ -2052,36 +2011,6 @@ int uwsgi_start(void *v_argv) {
pthread_mutex_init(&uwsgi.static_offload_thread_lock, NULL);
}
// end of generic initialization
// start the Emperor if needed
if (!uwsgi.early_emperor && uwsgi.emperor) {
if (!uwsgi.sockets && !ushared->gateways_cnt && !uwsgi.master_process) {
uwsgi_notify_ready();
emperor_loop();
// never here
exit(1);
}
uwsgi.emperor_pid = uwsgi_fork("uWSGI Emperor");
if (uwsgi.emperor_pid < 0) {
uwsgi_error("pid()");
exit(1);
}
else if (uwsgi.emperor_pid == 0) {
#ifdef __linux__
if (prctl(PR_SET_PDEATHSIG, SIGKILL, 0, 0, 0)) {
uwsgi_error("prctl()");
}
#endif
emperor_loop();
// never here
exit(1);
}
}
#ifdef UWSGI_ASYNC
// TODO rewrite to use uwsgi.max_fd
@@ -2155,51 +2084,19 @@ int uwsgi_start(void *v_argv) {
// setup locking
uwsgi_setup_locking();
// event queue lock (mitigate same event on multiple queues)
if (uwsgi.threads > 1) {
pthread_mutex_init(&uwsgi.thunder_mutex, NULL);
}
// application generic lock
uwsgi.user_lock = uwsgi_malloc(sizeof(void *) * (uwsgi.locks + 1));
for (i = 0; i < uwsgi.locks + 1; i++) {
uwsgi.user_lock[i] = uwsgi_lock_init(uwsgi_concat2("user ", uwsgi_num2str(i)));
}
if (uwsgi.master_process) {
// signal table lock
uwsgi.signal_table_lock = uwsgi_lock_init("signal");
// fmon table lock
uwsgi.fmon_table_lock = uwsgi_lock_init("filemon");
// timer table lock
uwsgi.timer_table_lock = uwsgi_lock_init("timer");
// probe table lock
uwsgi.probe_table_lock = uwsgi_lock_init("probe");
// rb_timer table lock
uwsgi.rb_timer_table_lock = uwsgi_lock_init("rbtimer");
// cron table lock
uwsgi.cron_table_lock = uwsgi_lock_init("cron");
}
uwsgi.rpc_table_lock = uwsgi_lock_init("rpc");
// setup sharedarea
if (uwsgi.sharedareasize > 0) {
uwsgi.sharedarea = uwsgi_calloc_shared(uwsgi.page_size * uwsgi.sharedareasize);
uwsgi_log("shared area mapped at %p, you can access it with uwsgi.sharedarea* functions.\n", uwsgi.sharedarea);
uwsgi.sa_lock = uwsgi_rwlock_init("sharedarea");
}
// setup queue
if (uwsgi.queue_size > 0) {
uwsgi_init_queue();
}
// setup cache
if (uwsgi.cache_max_items > 0) {
uwsgi_init_cache();
}
@@ -2233,188 +2130,10 @@ int uwsgi_start(void *v_argv) {
//now bind all the unbound sockets
uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
if (!uwsgi_sock->bound && !uwsgi_socket_is_already_bound(uwsgi_sock->name)) {
char *tcp_port = strrchr(uwsgi_sock->name, ':');
if (tcp_port == NULL) {
uwsgi_sock->fd = bind_to_unix(uwsgi_sock->name, uwsgi.listen_queue, uwsgi.chmod_socket, uwsgi.abstract_socket);
uwsgi_sock->family = AF_UNIX;
if (uwsgi.chown_socket) {
uwsgi_chown(uwsgi_sock->name, uwsgi.chown_socket);
}
uwsgi_log("uwsgi socket %d bound to UNIX address %s fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
}
else {
#ifdef UWSGI_IPV6
if (uwsgi_sock->name[0] == '[' && tcp_port[-1] == ']') {
uwsgi_sock->fd = bind_to_tcp6(uwsgi_sock->name, uwsgi.listen_queue, tcp_port);
uwsgi_log("uwsgi socket %d bound to TCP6 address %s fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
uwsgi_sock->family = AF_INET6;
}
else {
#endif
uwsgi_sock->fd = bind_to_tcp(uwsgi_sock->name, uwsgi.listen_queue, tcp_port);
uwsgi_log("uwsgi socket %d bound to TCP address %s fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
uwsgi_sock->family = AF_INET;
#ifdef UWSGI_IPV6
}
#endif
}
if (uwsgi_sock->fd < 0 && !uwsgi_sock->per_core) {
uwsgi_log("unable to create server socket on: %s\n", uwsgi_sock->name);
exit(1);
}
}
uwsgi_sock->bound = 1;
uwsgi_sock = uwsgi_sock->next;
}
if (uwsgi.chown_socket) {
if (!uwsgi.master_as_root) {
uwsgi_as_root();
}
}
int zero_used = 0;
uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
if (uwsgi_sock->bound && uwsgi_sock->fd == 0) {
zero_used = 1;
break;
}
uwsgi_sock = uwsgi_sock->next;
}
if (!zero_used) {
socket_type_len = sizeof(struct sockaddr_un);
gsa.sa = (struct sockaddr *) &usa;
if (!uwsgi.skip_zero && !getsockname(0, gsa.sa, &socket_type_len)) {
if (gsa.sa->sa_family == AF_UNIX) {
uwsgi_sock = uwsgi_new_socket(usa.sa_un.sun_path);
uwsgi_sock->family = AF_UNIX;
uwsgi_sock->fd = 0;
uwsgi_sock->bound = 1;
uwsgi_log("uwsgi socket %d inherited UNIX address %s fd 0\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name);
}
else {
uwsgi_sock = uwsgi_new_socket(uwsgi_concat2("::", ""));
uwsgi_sock->family = AF_INET;
uwsgi_sock->fd = 0;
uwsgi_sock->bound = 1;
uwsgi_log("uwsgi socket %d inherited INET address %s fd 0\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name);
}
}
else if (!uwsgi.honour_stdin) {
int fd = open("/dev/null", O_RDONLY);
if (fd < 0) {
uwsgi_error_open("/dev/null");
exit(1);
}
if (fd != 0) {
if (dup2(fd, 0)) {
uwsgi_error("dup2()");
exit(1);
}
close(fd);
}
}
else if (uwsgi.honour_stdin) {
if (!tcgetattr(0, &uwsgi.termios)) {
uwsgi.restore_tc = 1;
}
}
}
// check for auto_port socket
uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
if (uwsgi_sock->auto_port) {
#ifdef UWSGI_IPV6
if (uwsgi_sock->family == AF_INET6) {
uwsgi_log("uwsgi socket %d bound to TCP6 address %s (port auto-assigned) fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
}
else {
#endif
uwsgi_log("uwsgi socket %d bound to TCP address %s (port auto-assigned) fd %d\n", uwsgi_get_socket_num(uwsgi_sock), uwsgi_sock->name, uwsgi_sock->fd);
#ifdef UWSGI_IPV6
}
#endif
}
uwsgi_sock = uwsgi_sock->next;
}
uwsgi_bind_sockets();
// put listening socket in non-blocking state and set the protocol
uwsgi_sock = uwsgi.sockets;
while (uwsgi_sock) {
char *requested_protocol = uwsgi_sock->proto_name;
if (uwsgi_sock->lazy) goto setup_proto;
if (!uwsgi_sock->bound || uwsgi_sock->fd == -1)
goto nextsock;
if (!uwsgi_sock->per_core) {
uwsgi_sock->arg = fcntl(uwsgi_sock->fd, F_GETFL, NULL);
if (uwsgi_sock->arg < 0) {
uwsgi_error("fcntl()");
exit(1);
}
uwsgi_sock->arg |= O_NONBLOCK;
if (fcntl(uwsgi_sock->fd, F_SETFL, uwsgi_sock->arg) < 0) {
uwsgi_error("fcntl()");
exit(1);
}
}
setup_proto:
if (!requested_protocol) {
requested_protocol = uwsgi.protocol;
}
if (requested_protocol && !strcmp("http", requested_protocol)) {
uwsgi_sock->proto = uwsgi_proto_http_parser;
uwsgi_sock->proto_accept = uwsgi_proto_base_accept;
uwsgi_sock->proto_write = uwsgi_proto_uwsgi_write;
uwsgi_sock->proto_writev = uwsgi_proto_uwsgi_writev;
uwsgi_sock->proto_write_header = uwsgi_proto_uwsgi_write_header;
uwsgi_sock->proto_writev_header = uwsgi_proto_uwsgi_writev_header;
uwsgi_sock->proto_sendfile = NULL;
uwsgi_sock->proto_close = uwsgi_proto_base_close;
}
else if (requested_protocol && (!strcmp("fastcgi", requested_protocol) || !strcmp("fcgi", requested_protocol))) {
if (!strcmp(uwsgi.protocol, "fastcgi") || !strcmp(uwsgi.protocol, "fcgi")) {
uwsgi.shared->options[UWSGI_OPTION_CGI_MODE] = 1;
}
uwsgi_sock->proto = uwsgi_proto_fastcgi_parser;
uwsgi_sock->proto_accept = uwsgi_proto_base_accept;
uwsgi_sock->proto_write = uwsgi_proto_fastcgi_write;
uwsgi_sock->proto_writev = uwsgi_proto_fastcgi_writev;
uwsgi_sock->proto_write_header = uwsgi_proto_fastcgi_write_header;
uwsgi_sock->proto_writev_header = uwsgi_proto_fastcgi_writev_header;
uwsgi_sock->proto_sendfile = uwsgi_proto_fastcgi_sendfile;
uwsgi_sock->proto_close = uwsgi_proto_fastcgi_close;
}
#ifdef UWSGI_ZEROMQ
else if (requested_protocol && !strcmp("zmq", requested_protocol)) {
uwsgi.zeromq = 1;
}
#endif
else {
uwsgi_sock->proto = uwsgi_proto_uwsgi_parser;
uwsgi_sock->proto_accept = uwsgi_proto_base_accept;
uwsgi_sock->proto_write = uwsgi_proto_uwsgi_write;
uwsgi_sock->proto_writev = uwsgi_proto_uwsgi_writev;
uwsgi_sock->proto_write_header = uwsgi_proto_uwsgi_write_header;
uwsgi_sock->proto_writev_header = uwsgi_proto_uwsgi_writev_header;
uwsgi_sock->proto_sendfile = NULL;
uwsgi_sock->proto_close = uwsgi_proto_base_close;
}
nextsock:
uwsgi_sock = uwsgi_sock->next;
}
uwsgi_set_sockets_protocols();
}
@@ -2497,7 +2216,9 @@ nextsock:
}
#ifdef UWSGI_DEBUG
uwsgi_sock = uwsgi.sockets;
struct uwsgi_socket *uwsgi_sock = uwsgi.sockets;
int so_bufsize;
socklen_t so_bufsize_len;
while (uwsgi_sock) {
so_bufsize_len = sizeof(int);
if (getsockopt(uwsgi_sock->fd, SOL_SOCKET, SO_RCVBUF, &so_bufsize, &so_bufsize_len)) {
@@ -2860,28 +2581,14 @@ nextsock:
uwsgi_init_all_apps();
}
// some apps could be mounted only on specific workers
uwsgi_init_worker_mount_apps();
#ifdef UWSGI_ZEROMQ
// setup zeromq context (if required) one per-worker
if (uwsgi.zeromq) {
uwsgi.zmq_context = zmq_init(1);
if (uwsgi.zmq_context == NULL) {
uwsgi_error("zmq_init()");
exit(1);
}
struct uwsgi_socket *uwsgi_sock = uwsgi.sockets;
while(uwsgi_sock) {
if (!uwsgi_sock->proto_name || strcmp(uwsgi_sock->proto_name, "zmq")) {
goto zmq_next;
}
uwsgi_proto_zeromq_setup(uwsgi_sock);
zmq_next:
uwsgi_sock = uwsgi_sock->next;
}
uwsgi_zeromq_init();
}
#endif
@@ -2889,26 +2596,11 @@ zmq_next:
//do not pass kfd after fork()
#ifdef UWSGI_ASYNC
if (uwsgi.async > 1) {
uwsgi.async_queue = event_queue_init();
if (uwsgi.async_queue < 0) {
exit(1);
}
uwsgi_add_sockets_to_queue(uwsgi.async_queue, -1);
uwsgi.rb_async_timeouts = uwsgi_init_rb_timer();
uwsgi.async_queue_unused = uwsgi_malloc(sizeof(struct wsgi_request *) * uwsgi.async);
for (i = 0; i < uwsgi.async; i++) {
uwsgi.async_queue_unused[i] = &uwsgi.workers[uwsgi.mywid].cores[i].req;
}
uwsgi.async_queue_unused_ptr = uwsgi.async - 1;
uwsgi_async_init();
}
#endif
// setup UNIX signals for the worker
if (uwsgi.shared->options[UWSGI_OPTION_HARAKIRI] > 0 && !uwsgi.master_process) {
signal(SIGALRM, (void *) &harakiri);
}
@@ -2927,7 +2619,7 @@ zmq_next:
signal(SIGPIPE, (void *) &warn_pipe);
}
//initialization done
// worker initialization done
// run fixup handler
for (i = 0; i < 256; i++) {
+20
View File
@@ -2,6 +2,26 @@
extern struct uwsgi_server uwsgi;
void uwsgi_zeromq_init() {
uwsgi.zmq_context = zmq_init(1);
if (uwsgi.zmq_context == NULL) {
uwsgi_error("zmq_init()");
exit(1);
}
struct uwsgi_socket *uwsgi_sock = uwsgi.sockets;
while(uwsgi_sock) {
if (!uwsgi_sock->proto_name || strcmp(uwsgi_sock->proto_name, "zmq")) {
goto zmq_next;
}
uwsgi_proto_zeromq_setup(uwsgi_sock);
zmq_next:
uwsgi_sock = uwsgi_sock->next;
}
}
#ifdef UWSGI_JSON
#include <jansson.h>
+8 -1
View File
@@ -2143,7 +2143,7 @@ struct http_status_codes {
};
#ifdef UWSGI_ASYNC
void uwsgi_async_init(void);
void *async_loop(void *);
struct wsgi_request *find_first_available_wsgi_req(void);
struct wsgi_request *find_first_accepting_wsgi_req(void);
@@ -2625,6 +2625,7 @@ ssize_t uwsgi_proto_zeromq_write(struct wsgi_request *, char *, size_t);
ssize_t uwsgi_proto_zeromq_write_header(struct wsgi_request *, char *, size_t);
ssize_t uwsgi_proto_zeromq_sendfile(struct wsgi_request *);
int uwsgi_proto_zeromq_parser(struct wsgi_request *);
void uwsgi_zeromq_init(void);
#endif
int uwsgi_num2str2(int, char *);
@@ -3192,6 +3193,12 @@ void uwsgi_map_sockets(void);
void uwsgi_set_cpu_affinity(void);
void uwsgi_emperor_start(void);
void uwsgi_bind_sockets(void);
void uwsgi_set_sockets_protocols(void);
void uwsgi_check_emperor(void);
#ifdef UWSGI_AS_SHARED_LIBRARY
int uwsgi_init(int, char **, char **);
#endif