started porting the gevent plugin to the new api

This commit is contained in:
Roberto De Ioris
2013-01-20 16:00:11 +01:00
parent d3dfdbb0ba
commit 810a1463d5
5 changed files with 80 additions and 168 deletions
+4
View File
@@ -3,18 +3,21 @@
// generate internal server error message
void uwsgi_500(struct wsgi_request *wsgi_req) {
if (uwsgi_response_prepare_headers(wsgi_req, "500 Internal Server Error", 25)) return;
if (uwsgi_response_add_connection_close(wsgi_req)) return;
uwsgi_response_write_body_do(wsgi_req, "Internal Server Error", 21);
}
void uwsgi_404(struct wsgi_request *wsgi_req) {
if (uwsgi_response_prepare_headers(wsgi_req, "404 Not Found", 13)) return;
if (uwsgi_response_add_connection_close(wsgi_req)) return;
if (uwsgi_response_add_header(wsgi_req, "Content-Type", 12, "text/plain", 10)) return;
uwsgi_response_write_body_do(wsgi_req, "Not Found", 9);
}
void uwsgi_403(struct wsgi_request *wsgi_req) {
if (uwsgi_response_prepare_headers(wsgi_req, "403 Forbidden", 13)) return;
if (uwsgi_response_add_connection_close(wsgi_req)) return;
if (uwsgi_response_add_content_type(wsgi_req, "text/plain", 10)) return;
uwsgi_response_write_body_do(wsgi_req, "Forbidden", 9);
}
@@ -25,6 +28,7 @@ void uwsgi_redirect_to_slash(struct wsgi_request *wsgi_req) {
size_t redirect_len = 0;
if (uwsgi_response_prepare_headers(wsgi_req, "301 Moved Permanently", 21)) return;
if (uwsgi_response_add_connection_close(wsgi_req)) return;
if (wsgi_req->query_string_len == 0) {
redirect = uwsgi_concat2n(wsgi_req->path_info, wsgi_req->path_info_len, "/", 1);
+10 -11
View File
@@ -14,17 +14,17 @@ static void cache_command(char *key, uint16_t keylen, char *val, uint16_t vallen
#ifdef UWSGI_DEBUG
uwsgi_log("cache value size: %llu\n", tmp_vallen);
#endif
wsgi_req->response_size = wsgi_req->socket->proto_write(wsgi_req, val, tmp_vallen);
uwsgi_response_write_body_do(wsgi_req, val, tmp_vallen);
}
}
else if (!uwsgi_strncmp(key, keylen, "get", 3)) {
val = uwsgi_cache_get(val, vallen, &tmp_vallen);
if (val && vallen > 0) {
wsgi_req->response_size = wsgi_req->socket->proto_write(wsgi_req, val, tmp_vallen);
uwsgi_response_write_body_do(wsgi_req, val, tmp_vallen);
}
else {
wsgi_req->response_size = wsgi_req->socket->proto_write(wsgi_req, "HTTP/1.0 404 Not Found\r\n\r\n<h1>Not Found</h1>", 44);
uwsgi_404(wsgi_req);
}
}
}
@@ -46,8 +46,8 @@ int uwsgi_cache_request(struct wsgi_request *wsgi_req) {
value = uwsgi_cache_get(wsgi_req->buffer, wsgi_req->uh.pktsize, &vallen);
if (value && vallen > 0) {
wsgi_req->uh.pktsize = vallen;
wsgi_req->response_size = wsgi_req->socket->proto_write(wsgi_req, (char *)&wsgi_req->uh, 4);
wsgi_req->response_size += wsgi_req->socket->proto_write(wsgi_req, value, vallen);
if (uwsgi_response_write_body_do(wsgi_req, (char *)&wsgi_req->uh, 4)) return -1;
uwsgi_response_write_body_do(wsgi_req, value, vallen);
}
}
break;
@@ -82,13 +82,13 @@ int uwsgi_cache_request(struct wsgi_request *wsgi_req) {
if (value && vallen > 0) {
wsgi_req->uh.pktsize = 0;
wsgi_req->uh.modifier2 = 1;
wsgi_req->response_size = wsgi_req->socket->proto_write(wsgi_req, (char *)&wsgi_req->uh, 4);
wsgi_req->response_size += wsgi_req->socket->proto_write(wsgi_req, value, vallen);
if (uwsgi_response_write_body_do(wsgi_req, (char *)&wsgi_req->uh, 4)) return -1;
uwsgi_response_write_body_do(wsgi_req, value, vallen);
}
else {
wsgi_req->uh.pktsize = 0;
wsgi_req->uh.modifier2 = 0;
wsgi_req->response_size = wsgi_req->socket->proto_write(wsgi_req, (char *)&wsgi_req->uh, 4);
if (uwsgi_response_write_body_do(wsgi_req, (char *)&wsgi_req->uh, 4)) return -1;
}
}
break;
@@ -106,11 +106,10 @@ int uwsgi_cache_request(struct wsgi_request *wsgi_req) {
}
wsgi_req->uh.pktsize = cache_dump->pos;
wsgi_req->response_size = wsgi_req->socket->proto_write(wsgi_req, (char *)&wsgi_req->uh, 4);
wsgi_req->response_size += wsgi_req->socket->proto_write(wsgi_req, cache_dump->buf, cache_dump->pos);
if (uwsgi_response_write_body_do(wsgi_req, (char *)&wsgi_req->uh, 4)) return -1;
uwsgi_response_write_body_do(wsgi_req, cache_dump->buf, cache_dump->pos);
uwsgi_buffer_destroy(cache_dump);
uwsgi_wlock(uwsgi.cache_lock);
uwsgi_socket_nb(wsgi_req->poll.fd);
int ret = uwsgi_write_nb(wsgi_req->poll.fd, (char *)uwsgi.cache_items, uwsgi.cache_filesize, uwsgi.shared->options[UWSGI_OPTION_SOCKET_TIMEOUT]);
if (!ret) {
wsgi_req->response_size += uwsgi.cache_filesize;
+11 -156
View File
@@ -1,59 +1,10 @@
#include "../python/uwsgi_python.h"
#include "gevent.h"
extern struct uwsgi_server uwsgi;
extern struct uwsgi_python up;
struct uwsgi_gevent ugevent;
#define GEVENT_SWITCH PyObject *gswitch = python_call(ugevent.greenlet_switch, ugevent.greenlet_switch_args, 0, NULL); Py_DECREF(gswitch)
#define GET_CURRENT_GREENLET python_call(ugevent.get_current, ugevent.get_current_args, 0, NULL)
#define free_req_queue uwsgi.async_queue_unused_ptr++; uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = wsgi_req
#define stop_the_watchers if (timer) { ret = PyObject_CallMethod(timer, "stop", NULL);\
if (ret) { Py_DECREF(ret); } }\
ret = PyObject_CallMethod(watcher, "stop", NULL);\
if (ret) { Py_DECREF(ret); }
#define stop_the_watchers_and_clear stop_the_watchers\
Py_DECREF(current); Py_DECREF(current_greenlet);\
Py_DECREF(watcher);\
if (timer) Py_DECREF(timer);
#define stop_the_io ret = PyObject_CallMethod(watcher, "stop", NULL);\
if (ret) { Py_DECREF(ret); }
#define stop_the_io_and_clear stop_the_io;\
Py_DECREF(current); Py_DECREF(current_greenlet);\
Py_DECREF(watcher);\
#define stop_the_c_watchers if (c_watchers) { int j;\
for(j=0;j<count;j++) {\
if (c_watchers[j]) {\
ret = PyObject_CallMethod(watcher, "stop", NULL);\
if (ret) { Py_DECREF(ret); }\
Py_DECREF(c_watchers[j]);\
}\
}\
free(c_watchers);\
}
struct uwsgi_gevent {
PyObject *greenlet_switch;
PyObject *greenlet_switch_args;
PyObject *get_current;
PyObject *get_current_args;
PyObject *hub;
PyObject *hub_loop;
PyObject *spawn;
PyObject *signal;
PyObject *greenlet_args;
PyObject *signal_args;
PyObject *my_signal_watcher;
PyObject *signal_watcher;
PyObject **watchers;
} ugevent;
void uwsgi_opt_setup_gevent(char *opt, char *value, void *null) {
static void uwsgi_opt_setup_gevent(char *opt, char *value, void *null) {
// set async mode
uwsgi_opt_set_int(opt, value, &uwsgi.async);
@@ -65,7 +16,7 @@ void uwsgi_opt_setup_gevent(char *opt, char *value, void *null) {
}
struct uwsgi_option gevent_options[] = {
static struct uwsgi_option gevent_options[] = {
{"gevent", required_argument, 0, "a shortcut enabling gevent loop engine with the specified number of async cores and optimal parameters", uwsgi_opt_setup_gevent, NULL, UWSGI_OPT_THREADS},
{0, 0, 0, 0, 0, 0, 0},
@@ -97,7 +48,7 @@ PyObject *py_uwsgi_gevent_graceful(PyObject *self, PyObject *args) {
return Py_None;
}
void uwsgi_gevent_gbcw() {
static void uwsgi_gevent_gbcw() {
uwsgi_log("...The work of process %d is done. Seeya!\n", getpid());
@@ -194,7 +145,7 @@ edge:
set_harakiri(uwsgi.shared->options[UWSGI_OPTION_HARAKIRI]);
}
// accept the connection
// accept the connection (since uWSGI 1.5 all of teh sockets are non-blocking)
if (wsgi_req_simple_accept(wsgi_req, uwsgi_sock->fd)) {
free_req_queue;
if (uwsgi_sock->retry && uwsgi_sock->retry[wsgi_req->async_id]) {
@@ -203,11 +154,6 @@ edge:
goto clear;
}
// on linux we need to set the socket in non-blocking as it is not inherited
#ifdef __linux__
uwsgi_socket_nb(wsgi_req->poll.fd);
#endif
// hack to easily pass wsgi_req pointer to the greenlet
PyTuple_SetItem(ugevent.greenlet_args, 1, PyLong_FromLong((long)wsgi_req));
@@ -710,98 +656,6 @@ error:
return -1;
}
void uwsgi_gevent_nb_write(struct wsgi_request *wsgi_req, PyObject *str) {
PyObject *ret;
char *content = PyString_AsString(str);
size_t content_len = PyString_Size(str);
// do not try to write empty chunks
if (content_len == 0) return;
/// create a watcher for writes
PyObject *watcher = PyObject_CallMethod(ugevent.hub_loop, "io", "ii", wsgi_req->poll.fd, 2);
if (!watcher) goto error;
PyObject *timer = PyObject_CallMethod(ugevent.hub_loop, "timer", "i", uwsgi.shared->options[UWSGI_OPTION_SOCKET_TIMEOUT]);
if (!timer) {
Py_DECREF(watcher);
goto error;
}
PyObject *current_greenlet = GET_CURRENT_GREENLET;
PyObject *current = PyObject_GetAttrString(current_greenlet, "switch");
char *ptr = content;
size_t remains = content_len;
// this is the main writing cycle, wait for writability and send...
for(;;) {
ret = PyObject_CallMethod(watcher, "start", "OO", current, watcher);
if (!ret) {
stop_the_watchers_and_clear
goto error;
}
Py_DECREF(ret);
ret = PyObject_CallMethod(timer, "start", "OO", current, timer);
if (!ret) {
stop_the_watchers_and_clear
goto error;
}
Py_DECREF(ret);
ret = PyObject_CallMethod(ugevent.hub, "switch", NULL);
wsgi_req->switches++;
if (!ret) {
stop_the_watchers_and_clear
goto error;
}
Py_DECREF(ret);
if (ret == timer) {
goto fail;
}
// ok we can write a chunk to the socket
UWSGI_RELEASE_GIL
ssize_t len = write(wsgi_req->poll.fd, ptr, remains);
UWSGI_GET_GIL
if (len > 0) {
ptr += len;
remains -= len;
wsgi_req->response_size += len;
if (remains == 0) {
break;
}
stop_the_watchers
continue;
}
else if (len < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
stop_the_watchers
continue;
}
}
fail:
stop_the_watchers_and_clear
goto error;
}
stop_the_watchers
Py_DECREF(current); Py_DECREF(current_greenlet);
Py_DECREF(watcher);
Py_DECREF(timer);
return ;
error:
if (PyErr_Occurred())
PyErr_Print();
wsgi_req->write_errors++;
}
PyObject *uwsgi_gevent_wait(PyObject *watcher, PyObject *timer, PyObject *current) {
PyObject *ret;
@@ -924,15 +778,15 @@ PyMethodDef uwsgi_gevent_my_signal_def[] = { {"uwsgi_gevent_my_signal", py_uwsgi
PyMethodDef uwsgi_gevent_signal_handler_def[] = { {"uwsgi_gevent_signal_handler", py_uwsgi_gevent_signal_handler, METH_VARARGS, ""} };
PyMethodDef uwsgi_gevent_unix_signal_handler_def[] = { {"uwsgi_gevent_unix_signal_handler", py_uwsgi_gevent_graceful, METH_VARARGS, ""} };
void gil_gevent_get() {
static void gil_gevent_get() {
pthread_setspecific(up.upt_gil_key, (void *) PyGILState_Ensure());
}
void gil_gevent_release() {
static void gil_gevent_release() {
PyGILState_Release((PyGILState_STATE) pthread_getspecific(up.upt_gil_key));
}
void gevent_loop() {
static void gevent_loop() {
if (!uwsgi.has_threads && uwsgi.mywid == 1) {
uwsgi_log("!!! Running gevent without threads IS NOT recommended, enable them with --enable-threads !!!\n");
@@ -955,6 +809,7 @@ void gevent_loop() {
uwsgi.channel_recv_hook = uwsgi_channel_gevent_recv;
// buffer write generic hook
uwsgi.buffer_write_hook = uwsgi_buffer_gevent_write;
uwsgi.wait_write_hook = uwsgi_gevent_wait_write_hook;
struct uwsgi_socket *uwsgi_sock = uwsgi.sockets;
@@ -1084,7 +939,7 @@ void gevent_loop() {
}
void gevent_init() {
static void gevent_init() {
uwsgi_register_loop( (char *) "gevent", gevent_loop);
}
+54
View File
@@ -0,0 +1,54 @@
#include "gevent.h"
extern struct uwsgi_server uwsgi;
extern struct uwsgi_gevent ugevent;
int uwsgi_gevent_wait_write_hook(struct wsgi_request *wsgi_req) {
PyObject *ret = NULL;
/// create a watcher for writes
PyObject *watcher = PyObject_CallMethod(ugevent.hub_loop, "io", "ii", wsgi_req->poll.fd, 2);
if (!watcher) return -1;
PyObject *timer = PyObject_CallMethod(ugevent.hub_loop, "timer", "i", uwsgi.shared->options[UWSGI_OPTION_SOCKET_TIMEOUT]);
if (!timer) {
Py_DECREF(watcher);
return -1;
}
PyObject *current_greenlet = GET_CURRENT_GREENLET;
PyObject *current = PyObject_GetAttrString(current_greenlet, "switch");
ret = PyObject_CallMethod(watcher, "start", "OO", current, watcher);
if (!ret) {
stop_the_watchers_and_clear
return -1;
}
Py_DECREF(ret);
ret = PyObject_CallMethod(timer, "start", "OO", current, timer);
if (!ret) {
stop_the_watchers_and_clear
return -1;
}
Py_DECREF(ret);
ret = PyObject_CallMethod(ugevent.hub, "switch", NULL);
wsgi_req->switches++;
if (!ret) {
stop_the_watchers_and_clear
return -1;
}
Py_DECREF(ret);
if (ret == timer) {
stop_the_watchers_and_clear;
return -1;
}
stop_the_watchers_and_clear;
return 0;
}
+1 -1
View File
@@ -5,4 +5,4 @@ CFLAGS = ['-I' + sysconfig.get_python_inc(), '-I' + sysconfig.get_python_inc(pla
LDFLAGS = []
LIBS = []
GCC_LIST = ['gevent']
GCC_LIST = ['gevent', 'hooks']