improved websockets support

This commit is contained in:
Unbit
2013-02-16 13:13:04 +01:00
parent f4b8f3e49d
commit 0dd2a34e22
11 changed files with 163 additions and 69 deletions
-3
View File
@@ -495,9 +495,6 @@ static struct uwsgi_option uwsgi_base_options[] = {
{"websockets-ping-freq", required_argument, 0, "set the frequency (in seconds) of websockets automatic ping packets", uwsgi_opt_set_int, &uwsgi.websockets_ping_freq, 0},
{"websocket-ping-freq", required_argument, 0, "set the frequency (in seconds) of websockets automatic ping packets", uwsgi_opt_set_int, &uwsgi.websockets_ping_freq, 0},
{"websockets-pong-freq", required_argument, 0, "set the frequency (in seconds) of websockets automatic pong/keepalive packets", uwsgi_opt_set_int, &uwsgi.websockets_pong_freq, 0},
{"websocket-pong-freq", required_argument, 0, "set the frequency (in seconds) of websockets automatic pong/keepalive packets", uwsgi_opt_set_int, &uwsgi.websockets_pong_freq, 0},
{"websockets-max-size", required_argument, 0, "set the max allowed size of websocket messages (in Kbytes, default 1024)", uwsgi_opt_set_64bit, &uwsgi.websockets_max_size, 0},
{"websocket-max-size", required_argument, 0, "set the max allowed size of websocket messages (in Kbytes, default 1024)", uwsgi_opt_set_64bit, &uwsgi.websockets_max_size, 0},
+54 -30
View File
@@ -10,7 +10,7 @@
extern struct uwsgi_server uwsgi;
struct uwsgi_buffer *uwsgi_websocket_message(char *msg, size_t len) {
static struct uwsgi_buffer *uwsgi_websocket_message(char *msg, size_t len) {
struct uwsgi_buffer *ub = uwsgi_buffer_new(10 + len);
if (uwsgi_buffer_u8(ub, 0x81)) goto error;
if (len < 126) {
@@ -33,7 +33,7 @@ error:
return NULL;
}
int uwsgi_websockets_ping(struct wsgi_request *wsgi_req) {
static int uwsgi_websockets_ping(struct wsgi_request *wsgi_req) {
if (uwsgi_response_write_body_do(wsgi_req, uwsgi.websockets_ping->buf, uwsgi.websockets_ping->pos)) {
return -1;
}
@@ -41,28 +41,29 @@ int uwsgi_websockets_ping(struct wsgi_request *wsgi_req) {
return 0;
}
int uwsgi_websockets_pong(struct wsgi_request *wsgi_req) {
time_t now = uwsgi_now();
if (wsgi_req->websocket_last_ping == 0 ||
( wsgi_req->websocket_last_ping > 0 && now - wsgi_req->websocket_last_ping > uwsgi.websockets_ping_freq)) {
if (uwsgi_websockets_ping(wsgi_req)) return -1;
return 0;
}
// check if last pong arrived in time
else if (wsgi_req->websocket_last_ping > 0) {
if (wsgi_req->websocket_last_pong < wsgi_req->websocket_last_ping) {
if (wsgi_req->websocket_last_ping - wsgi_req->websocket_last_pong >
uwsgi.websockets_ping_freq) {
uwsgi_log("[uwsgi-websocket] no PONG received in %d seconds !!!\n", uwsgi.websockets_ping_freq);
return -1;
}
}
}
static int uwsgi_websockets_pong(struct wsgi_request *wsgi_req) {
return uwsgi_response_write_body_do(wsgi_req, uwsgi.websockets_pong->buf, uwsgi.websockets_pong->pos);
}
int uwsgi_websocket_send_do(struct wsgi_request *wsgi_req, char *msg, size_t len) {
static int uwsgi_websockets_check_pingpong(struct wsgi_request *wsgi_req) {
time_t now = uwsgi_now();
if (wsgi_req->websocket_last_ping == 0 ||
(now - wsgi_req->websocket_last_ping > uwsgi.websockets_ping_freq)) {
if (uwsgi_websockets_ping(wsgi_req)) return -1;
}
else if (wsgi_req->websocket_last_ping > 0) {
if (wsgi_req->websocket_last_pong < wsgi_req->websocket_last_ping) {
if (wsgi_req->websocket_last_ping - wsgi_req->websocket_last_pong >
uwsgi.websockets_ping_freq) {
uwsgi_log("[uwsgi-websocket] no PONG received in %d seconds !!!\n", uwsgi.websockets_ping_freq);
return -1;
}
}
}
return 0;
}
static int uwsgi_websocket_send_do(struct wsgi_request *wsgi_req, char *msg, size_t len) {
struct uwsgi_buffer *ub = uwsgi_websocket_message(msg, len);
if (!ub) return -1;
@@ -83,7 +84,7 @@ int uwsgi_websocket_send(struct wsgi_request *wsgi_req, char *msg, size_t len) {
return ret;
}
void uwsgi_websocket_parse_header(struct wsgi_request *wsgi_req) {
static void uwsgi_websocket_parse_header(struct wsgi_request *wsgi_req) {
uint8_t byte1 = wsgi_req->websocket_buf->buf[0];
uint8_t byte2 = wsgi_req->websocket_buf->buf[1];
wsgi_req->websocket_opcode = byte1 & 0xf;
@@ -91,7 +92,7 @@ void uwsgi_websocket_parse_header(struct wsgi_request *wsgi_req) {
wsgi_req->websocket_size = byte2 & 0x7f;
}
struct uwsgi_buffer *uwsgi_websockets_parse(struct wsgi_request *wsgi_req) {
static struct uwsgi_buffer *uwsgi_websockets_parse(struct wsgi_request *wsgi_req) {
// de-mask buffer
uint8_t *ptr = (uint8_t *) (wsgi_req->websocket_buf->buf + (wsgi_req->websocket_pktsize - wsgi_req->websocket_size));
size_t i;
@@ -115,7 +116,7 @@ error:
}
ssize_t uwsgi_websockets_recv_pkt(struct wsgi_request *wsgi_req) {
static ssize_t uwsgi_websockets_recv_pkt(struct wsgi_request *wsgi_req, int nb) {
int ret = -1;
@@ -125,6 +126,12 @@ ssize_t uwsgi_websockets_recv_pkt(struct wsgi_request *wsgi_req) {
if (rlen == 0) return -1;
if (rlen < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
if (nb) {
if (uwsgi_websockets_check_pingpong(wsgi_req)) {
return -1;
}
return 0;
}
goto wait;
}
uwsgi_error("uwsgi_websockets_recv_pkt()");
@@ -132,7 +139,7 @@ ssize_t uwsgi_websockets_recv_pkt(struct wsgi_request *wsgi_req) {
}
wait:
ret = uwsgi.wait_read_hook(wsgi_req->fd, uwsgi.websockets_pong_freq);
ret = uwsgi.wait_read_hook(wsgi_req->fd, uwsgi.websockets_ping_freq);
if (ret > 0) {
rlen = wsgi_req->socket->proto_read_body(wsgi_req, wsgi_req->websocket_buf->buf + wsgi_req->websocket_buf->pos, wsgi_req->websocket_buf->len - wsgi_req->websocket_buf->pos);
if (rlen > 0) return rlen;
@@ -143,7 +150,7 @@ wait:
return -1;
}
// send unsolicited pong
if (uwsgi_websockets_pong(wsgi_req)) {
if (uwsgi_websockets_check_pingpong(wsgi_req)) {
return -1;
}
}
@@ -152,7 +159,7 @@ wait:
}
struct uwsgi_buffer *uwsgi_websocket_recv_do(struct wsgi_request *wsgi_req) {
static struct uwsgi_buffer *uwsgi_websocket_recv_do(struct wsgi_request *wsgi_req, int nb) {
if (!wsgi_req->websocket_buf) {
// this buffer will be destroyed on connection close
wsgi_req->websocket_buf = uwsgi_buffer_new(uwsgi.page_size);
@@ -257,8 +264,12 @@ struct uwsgi_buffer *uwsgi_websocket_recv_do(struct wsgi_request *wsgi_req) {
// need more data
else {
if (uwsgi_buffer_ensure(wsgi_req->websocket_buf, uwsgi.page_size)) return NULL;
ssize_t len = uwsgi_websockets_recv_pkt(wsgi_req);
ssize_t len = uwsgi_websockets_recv_pkt(wsgi_req, nb);
if (len <= 0) {
if (nb == 1 && len == 0) {
// return an empty buffer to signal blocking event
return uwsgi_buffer_new(0);
}
return NULL;
}
// update buffer size
@@ -273,13 +284,25 @@ struct uwsgi_buffer *uwsgi_websocket_recv(struct wsgi_request *wsgi_req) {
if (wsgi_req->websocket_closed) {
return NULL;
}
struct uwsgi_buffer *ub = uwsgi_websocket_recv_do(wsgi_req);
struct uwsgi_buffer *ub = uwsgi_websocket_recv_do(wsgi_req, 0);
if (!ub) {
wsgi_req->websocket_closed = 1;
}
return ub;
}
struct uwsgi_buffer *uwsgi_websocket_recv_nb(struct wsgi_request *wsgi_req) {
if (wsgi_req->websocket_closed) {
return NULL;
}
struct uwsgi_buffer *ub = uwsgi_websocket_recv_do(wsgi_req, 1);
if (!ub) {
wsgi_req->websocket_closed = 1;
}
return ub;
}
ssize_t uwsgi_websockets_simple_send(struct wsgi_request *wsgi_req, struct uwsgi_buffer *ub) {
ssize_t len = wsgi_req->socket->proto_write(wsgi_req, ub->buf, ub->pos);
@@ -313,6 +336,8 @@ int uwsgi_websocket_handshake(struct wsgi_request *wsgi_req, char *key, uint16_t
}
free(b64);
wsgi_req->websocket_last_pong = uwsgi_now();
return uwsgi_response_write_headers_do(wsgi_req);
#else
uwsgi_log("you need to build uWSGI with SSL support to use the websocket handshake api function !!!\n");
@@ -326,6 +351,5 @@ void uwsgi_websockets_init() {
uwsgi.websockets_ping = uwsgi_buffer_new(2);
uwsgi_buffer_append(uwsgi.websockets_ping, "\x89\0", 2);
uwsgi.websockets_ping_freq = 30;
uwsgi.websockets_pong_freq = 10;
uwsgi.websockets_max_size = 1024;
}
+11 -4
View File
@@ -74,15 +74,17 @@ static void uwsgi_gevent_gbcw() {
}
struct wsgi_request *uwsgi_gevent_current_wsgi_req(void) {
struct wsgi_request *wsgi_req = NULL;
PyObject *current_greenlet = GET_CURRENT_GREENLET;
PyObject *py_wsgi_req = PyObject_GetAttrString(current_greenlet, "uwsgi_wsgi_req");
// not in greenlet
if (!py_wsgi_req) {
uwsgi_log("[BUG] current_wsgi_req NOT FOUND !!!\n");
exit(1);
goto end;
}
struct wsgi_request *wsgi_req = (struct wsgi_request*) PyLong_AsLong(py_wsgi_req);
wsgi_req = (struct wsgi_request*) PyLong_AsLong(py_wsgi_req);
Py_DECREF(py_wsgi_req);
end:
Py_DECREF(current_greenlet);
return wsgi_req;
}
@@ -408,8 +410,13 @@ static void gevent_loop() {
python_call(ugevent.signal, ge_signal_tuple, 0, NULL);
if (!PyObject_CallMethod(ugevent.hub, "join", NULL)) {
PyErr_Print();
for(;;) {
if (!PyObject_CallMethod(ugevent.hub, "join", NULL)) {
PyErr_Print();
}
else {
break;
}
}
if (uwsgi.workers[uwsgi.mywid].manage_next_request == 0) {
+20
View File
@@ -378,6 +378,25 @@ XS(XS_websocket_recv) {
XSRETURN(1);
}
XS(XS_websocket_recv_nb) {
dXSARGS;
psgi_check_args(0);
struct wsgi_request *wsgi_req = current_wsgi_req();
struct uwsgi_buffer *ub = uwsgi_websocket_recv_nb(wsgi_req);
if (!ub) {
croak("unable to receive websocket message");
XSRETURN_UNDEF;
}
ST(0) = newSVpv(ub->buf, ub->pos);
uwsgi_buffer_destroy(ub);
sv_2mortal(ST(0));
XSRETURN(1);
}
void init_perl_embedded_module() {
@@ -400,6 +419,7 @@ void init_perl_embedded_module() {
psgi_xs(alarm);
psgi_xs(websocket_handshake);
psgi_xs(websocket_recv);
psgi_xs(websocket_recv_nb);
psgi_xs(websocket_send);
psgi_xs(postfork);
psgi_xs(atexit);
+32 -17
View File
@@ -5,7 +5,7 @@ extern struct uwsgi_python up;
PyObject *py_uwsgi_signal_wait(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
int wait_for_specific_signal = 0;
uint8_t uwsgi_signal = 0;
int received_signal;
@@ -42,7 +42,7 @@ PyObject *py_uwsgi_signal_wait(PyObject * self, PyObject * args) {
PyObject *py_uwsgi_signal_received(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
return PyInt_FromLong(wsgi_req->signal_received);
}
@@ -506,7 +506,7 @@ clear:
PyObject *py_uwsgi_log_this(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
wsgi_req->log_this = 1;
@@ -532,7 +532,7 @@ PyObject *py_uwsgi_get_logvar(PyObject * self, PyObject * args) {
char *key = NULL;
Py_ssize_t keylen = 0;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
if (!PyArg_ParseTuple(args, "s#:get_logvar", &key, &keylen)) {
return NULL;
@@ -554,7 +554,7 @@ PyObject *py_uwsgi_set_logvar(PyObject * self, PyObject * args) {
Py_ssize_t keylen = 0;
char *val = NULL;
Py_ssize_t vallen = 0;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
if (!PyArg_ParseTuple(args, "s#s#:set_logvar", &key, &keylen, &val, &vallen)) {
return NULL;
@@ -677,7 +677,7 @@ PyObject *py_uwsgi_send(PyObject * self, PyObject * args) {
PyObject *data;
PyObject *arg1, *arg2;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
int uwsgi_fd = wsgi_req->fd;
@@ -708,7 +708,7 @@ PyObject *py_uwsgi_offload(PyObject * self, PyObject * args) {
/*
size_t len = 0;
char *filename = NULL;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
if (!PyArg_ParseTuple(args, "s|i:offload_transfer", &filename, &len)) {
return NULL;
@@ -747,7 +747,7 @@ PyObject *py_uwsgi_advanced_sendfile(PyObject * self, PyObject * args) {
size_t chunk = 0;
off_t pos = 0;
size_t filesize = 0;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
int fd = -1;
@@ -964,7 +964,7 @@ PyObject *py_uwsgi_unlock(PyObject * self, PyObject * args) {
}
PyObject *py_uwsgi_connection_fd(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
return PyInt_FromLong(wsgi_req->fd);
}
@@ -979,7 +979,7 @@ PyObject *py_uwsgi_websocket_handshake(PyObject * self, PyObject * args) {
return NULL;
}
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
UWSGI_RELEASE_GIL
int ret = uwsgi_websocket_handshake(wsgi_req, key, key_len, origin, origin_len);
@@ -1001,7 +1001,7 @@ PyObject *py_uwsgi_websocket_send(PyObject * self, PyObject * args) {
return NULL;
}
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
UWSGI_RELEASE_GIL
int ret = uwsgi_websocket_send(wsgi_req, message, message_len);
@@ -1014,7 +1014,7 @@ PyObject *py_uwsgi_websocket_send(PyObject * self, PyObject * args) {
}
PyObject *py_uwsgi_websocket_recv(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
UWSGI_RELEASE_GIL
struct uwsgi_buffer *ub = uwsgi_websocket_recv(wsgi_req);
UWSGI_GET_GIL
@@ -1027,6 +1027,20 @@ PyObject *py_uwsgi_websocket_recv(PyObject * self, PyObject * args) {
return ret;
}
PyObject *py_uwsgi_websocket_recv_nb(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = py_current_wsgi_req();
UWSGI_RELEASE_GIL
struct uwsgi_buffer *ub = uwsgi_websocket_recv_nb(wsgi_req);
UWSGI_GET_GIL
if (!ub) {
return PyErr_Format(PyExc_IOError, "unable to receive websocket message");
}
PyObject *ret = PyString_FromStringAndSize(ub->buf, ub->pos);
uwsgi_buffer_destroy(ub);
return ret;
}
PyObject *py_uwsgi_embedded_data(PyObject * self, PyObject * args) {
@@ -1634,7 +1648,7 @@ PyObject *py_uwsgi_send_spool(PyObject * self, PyObject * args, PyObject *kw) {
char *cur_buf;
int i;
char spool_filename[1024];
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
char *priority = NULL;
long numprio = 0;
time_t at = 0;
@@ -2212,7 +2226,7 @@ PyObject *py_uwsgi_mem(PyObject * self, PyObject * args) {
PyObject *py_uwsgi_cl(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
return PyLong_FromUnsignedLongLong(wsgi_req->post_cl);
@@ -2220,7 +2234,7 @@ PyObject *py_uwsgi_cl(PyObject * self, PyObject * args) {
PyObject *py_uwsgi_disconnect(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
#ifdef UWSGI_DEBUG
uwsgi_log("disconnecting worker %d (pid :%d) from session...\n", uwsgi.mywid, uwsgi.mypid);
@@ -2354,7 +2368,7 @@ PyObject *py_uwsgi_parse_file(PyObject * self, PyObject * args) {
PyObject *py_uwsgi_grunt(PyObject * self, PyObject * args) {
pid_t grunt_pid;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
if (uwsgi.grunt) {
uwsgi_log("spawning a grunt from worker %d (pid :%d)...\n", uwsgi.mywid, uwsgi.mypid);
@@ -2425,7 +2439,7 @@ static PyMethodDef uwsgi_spooler_methods[] = {
PyObject *py_uwsgi_suspend(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
if (uwsgi.schedule_to_main) uwsgi.schedule_to_main(wsgi_req);
@@ -2522,6 +2536,7 @@ static PyMethodDef uwsgi_advanced_methods[] = {
//{"call_hook", py_uwsgi_call_hook, METH_VARARGS, ""},
{"websocket_recv", py_uwsgi_websocket_recv, METH_VARARGS, ""},
{"websocket_recv_nb", py_uwsgi_websocket_recv_nb, METH_VARARGS, ""},
{"websocket_send", py_uwsgi_websocket_send, METH_VARARGS, ""},
{"websocket_handshake", py_uwsgi_websocket_handshake, METH_VARARGS, ""},
+5
View File
@@ -267,6 +267,11 @@ PyObject *uwsgi_python_setup_thread(char *);
#undef UWSGI_MINTERPRETERS
#endif
#define py_current_wsgi_req() current_wsgi_req();\
if (!wsgi_req) {\
return PyErr_Format(PyExc_SystemError, "you can call uwsgi api function only from the main callable");\
}
#define uwsgi_pyexit {PyErr_Print();exit(1);}
#ifdef __linux__
+4 -4
View File
@@ -187,7 +187,7 @@ PyObject *py_uwsgi_write(PyObject * self, PyObject * args) {
char *content;
size_t content_len;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
data = PyTuple_GetItem(args, 0);
if (PyString_Check(data)) {
@@ -211,7 +211,7 @@ PyObject *py_uwsgi_write(PyObject * self, PyObject * args) {
PyObject *py_eventfd_read(PyObject * self, PyObject * args) {
int fd, timeout = 0;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
if (!PyArg_ParseTuple(args, "i|i", &fd, &timeout)) {
return NULL;
@@ -228,7 +228,7 @@ PyObject *py_eventfd_read(PyObject * self, PyObject * args) {
PyObject *py_eventfd_write(PyObject * self, PyObject * args) {
int fd, timeout = 0;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
if (!PyArg_ParseTuple(args, "i|i", &fd, &timeout)) {
return NULL;
@@ -458,7 +458,7 @@ void uwsgi_after_request_wsgi(struct wsgi_request *wsgi_req) {
PyObject *py_uwsgi_sendfile(PyObject * self, PyObject * args) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
if (!PyArg_ParseTuple(args, "O|i:uwsgi_sendfile", &wsgi_req->async_sendfile, &wsgi_req->sendfile_fd_chunk)) {
return NULL;
+1 -1
View File
@@ -11,7 +11,7 @@ PyObject *py_uwsgi_spit(PyObject * self, PyObject * args) {
PyObject *exc_info = NULL;
size_t i;
struct wsgi_request *wsgi_req = current_wsgi_req();
struct wsgi_request *wsgi_req = py_current_wsgi_req();
// avoid double sending of headers
if (wsgi_req->headers_sent) {
+16
View File
@@ -930,6 +930,21 @@ VALUE uwsgi_ruby_websocket_recv(VALUE *class) {
}
VALUE uwsgi_ruby_websocket_recv_nb(VALUE *class) {
struct wsgi_request *wsgi_req = current_wsgi_req();
struct uwsgi_buffer *ub = uwsgi_websocket_recv_nb(wsgi_req);
if (!ub) {
rb_raise(rb_eRuntimeError, "unable to receive websocket message");
return Qnil;
}
VALUE ret = rb_str_new(ub->buf, ub->pos);
uwsgi_buffer_destroy(ub);
return ret;
}
void uwsgi_rack_init_api() {
@@ -957,6 +972,7 @@ void uwsgi_rack_init_api() {
uwsgi_rack_api("websocket_handshake", uwsgi_ruby_websocket_handshake, -1);
uwsgi_rack_api("websocket_send", uwsgi_ruby_websocket_send, 1);
uwsgi_rack_api("websocket_recv", uwsgi_ruby_websocket_recv, 0);
uwsgi_rack_api("websocket_recv_nb", uwsgi_ruby_websocket_recv_nb, 0);
uwsgi_rack_api("setprocname", rack_uwsgi_setprocname, 1);
uwsgi_rack_api("mem", rack_uwsgi_mem, 0);
+19 -7
View File
@@ -1,6 +1,10 @@
#!./uwsgi --https :8443,foobar.crt,foobar.key --http-raw-body --gevent 100 --channel room001 --processes 4 --module tests.websocket
#!./uwsgi --https :8443,foobar.crt,foobar.key --http-websockets --gevent 100 --module tests.websocket
import uwsgi
import time
import gevent
from gevent.queue import JoinableQueue
from gevent.socket import wait_read
queue = JoinableQueue()
def application(env, sr):
@@ -51,9 +55,17 @@ def application(env, sr):
elif env['PATH_INFO'] == '/foobar/':
uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', ''))
print "websockets..."
uwsgi.websocket_channel_join('room001')
while True:
msg = uwsgi.websocket_recv()
print len(msg)
#uwsgi.websocket_send("hello %s = %s" % (time.time(), msg))
uwsgi.channel_send('room001', "channel %s = %s" % (time.time(), msg))
msg = uwsgi.websocket_recv_nb()
if msg:
queue.put(msg)
else:
try:
wait_read(uwsgi.connection_fd(), 0.1)
except gevent.socket.timeout:
try:
msg = queue.get_nowait()
uwsgi.websocket_send(msg)
except:
pass
return ""
+1 -3
View File
@@ -2187,7 +2187,6 @@ struct uwsgi_server {
struct uwsgi_buffer *websockets_ping;
struct uwsgi_buffer *websockets_pong;
int websockets_ping_freq;
int websockets_pong_freq;
uint64_t websockets_max_size;
int (*wait_write_hook) (int, int);
@@ -3724,13 +3723,12 @@ void uwsgi_subscribe_all(uint8_t, int);
void uwsgi_websockets_init(void);
int uwsgi_websocket_send(struct wsgi_request *, char *, size_t);
struct uwsgi_buffer *uwsgi_websocket_recv(struct wsgi_request *);
struct uwsgi_buffer *uwsgi_websocket_recv_nb(struct wsgi_request *);
uint16_t uwsgi_be16(char *);
uint32_t uwsgi_be32(char *);
uint64_t uwsgi_be64(char *);
int uwsgi_websockets_pong(struct wsgi_request *);
int uwsgi_websocket_handshake(struct wsgi_request *, char *, uint16_t, char *, uint16_t);
int uwsgi_response_prepare_headers(struct wsgi_request *, char *, uint16_t);