support for websockets over https

This commit is contained in:
Roberto De Ioris
2013-01-13 12:46:03 +01:00
parent 7ac4282047
commit ee129f02bc
8 changed files with 273 additions and 15 deletions
+5 -2
View File
@@ -1,3 +1,8 @@
/*
apxs2 -i -c mod_proxy_uwsgi.c
*/
#define APR_WANT_MEMFUNC
#define APR_WANT_STRFUNC
#include "apr_strings.h"
@@ -344,8 +349,6 @@ static int uwsgi_handler(request_rec *r, proxy_worker *worker,
// ADD PATH_INFO
size_t w_len = strlen(worker->name);
char *u_path_info = r->filename + 6 + w_len;
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
"URL %s: %s %s %s", url, worker->name, r->filename, u_path_info);
int delta = 0;
if (u_path_info[0] != '/') {
delta = 1;
+13
View File
@@ -208,6 +208,19 @@ void uwsgi_buffer_destroy(struct uwsgi_buffer *ub) {
free(ub);
}
ssize_t uwsgi_buffer_write_simple(struct wsgi_request *wsgi_req, struct uwsgi_buffer *ub) {
size_t remains = ub->pos;
while(remains) {
ssize_t len = write(wsgi_req->poll.fd, ub->buf + (ub->pos - remains), remains);
if (len <= 0) {
return len;
}
remains -= len;
}
return ub->pos;
}
int uwsgi_buffer_send(struct uwsgi_buffer *ub, int fd) {
size_t remains = ub->pos;
char *ptr = ub->buf;
+1
View File
@@ -90,6 +90,7 @@ void uwsgi_init_default() {
uwsgi.multicast_loop = 1;
#endif
uwsgi.buffer_write_hook = uwsgi_buffer_write_simple;
uwsgi_websockets_init();
}
+36
View File
@@ -354,6 +354,42 @@ retry:
return -1;
}
int uwsgi_websocket_handshake(struct wsgi_request *wsgi_req, char *key, uint16_t key_len, char *origin, uint16_t origin_len) {
#ifdef UWSGI_SSL
char sha1[20];
struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
if (uwsgi_buffer_append(ub, "HTTP/1.1 101 Web Socket Protocol Handshake\r\n", 44)) goto end;
if (uwsgi_buffer_append(ub, "Upgrade: WebSocket\r\n", 20)) goto end;
if (uwsgi_buffer_append(ub, "Connection: Upgrade\r\n", 21)) goto end;
if (uwsgi_buffer_append(ub, "Sec-WebSocket-Origin: ", 22)) goto end;
if (origin_len > 0) {
if (uwsgi_buffer_append(ub, origin, origin_len)) goto end;
}
else {
if (uwsgi_buffer_append(ub, "*", 1)) goto end;
}
if (uwsgi_buffer_append(ub, "\r\nSec-WebSocket-Accept: ", 24)) goto end;
if (!uwsgi_sha1_2n(key, key_len, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", 36, sha1)) goto end;
if (uwsgi_buffer_append_base64(ub, sha1, 20)) goto end;
if (uwsgi_buffer_append(ub, "\r\n\r\n", 4)) goto end;
ssize_t len = uwsgi.buffer_write_hook(wsgi_req, ub);
if (len <= 0) {
goto end;
}
wsgi_req->headers_size += len;
wsgi_req->header_cnt += 4;
uwsgi_buffer_destroy(ub);
return 0;
end:
uwsgi_buffer_destroy(ub);
return -1;
#else
uwsgi_log("you need to build uWSGI with SSL support to use the websocket handshake api function !!!\n");
return -1;
#endif
}
void uwsgi_websockets_init() {
uwsgi.websockets_hook_send = uwsgi_websockets_simple_send;
uwsgi.websockets_hook_recv = uwsgi_websockets_simple_recv;
+180 -11
View File
@@ -23,6 +23,17 @@ extern struct uwsgi_python up;
Py_DECREF(current); Py_DECREF(current_greenlet);\
Py_DECREF(watcher);\
#define stop_the_c_watchers if (c_watchers) { int j;\
for(j=0;j<count;j++) {\
if (c_watchers[j]) {\
ret = PyObject_CallMethod(watcher, "stop", NULL);\
if (ret) { Py_DECREF(ret); }\
Py_DECREF(c_watchers[j]);\
}\
}\
free(c_watchers);\
}
@@ -352,13 +363,46 @@ ssize_t uwsgi_websockets_gevent_recv(struct wsgi_request *wsgi_req) {
return -1;
}
int count = 0;
struct uwsgi_channel *channel = uwsgi.channels;
while(channel) {
int pos = (uwsgi.cores * (uwsgi.mywid - 1)) + wsgi_req->async_id;
if (channel->subscriptions[pos] == 2) {
count++;
}
channel = channel->next;
}
PyObject *current_greenlet = GET_CURRENT_GREENLET;
PyObject *current = PyObject_GetAttrString(current_greenlet, "switch");
PyObject **c_watchers = NULL;
if (count > 0) {
c_watchers = uwsgi_calloc(sizeof(PyObject *) * count);
count = 0;
channel = uwsgi.channels;
while(channel) {
int pos = (uwsgi.cores * (uwsgi.mywid - 1)) + wsgi_req->async_id;
if (channel->subscriptions[pos] == 2) {
c_watchers[count] = PyObject_CallMethod(ugevent.hub_loop, "io", "ii", channel->fd[(pos*2)+1], 1);
if (!c_watchers[count]) {
PyObject *ret;
stop_the_c_watchers
stop_the_watchers_and_clear
return -1;
}
count++;
}
channel = channel->next;
}
}
for(;;) {
PyObject *ret = PyObject_CallMethod(watcher, "start", "OO", current, watcher);
if (!ret) {
stop_the_c_watchers
stop_the_watchers_and_clear
return -1;
}
@@ -366,14 +410,30 @@ ssize_t uwsgi_websockets_gevent_recv(struct wsgi_request *wsgi_req) {
ret = PyObject_CallMethod(timer, "start", "OO", current, timer);
if (!ret) {
stop_the_c_watchers
stop_the_watchers_and_clear
return -1;
}
Py_DECREF(ret);
if (c_watchers) {
int j;
for(j=0;j<count;j++) {
ret = PyObject_CallMethod(c_watchers[j], "start", "OO", current, c_watchers[j]);
if (!ret) {
stop_the_c_watchers
stop_the_watchers_and_clear
return -1;
}
Py_DECREF(ret);
}
}
ret = PyObject_CallMethod(ugevent.hub, "switch", NULL);
wsgi_req->switches++;
if (!ret) {
stop_the_c_watchers
stop_the_watchers_and_clear
return -1;
}
@@ -385,30 +445,67 @@ ssize_t uwsgi_websockets_gevent_recv(struct wsgi_request *wsgi_req) {
Py_DECREF(timer);
//unsolicited pong
if (uwsgi_websockets_pong(wsgi_req)) {
stop_the_c_watchers
stop_the_io_and_clear;
return -1;
}
// create a new timer
timer = PyObject_CallMethod(ugevent.hub_loop, "timer", "i", uwsgi.websockets_pong_freq);
if (!timer) {
stop_the_c_watchers
stop_the_io_and_clear;
return -1;
}
continue;
}
UWSGI_RELEASE_GIL;
ssize_t len = read(wsgi_req->poll.fd, wsgi_req->websocket_buf->buf + wsgi_req->websocket_buf->pos, wsgi_req->websocket_buf->len - wsgi_req->websocket_buf->pos);
if (len <= 0)
uwsgi_error("[uwsgi-websocket] uwsgi_websockets_simple_recv()/read()");
UWSGI_GET_GIL
stop_the_watchers_and_clear
return len;
if (ret == watcher) {
ssize_t len = read(wsgi_req->poll.fd, wsgi_req->websocket_buf->buf + wsgi_req->websocket_buf->pos, wsgi_req->websocket_buf->len - wsgi_req->websocket_buf->pos);
if (len <= 0)
uwsgi_error("[uwsgi-websocket] uwsgi_websockets_gevent_recv()/read()");
stop_the_c_watchers
stop_the_watchers_and_clear
return len;
}
channel = uwsgi.channels;
int c_pos = 0;
while(channel) {
if (ret == c_watchers[c_pos]) {
int pos = (uwsgi.cores * (uwsgi.mywid - 1)) + wsgi_req->async_id;
int cfd = channel->fd[(pos*2)+1];
struct uwsgi_buffer *ub = uwsgi_buffer_new(channel->max_packet_size);
ssize_t len = read(cfd, ub->buf, ub->len);
if (len <= 0) {
uwsgi_buffer_destroy(ub);
uwsgi_error("[uwsgi-websocket] uwsgi_websockets_gevent_recv()/read()");
stop_the_c_watchers
stop_the_watchers_and_clear
return -1;
}
ub->pos += len;
if (uwsgi_websocket_send(wsgi_req, ub->buf, ub->pos) <= 0) {
uwsgi_buffer_destroy(ub);
stop_the_c_watchers
stop_the_watchers_and_clear
return -1;
}
uwsgi_buffer_destroy(ub);
break;
}
c_pos++;
channel = channel->next;
}
}
return -1;
}
// not gil as it is called by api
struct uwsgi_buffer *uwsgi_channel_gevent_recv(struct wsgi_request *wsgi_req, int fd, struct uwsgi_buffer *ub, int timeout) {
/// create a watcher for reads
@@ -459,9 +556,7 @@ struct uwsgi_buffer *uwsgi_channel_gevent_recv(struct wsgi_request *wsgi_req, in
return ub;
}
UWSGI_RELEASE_GIL;
ssize_t len = read(fd, ub->buf, ub->len);
UWSGI_GET_GIL
stop_the_watchers_and_clear
if (len <= 0) return NULL;
ub->pos += len;
@@ -470,6 +565,7 @@ struct uwsgi_buffer *uwsgi_channel_gevent_recv(struct wsgi_request *wsgi_req, in
// no gil
ssize_t uwsgi_websockets_gevent_send(struct wsgi_request *wsgi_req, struct uwsgi_buffer *ub) {
PyObject *ret = NULL;
@@ -508,9 +604,7 @@ ssize_t uwsgi_websockets_gevent_send(struct wsgi_request *wsgi_req, struct uwsgi
Py_DECREF(ret);
// ok we can write a chunk to the socket
UWSGI_RELEASE_GIL
ssize_t len = write(wsgi_req->poll.fd, ptr, remains);
UWSGI_GET_GIL
if (len > 0) {
ptr += len;
remains -= len;
@@ -544,6 +638,79 @@ error:
return -1;
}
//no gil
ssize_t uwsgi_buffer_gevent_write(struct wsgi_request *wsgi_req, struct uwsgi_buffer *ub) {
PyObject *ret = NULL;
char *content = ub->buf;
size_t content_len = ub->pos;
// do not try to write empty chunks (returns 1 for making all happy...)
if (content_len == 0) return 1;
/// create a watcher for writes
PyObject *watcher = PyObject_CallMethod(ugevent.hub_loop, "io", "ii", wsgi_req->poll.fd, 2);
if (!watcher) goto error;
PyObject *current_greenlet = GET_CURRENT_GREENLET;
PyObject *current = PyObject_GetAttrString(current_greenlet, "switch");
char *ptr = content;
size_t remains = content_len;
// this is the main writing cycle, wait for writability and send...
for(;;) {
ret = PyObject_CallMethod(watcher, "start", "OO", current, watcher);
if (!ret) {
stop_the_io_and_clear
goto error;
}
Py_DECREF(ret);
ret = PyObject_CallMethod(ugevent.hub, "switch", NULL);
wsgi_req->switches++;
if (!ret) {
stop_the_io_and_clear
goto error;
}
Py_DECREF(ret);
// ok we can write a chunk to the socket
ssize_t len = write(wsgi_req->poll.fd, ptr, remains);
if (len > 0) {
ptr += len;
remains -= len;
if (remains == 0) {
break;
}
stop_the_io
continue;
}
else if (len < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
stop_the_io
continue;
}
}
stop_the_io_and_clear
goto error;
}
stop_the_io
Py_DECREF(current); Py_DECREF(current_greenlet);
Py_DECREF(watcher);
return 1;
error:
if (PyErr_Occurred())
PyErr_Print();
wsgi_req->write_errors++;
return -1;
}
void uwsgi_gevent_nb_write(struct wsgi_request *wsgi_req, PyObject *str) {
PyObject *ret;
@@ -786,6 +953,8 @@ void gevent_loop() {
uwsgi.websockets_hook_recv = uwsgi_websockets_gevent_recv;
// change channels hooks
uwsgi.channel_recv_hook = uwsgi_channel_gevent_recv;
// buffer write generic hook
uwsgi.buffer_write_hook = uwsgi_buffer_gevent_write;
struct uwsgi_socket *uwsgi_sock = uwsgi.sockets;
+26
View File
@@ -1089,6 +1089,31 @@ PyObject *py_uwsgi_connection_fd(PyObject * self, PyObject * args) {
return PyInt_FromLong(wsgi_req->poll.fd);
}
PyObject *py_uwsgi_websocket_handshake(PyObject * self, PyObject * args) {
char *key = NULL;
Py_ssize_t key_len = 0;
char *origin = NULL;
Py_ssize_t origin_len = 0;
if (!PyArg_ParseTuple(args, "s#|s#:websocket_handshake", &key, &key_len, &origin, &origin_len)) {
return NULL;
}
struct wsgi_request *wsgi_req = current_wsgi_req();
UWSGI_RELEASE_GIL
int ret = uwsgi_websocket_handshake(wsgi_req, key, key_len, origin, origin_len);
UWSGI_GET_GIL
if (ret) {
return PyErr_Format(PyExc_IOError, "unable to complete websocket handshake");
}
Py_INCREF(Py_None);
return Py_None;
}
PyObject *py_uwsgi_websocket_send(PyObject * self, PyObject * args) {
char *message = NULL;
Py_ssize_t message_len = 0;
@@ -3445,6 +3470,7 @@ static PyMethodDef uwsgi_advanced_methods[] = {
{"websocket_recv", py_uwsgi_websocket_recv, METH_VARARGS, ""},
{"websocket_send", py_uwsgi_websocket_send, METH_VARARGS, ""},
{"websocket_channel_join", py_uwsgi_websocket_channel_join, METH_VARARGS, ""},
{"websocket_handshake", py_uwsgi_websocket_handshake, METH_VARARGS, ""},
{"channel_join", py_uwsgi_channel_join, METH_VARARGS, ""},
{"channel_leave", py_uwsgi_channel_join, METH_VARARGS, ""},
+7 -2
View File
@@ -3,13 +3,17 @@ import time
def application(env, sr):
ws_scheme = 'ws'
if 'HTTPS' in env or env['wsgi.scheme'] == 'https':
ws_scheme = 'wss'
if env['PATH_INFO'] == '/':
sr('200 OK', [('Content-Type','text/html')])
return """
<html>
<head>
<script language="Javascript">
var s = new WebSocket("ws://raring64.local:8181/foobar/");
var s = new WebSocket("%s://%s/foobar/");
s.onopen = function() {
alert("connected !!!");
s.send("ciao");
@@ -42,8 +46,9 @@ def application(env, sr):
</div>
</body>
</html>
"""
""" % (ws_scheme, env['HTTP_HOST'])
elif env['PATH_INFO'] == '/foobar/':
uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', ''))
print "websockets..."
uwsgi.websocket_channel_join('room001')
while True:
+5
View File
@@ -2072,6 +2072,8 @@ struct uwsgi_server {
struct uwsgi_channel *channels;
struct uwsgi_buffer *(*channel_recv_hook)(struct wsgi_request *, int, struct uwsgi_buffer *, int);
ssize_t (*buffer_write_hook)(struct wsgi_request *, struct uwsgi_buffer *);
};
struct uwsgi_rpc {
@@ -3572,6 +3574,7 @@ int uwsgi_buffer_append_ipv4(struct uwsgi_buffer *, void *);
int uwsgi_buffer_append_keyipv4(struct uwsgi_buffer *, char *, uint16_t, void *);
int uwsgi_buffer_decapitate(struct uwsgi_buffer *, size_t);
int uwsgi_buffer_append_base64(struct uwsgi_buffer *, char *, size_t);
ssize_t uwsgi_buffer_write_simple(struct wsgi_request *, struct uwsgi_buffer *);
void uwsgi_httpize_var(char *, size_t);
struct uwsgi_buffer *uwsgi_to_http(struct wsgi_request *, char *, uint16_t, char *, uint16_t);
@@ -3751,6 +3754,8 @@ void uwsgi_channels_leave(struct wsgi_request *);
struct uwsgi_buffer *uwsgi_channel_recv(struct wsgi_request *, struct uwsgi_channel *, int);
void uwsgi_channels_reset_worker_subscriptions(int);
int uwsgi_websocket_handshake(struct wsgi_request *, char *, uint16_t, char *, uint16_t);
void uwsgi_check_emperor(void);
#ifdef UWSGI_AS_SHARED_LIBRARY
int uwsgi_init(int, char **, char **);