first part of the fastcgi porting to the new api

This commit is contained in:
Unbit
2013-02-14 17:30:23 +01:00
parent f70c7c1e17
commit 7d8a2968d5
5 changed files with 194 additions and 431 deletions
-119
View File
@@ -1258,125 +1258,6 @@ int uwsgi_string_sendto(int fd, uint8_t modifier1, uint8_t modifier2, struct soc
return rlen;
}
ssize_t fcgi_send_param(int fd, char *key, uint16_t keylen, char *val, uint16_t vallen) {
struct fcgi_record fr;
struct iovec iv[5];
uint8_t ks1 = 0;
uint32_t ks4 = 0;
uint8_t vs1 = 0;
uint32_t vs4 = 0;
uint16_t size = keylen + vallen;
if (keylen > 127) {
size += 4;
ks4 = htonl(keylen) | 0x80000000;
iv[1].iov_base = &ks4;
iv[1].iov_len = 4;
}
else {
size += 1;
ks1 = keylen;
iv[1].iov_base = &ks1;
iv[1].iov_len = 1;
}
if (vallen > 127) {
size += 4;
vs4 = htonl(vallen) | 0x80000000;
iv[2].iov_base = &vs4;
iv[2].iov_len = 4;
}
else {
size += 1;
vs1 = vallen;
iv[2].iov_base = &vs1;
iv[2].iov_len = 1;
}
iv[3].iov_base = key;
iv[3].iov_len = keylen;
iv[4].iov_base = val;
iv[4].iov_len = vallen;
fr.version = 1;
fr.type = 4;
fr.req1 = 0;
fr.req0 = 1;
fr.cl8.cl1 = (uint8_t) ((size >> 8) & 0xff);
fr.cl8.cl0 = (uint8_t) (size & 0xff);
fr.pad = 0;
fr.reserved = 0;
iv[0].iov_base = &fr;
iv[0].iov_len = 8;
return writev(fd, iv, 5);
}
ssize_t fcgi_send_record(int fd, uint8_t type, uint16_t size, char *buffer) {
struct fcgi_record fr;
struct iovec iv[2];
fr.version = 1;
fr.type = type;
fr.req1 = 0;
fr.req0 = 1;
fr.cl8.cl1 = (uint8_t) ((size >> 8) & 0xff);
fr.cl8.cl0 = (uint8_t) (size & 0xff);
fr.pad = 0;
fr.reserved = 0;
iv[0].iov_base = &fr;
iv[0].iov_len = 8;
iv[1].iov_base = buffer;
iv[1].iov_len = size;
return writev(fd, iv, 2);
}
uint16_t fcgi_get_record(int fd, char *buf) {
struct fcgi_record fr;
uint16_t remains = 8;
char *ptr = (char *) &fr;
ssize_t len;
while (remains) {
uwsgi_waitfd(fd, -1);
len = read(fd, ptr, remains);
if (len <= 0)
return 0;
remains -= len;
ptr += len;
}
remains = ntohs(fr.cl) + fr.pad;
ptr = buf;
while (remains) {
uwsgi_waitfd(fd, -1);
len = read(fd, ptr, remains);
if (len <= 0)
return 0;
remains -= len;
ptr += len;
}
if (fr.type != 6)
return 0;
return ntohs(fr.cl);
}
char *uwsgi_req_append(struct wsgi_request *wsgi_req, char *key, uint16_t keylen, char *val, uint16_t vallen) {
if ((wsgi_req->uh->pktsize + (2 + keylen + 2 + vallen)) > uwsgi.buffer_size) {
+7 -3
View File
@@ -1813,9 +1813,13 @@ setup_proto:
else if (requested_protocol && (!strcmp("fastcgi", requested_protocol) || !strcmp("fcgi", requested_protocol))) {
uwsgi_sock->proto = uwsgi_proto_fastcgi_parser;
uwsgi_sock->proto_accept = uwsgi_proto_base_accept;
uwsgi_sock->proto_write = uwsgi_proto_fastcgi_write;
//uwsgi_sock->proto_write_header = uwsgi_proto_fastcgi_write_header;
//uwsgi_sock->proto_sendfile = uwsgi_proto_fastcgi_sendfile;
uwsgi_sock->proto_prepare_headers = uwsgi_proto_base_prepare_headers;
uwsgi_sock->proto_add_header = uwsgi_proto_base_add_header;
uwsgi_sock->proto_fix_headers = uwsgi_proto_base_fix_headers;
uwsgi_sock->proto_read_body = uwsgi_proto_base_read_body;
uwsgi_sock->proto_write = uwsgi_proto_fastcgi_write;
uwsgi_sock->proto_write_headers = uwsgi_proto_fastcgi_write;
uwsgi_sock->proto_sendfile = uwsgi_proto_fastcgi_sendfile;
uwsgi_sock->proto_close = uwsgi_proto_fastcgi_close;
}
#ifdef UWSGI_ZEROMQ
+184 -283
View File
@@ -1,322 +1,223 @@
/* async fastcgi protocol parser */
#include "../uwsgi.h"
#include <uwsgi.h>
extern struct uwsgi_server uwsgi;
#define PROTO_STATUS_RECV_HDR 0
#define PROTO_STATUS_RECV_BODY 1
#define FCGI_END_REQUEST "\1\x06\0\1\0\0\0\0\1\3\0\1\0\x08\0\0\0\0\0\0\0\0\0\0"
struct fcgi_record {
uint8_t version;
uint8_t type;
uint8_t req1;
uint8_t req0;
uint8_t cl1;
uint8_t cl0;
uint8_t pad;
uint8_t reserved;
} __attribute__ ((__packed__));
// convert fastcgi params to uwsgi key/val
int fastcgi_to_uwsgi(struct wsgi_request *wsgi_req, char *buf, size_t len) {
size_t j;
uint8_t octet;
uint32_t keylen, vallen;
for (j = 0; j < len; j++) {
octet = (uint8_t) buf[j];
if (octet > 127) {
if (j + 4 >= len) return -1;
keylen = uwsgi_be32(&buf[j]) ^ 0x80000000;
j += 4;
}
else {
if (j+1 >= len) return -1;
keylen = octet;
j++;
}
octet = (uint8_t) buf[j];
if (octet > 127) {
if (j + 4 >= len) return -1;
vallen = uwsgi_be32(&buf[j]) ^ 0x80000000;
j += 4;
}
else {
if (j+1 >= len) return -1;
vallen = octet;
j++;
}
if (j + (keylen + vallen) > len) {
return -1;
}
if (keylen > 0xffff || vallen > 0xffff) return -1;
uint16_t pktsize = proto_base_add_uwsgi_var(wsgi_req, buf + j, keylen, buf + j + keylen, vallen);
if (pktsize == 0) return -1;
wsgi_req->uh->pktsize += pktsize;
// -1 here as the for() will increment j again
j += (keylen + vallen) - 1;
}
return 0;
}
/*
each fastcgi packet is composed by a header and a body
the parser rebuild a whole packet until it find a 0 PARAMS or a STDIN one
*/
int uwsgi_proto_fastcgi_parser(struct wsgi_request *wsgi_req) {
ssize_t len;
struct fcgi_record *fr;
uint16_t rs;
int j;
uint8_t octet;
uint32_t keylen, vallen;
// allocate space for a fastcgi record
if (!wsgi_req->proto_parser_buf) {
wsgi_req->proto_parser_buf = uwsgi_malloc(8 + 65536);
wsgi_req->proto_parser_buf = uwsgi_malloc(uwsgi.buffer_size);
wsgi_req->proto_parser_buf_size = uwsgi.buffer_size;
}
if (wsgi_req->proto_parser_status == PROTO_STATUS_RECV_HDR) {
len = read(wsgi_req->fd, wsgi_req->proto_parser_buf + wsgi_req->proto_parser_pos, 8 - wsgi_req->proto_parser_pos);
if (len <= 0) {
free(wsgi_req->proto_parser_buf);
uwsgi_error("read()");
return -1;
}
wsgi_req->proto_parser_pos += len;
if (wsgi_req->proto_parser_pos >= 8) {
fr = (struct fcgi_record *) wsgi_req->proto_parser_buf;
rs = ntohs(fr->cl);
#ifdef UWSGI_DEBUG
uwsgi_log("fastcgi rs %d type %d\n", rs, fr->type);
#endif
// empty STDIN ?
if (fr->type == 5 && rs == 0) {
wsgi_req->proto_parser_status = 0;
if (wsgi_req->post_file) {
rewind(wsgi_req->post_file);
}
free(wsgi_req->proto_parser_buf);
return UWSGI_OK;
}
wsgi_req->proto_parser_pos = 0;
if (rs == 0) {
return UWSGI_AGAIN;
}
wsgi_req->proto_parser_status = PROTO_STATUS_RECV_BODY;
}
return UWSGI_AGAIN;
}
else if (wsgi_req->proto_parser_status == PROTO_STATUS_RECV_BODY) {
fr = (struct fcgi_record *) wsgi_req->proto_parser_buf;
rs = ntohs(fr->cl);
len = read(wsgi_req->fd, wsgi_req->proto_parser_buf + 8 + wsgi_req->proto_parser_pos, (rs + fr->pad) - wsgi_req->proto_parser_pos);
if (len <= 0) {
free(wsgi_req->proto_parser_buf);
uwsgi_error("read()");
return -1;
}
wsgi_req->proto_parser_pos += len;
if (wsgi_req->proto_parser_pos >= rs + fr->pad) {
#ifdef UWSGI_DEBUG
uwsgi_log("record parsed = type: %d size: %d padding: %d\n", fr->type, rs, fr->pad);
#endif
// params
if (fr->type == 4) {
for (j = 0; j < rs; j++) {
octet = ((uint8_t *) wsgi_req->proto_parser_buf)[8 + j];
if (octet > 127) {
if (j + 4 >= rs) {
free(wsgi_req->proto_parser_buf);
return -1;
}
memcpy(&keylen, wsgi_req->proto_parser_buf + 8 + j, 4);
keylen = ntohl(keylen) ^ 0x80000000;
j += 4;
}
else {
if (j + 1 >= rs) {
free(wsgi_req->proto_parser_buf);
return -1;
}
keylen = octet;
j++;
}
octet = ((uint8_t *) wsgi_req->proto_parser_buf)[8 + j];
if (octet > 127) {
if (j + 4 > rs) {
free(wsgi_req->proto_parser_buf);
return -1;
}
memcpy(&vallen, wsgi_req->proto_parser_buf + 8 + j, 4);
vallen = ntohl(vallen) ^ 0x80000000;
j += 4;
}
else {
if (j + 1 > rs) {
free(wsgi_req->proto_parser_buf);
return -1;
}
vallen = octet;
j++;
}
if (j + (keylen + vallen) > rs) {
free(wsgi_req->proto_parser_buf);
return -1;
}
if (keylen <= 0xffff && vallen <= 0xffff) {
#ifdef UWSGI_DEBUG
uwsgi_log("keylen %d %.*s vallen %d %.*s\n", keylen, keylen, wsgi_req->proto_parser_buf + 8 + j, vallen, vallen, wsgi_req->proto_parser_buf + 8 + j + keylen);
#endif
wsgi_req->uh->pktsize += proto_base_add_uwsgi_var(wsgi_req, wsgi_req->proto_parser_buf + 8 + j, keylen, wsgi_req->proto_parser_buf + 8 + j + keylen, vallen);
}
j += (keylen + vallen) - 1;
}
}
// stdin
else if (fr->type == 5) {
if (!wsgi_req->post_file) {
wsgi_req->post_file = tmpfile();
if (!wsgi_req->post_file) {
free(wsgi_req->proto_parser_buf);
uwsgi_error("tmpfile()");
ssize_t len = read(wsgi_req->fd, wsgi_req->proto_parser_buf + wsgi_req->proto_parser_pos, wsgi_req->proto_parser_buf_size - wsgi_req->proto_parser_pos);
if (len > 0) {
goto parse;
}
if (len < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
return UWSGI_AGAIN;
}
uwsgi_error("uwsgi_proto_fastcgi_parser()");
return -1;
}
// mute on 0 len...
if (wsgi_req->proto_parser_pos > 0) {
uwsgi_error("uwsgi_proto_fastcgi_parser()");
}
return -1;
parse:
wsgi_req->proto_parser_pos += len;
// ok let's see what we need to do
for(;;) {
if (wsgi_req->proto_parser_pos >= sizeof(struct fcgi_record)) {
struct fcgi_record *fr = (struct fcgi_record *) wsgi_req->proto_parser_buf;
uint16_t fcgi_len = uwsgi_be16((char *)&fr->cl1);
// if STDIN, end of the loop
if (fr->type == 5) return UWSGI_OK;
// if we have a full packet, parse it and reset the memory
if (wsgi_req->proto_parser_pos >= fcgi_len+sizeof(struct fcgi_record)+fr->pad) {
// PARAMS ? (ignore other types)
if (fr->type == 4) {
if (fastcgi_to_uwsgi(wsgi_req, wsgi_req->proto_parser_buf + sizeof(struct fcgi_record), fcgi_len)) {
return -1;
}
}
if (!fwrite(wsgi_req->proto_parser_buf + 8, rs, 1, wsgi_req->post_file)) {
free(wsgi_req->proto_parser_buf);
uwsgi_error("fwrite()");
memmove(wsgi_req->proto_parser_buf, wsgi_req->proto_parser_buf + (fcgi_len+sizeof(struct fcgi_record)+fr->pad), wsgi_req->proto_parser_pos - (fcgi_len+sizeof(struct fcgi_record)+fr->pad));
wsgi_req->proto_parser_pos -= (fcgi_len+sizeof(struct fcgi_record)+fr->pad);
// end of PARAMS
if (fr->type == 4 && fcgi_len == 0) {
return UWSGI_OK;
}
}
else if ((fcgi_len+fr->pad) > wsgi_req->proto_parser_buf_size - wsgi_req->proto_parser_pos) {
char *tmp_buf = realloc(wsgi_req->proto_parser_buf, wsgi_req->proto_parser_buf_size + ((fcgi_len+fr->pad) - (wsgi_req->proto_parser_buf_size - wsgi_req->proto_parser_pos)));
if (!tmp_buf) {
uwsgi_error("uwsgi_proto_fastcgi_parser()/realloc()");
return -1;
}
wsgi_req->proto_parser_buf = tmp_buf;
wsgi_req->proto_parser_buf_size += ((fcgi_len+fr->pad) - (wsgi_req->proto_parser_buf_size - wsgi_req->proto_parser_pos));
break;
}
wsgi_req->proto_parser_status = PROTO_STATUS_RECV_HDR;
wsgi_req->proto_parser_pos = 0;
else {
break;
}
}
else {
break;
}
}
return UWSGI_AGAIN;
}
ssize_t uwsgi_proto_fastcgi_writev_header(struct wsgi_request * wsgi_req, struct iovec * iovec, size_t iov_len) {
int i;
ssize_t len;
ssize_t ret = 0;
// write a STDOUT packet
int uwsgi_proto_fastcgi_write(struct wsgi_request *wsgi_req, char *buf, size_t len) {
for (i = 0; i < (int) iov_len; i++) {
len = uwsgi_proto_fastcgi_write(wsgi_req, iovec[i].iov_base, iovec[i].iov_len);
if (len <= 0) {
return len;
// fastcgi packets are limited to 64k
if (wsgi_req->proto_parser_status == 0) {
uint16_t fcgi_len = UMIN(len-wsgi_req->write_pos, 0xffff);
wsgi_req->proto_parser_status = fcgi_len;
struct fcgi_record fr;
fr.version = 1;
fr.type = 6;
fr.req1 = 0;
fr.req0 = 1;
fr.pad = 0;
fr.reserved = 0;
fr.cl0 = (uint8_t) (fcgi_len & 0xff);
fr.cl1 = (uint8_t) ((fcgi_len >> 8) & 0xff);
if (uwsgi_write_true_nb(wsgi_req->fd, (char *) &fr, sizeof(struct fcgi_record), uwsgi.shared->options[UWSGI_OPTION_SOCKET_TIMEOUT])) {
return -1;
}
ret += len;
}
return ret;
}
ssize_t uwsgi_proto_fastcgi_writev(struct wsgi_request * wsgi_req, struct iovec * iovec, size_t iov_len) {
return uwsgi_proto_fastcgi_writev_header(wsgi_req, iovec, iov_len);
}
int uwsgi_proto_fastcgi_write(struct wsgi_request * wsgi_req, char *buf, size_t len) {
/*
struct fcgi_record fr;
// in fastcgi we need to not send 0 sized frames
if (!len)
return UWSGI_OK;
fr.version = 1;
fr.type = 6;
fr.req1 = 0;
fr.req0 = 1;
fr.pad = 0;
fr.reserved = 0;
fr.cl = htons(len);
// still trying to send the fcgi header ?
if (*written < 8) {
char *ptr = (char *) &fr;
ssize_t wlen = write(wsgi_req->poll.fd, ptr + *written, 8 - *written);
if (wlen > 0) {
*written += wlen;
if (*written == 8) {
goto body;
}
return UWSGI_AGAIN;
}
if (wlen < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
return UWSGI_AGAIN;
}
}
return -1;
}
// split response in 64k chunks...
body:
rlen = write(wsgi_req->poll.fd, &fr, 8);
if (rlen <= 0) {
if (!uwsgi.ignore_write_errors) {
uwsgi_req_error("write()");
}
wsgi_req->write_errors++;
return 0;
}
rlen = write(wsgi_req->poll.fd, buf, len);
if (rlen <= 0) {
if (!uwsgi.ignore_write_errors) {
uwsgi_req_error("write()");
}
wsgi_req->write_errors++;
return 0;
}
return rlen;
}
else {
while(len > 0) {
chunk_len = UMIN(65535, len);
fr.cl = htons(chunk_len);
rlen = write(wsgi_req->poll.fd, &fr, 8);
if (rlen != 8) {
if (!uwsgi.ignore_write_errors) {
uwsgi_req_error("write()");
}
wsgi_req->write_errors++;
return 0;
}
rlen = write(wsgi_req->poll.fd, ptr, chunk_len);
if (rlen <= 0) {
if (!uwsgi.ignore_write_errors) {
uwsgi_req_error("write()");
}
wsgi_req->write_errors++;
return 0;
}
ptr += rlen;
len -= rlen;
}
return ptr-buf;
}
*/
return -1;
ssize_t wlen = write(wsgi_req->fd, buf+wsgi_req->write_pos, wsgi_req->proto_parser_status);
if (wlen > 0) {
wsgi_req->write_pos += wlen;
wsgi_req->proto_parser_status -= wlen;
if (wsgi_req->write_pos == len) {
return UWSGI_OK;
}
return UWSGI_AGAIN;
}
if (wlen < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
return UWSGI_AGAIN;
}
}
return -1;
}
void uwsgi_proto_fastcgi_close(struct wsgi_request *wsgi_req) {
if (write(wsgi_req->fd, FCGI_END_REQUEST, 24) <= 0) {
uwsgi_req_error("write()");
}
// special case here, we run i nvoid context, so we need to wait directly here
(void) uwsgi_write_true_nb(wsgi_req->fd, (char *) FCGI_END_REQUEST, sizeof(FCGI_END_REQUEST), uwsgi.shared->options[UWSGI_OPTION_SOCKET_TIMEOUT]);
uwsgi_proto_base_close(wsgi_req);
}
int uwsgi_proto_fastcgi_sendfile(struct wsgi_request *wsgi_req, int fd, size_t pos, size_t len) {
/*
struct fcgi_record fr;
char buf[65536];
size_t remains = wsgi_req->sendfile_fd_size - wsgi_req->sendfile_fd_pos;
wsgi_req->sendfile_fd_chunk = 65536;
fr.version = 1;
fr.type = 6;
fr.req1 = 0;
fr.req0 = 1;
fr.pad = 0;
fr.reserved = 0;
if (uwsgi.async > 1) {
fr.cl = htons(UMIN(remains, wsgi_req->sendfile_fd_chunk));
len = write(wsgi_req->poll.fd, &fr, 8);
if (len != 8) {
uwsgi_error("write()");
return -1;
}
len = read(wsgi_req->sendfile_fd, buf, ntohs(fr.cl));
if (len != (ssize_t) ntohs(fr.cl)) {
uwsgi_error("read()");
return -1;
}
wsgi_req->sendfile_fd_pos += len;
return write(wsgi_req->poll.fd, buf, len);
}
while (remains) {
fr.cl = htons(UMIN(remains, wsgi_req->sendfile_fd_chunk));
len = write(wsgi_req->poll.fd, &fr, 8);
if (len != 8) {
uwsgi_error("write()");
return -1;
}
len = read(wsgi_req->sendfile_fd, buf, ntohs(fr.cl));
if (len != (ssize_t) ntohs(fr.cl)) {
uwsgi_error("read()");
return -1;
}
wsgi_req->sendfile_fd_pos += len;
if (write(wsgi_req->poll.fd, buf, len) != len) {
uwsgi_error("write()");
return -1;
}
remains = wsgi_req->sendfile_fd_size - wsgi_req->sendfile_fd_pos;
}
return wsgi_req->sendfile_fd_pos;
*/
return -1;
// fastcgi packets are limited to 64k
if (wsgi_req->proto_parser_status == 0) {
uint16_t fcgi_len = (uint16_t) UMIN(len-wsgi_req->write_pos, 0xffff);
wsgi_req->proto_parser_status = fcgi_len;
struct fcgi_record fr;
fr.version = 1;
fr.type = 6;
fr.req1 = 0;
fr.req0 = 1;
fr.pad = 0;
fr.reserved = 0;
fr.cl0 = (uint8_t) (fcgi_len & 0xff);
fr.cl1 = (uint8_t) ((fcgi_len >> 8) & 0xff);
if (uwsgi_write_true_nb(wsgi_req->fd, (char *) &fr, sizeof(struct fcgi_record), uwsgi.shared->options[UWSGI_OPTION_SOCKET_TIMEOUT])) {
return -1;
}
}
ssize_t wlen = uwsgi_sendfile_do(wsgi_req->fd, fd, pos+wsgi_req->write_pos, wsgi_req->proto_parser_status);
if (wlen > 0) {
wsgi_req->write_pos += wlen;
wsgi_req->proto_parser_status -= wlen;
if (wsgi_req->write_pos == len) {
return UWSGI_OK;
}
return UWSGI_AGAIN;
}
if (wlen < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS) {
return UWSGI_AGAIN;
}
}
return -1;
}
+1 -1
View File
@@ -1,6 +1,6 @@
/* async uwsgi protocol parser */
#include "../uwsgi.h"
#include <uwsgi.h>
extern struct uwsgi_server uwsgi;
+2 -25
View File
@@ -1290,8 +1290,9 @@ struct uwsgi_cache {
int headers_hvec;
uint64_t proto_parser_pos;
int proto_parser_status;
int64_t proto_parser_status;
void *proto_parser_buf;
uint64_t proto_parser_buf_size;
void *proto_parser_remains_buf;
size_t proto_parser_remains;
@@ -2706,30 +2707,6 @@ void uwsgi_rpc_init(void);
char *uwsgi_open_and_read(char *, size_t *, int, char *[]);
char *uwsgi_get_last_char(char *, char);
struct uwsgi_twobytes {
uint8_t cl1;
uint8_t cl0;
} __attribute__ ((__packed__));
struct fcgi_record {
uint8_t version;
uint8_t type;
uint8_t req1;
uint8_t req0;
union {
uint16_t cl;
struct uwsgi_twobytes cl8;
};
uint8_t pad;
uint8_t reserved;
} __attribute__ ((__packed__));
#define FCGI_BEGIN_REQUEST "\0\1\0\0\0\0\0\0"
#define FCGI_END_REQUEST "\1\x06\0\1\0\0\0\0\1\3\0\1\0\x08\0\0\0\0\0\0\0\0\0\0"
ssize_t fcgi_send_record(int, uint8_t, uint16_t, char *);
ssize_t fcgi_send_param(int, char *, uint16_t, char *, uint16_t);
uint16_t fcgi_get_record(int, char *);
void uwsgi_spawn_daemon(struct uwsgi_daemon *);
void uwsgi_detach_daemons();