#include "uwsgi.h"

/*

	subscription subsystem

	each subscription slot is a hashed item in a dictionary

	each slot has a linked list containing the nodes

	the structure and system is very similar to uwsgi_dyn_dict already used by the mime type parser

	This system is not meant to run on shared memory. If you have multiple processes for the same app,
	you have to create a new subscriptions slot list.

	To avoid the removal of nodes that are still in use, a reference count system has been implemented

*/

extern struct uwsgi_server uwsgi;

/*
	Return the registered name of the balancing algorithm whose function
	pointer is "ptr", or NULL when no registered algorithm matches.
*/
char *uwsgi_subscription_algo_name(void *ptr) {
	struct uwsgi_string_list *usl = uwsgi.subscription_algos;
	while (usl) {
		if (usl->custom_ptr == ptr) {
			return usl->value;
		}
		usl = usl->next;
	}
	return NULL;
}

#ifdef UWSGI_SSL
/*
	If the request carries both an SNI key and certificate and the slot has no
	SNI context yet, register one for the slot's server name (the slot key up
	to the first ':', or the whole key when there is no ':').
*/
static void uwsgi_subscription_sni_check(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_req *usr) {
	if (usr->sni_key_len == 0 || usr->sni_crt_len == 0)
		return;
	if (current_slot->sni_enabled)
		return;

	// build NUL-terminated copies of the (length-delimited) request fields
	char *sni_key = uwsgi_concat2n(usr->sni_key, usr->sni_key_len, "", 0);
	char *sni_crt = uwsgi_concat2n(usr->sni_crt, usr->sni_crt_len, "", 0);
	char *sni_ca = NULL;
	if (usr->sni_ca_len > 0) {
		sni_ca = uwsgi_concat2n(usr->sni_ca, usr->sni_ca_len, "", 0);
	}

	char *servername = NULL;
	char *colon = memchr(current_slot->key, ':', current_slot->keylen);
	if (colon) {
		servername = uwsgi_concat2n(current_slot->key, colon - current_slot->key, "", 0);
	}
	else {
		servername = uwsgi_concat2n(current_slot->key, current_slot->keylen, "", 0);
	}

	// NOTE(review): servername is never freed here; presumably uwsgi_ssl_add_sni_item()
	// takes ownership of it on success - confirm what happens when it fails
	if (uwsgi_ssl_add_sni_item(servername, sni_crt, sni_key, uwsgi.sni_dir_ciphers, sni_ca)) {
		current_slot->sni_enabled = 1;
	}

	free(sni_key);
	free(sni_crt);
	free(sni_ca);
}

/*
	Release the signature verification objects attached to a slot (if any)
	and reset the pointers so they cannot be released twice.
*/
static void subscription_slot_free_sign_ctx(struct uwsgi_subscribe_slot *s) {
	if (s->sign_ctx) {
		EVP_PKEY_free(s->sign_public_key);
		EVP_MD_CTX_destroy(s->sign_ctx);
		s->sign_public_key = NULL;
		s->sign_ctx = NULL;
	}
}
#endif

/*
	Release everything owned by a slot (signature context, SNI item) and the
	slot itself. The slot must already be unlinked from the hash table.
*/
static void subscription_slot_destroy(struct uwsgi_subscribe_slot *s) {
#ifdef UWSGI_SSL
	subscription_slot_free_sign_ctx(s);
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
	// if there is a SNI context active, destroy it
	if (s->sni_enabled) {
		uwsgi_ssl_del_sni_item(s->key, s->keylen);
	}
#endif
#endif
	free(s);
}

/*
	Check the request credentials against the files found in the configured
	credentials directories: for each directory, stat() the path obtained by
	concatenating the directory and the slot key, and accept the request when
	the file's uid and gid both match the ones in the request.

	Returns 1 when accepted, 0 otherwise.
*/
int uwsgi_subscription_credentials_check(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
	struct uwsgi_string_list *usl = NULL;
	uwsgi_foreach(usl, uwsgi.subscriptions_credentials_check_dir) {
		char *filename = uwsgi_concat2n(usl->value, usl->len, slot->key, slot->keylen);
		struct stat st;
		int ret = stat(filename, &st);
		free(filename);
		if (ret != 0)
			continue;
		if (st.st_uid != usr->uid)
			continue;
		if (st.st_gid != usr->gid)
			continue;
		// accepted...
		return 1;
	}
	return 0;
}

/*
	Look up the slot matching "key" in the hash table.

	A matching slot that has more hits than its predecessor in the bucket is
	swapped with it, so frequently used slots migrate towards the bucket head.
	In mountpoints mode a failed lookup is retried once with the key truncated
	at its first '/'.

	Returns the slot or NULL (also when keylen is greater than 0xff).
*/
struct uwsgi_subscribe_slot *uwsgi_get_subscribe_slot(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen) {
	int retried = 0;
retry:
	if (keylen > 0xff)
		return NULL;

	uint32_t hash = djb33x_hash(key, keylen);
	int hash_key = hash % 0xffff;
	struct uwsgi_subscribe_slot *current_slot = slot[hash_key];

#ifdef UWSGI_DEBUG
	uwsgi_log("****************************\n");
	while (current_slot) {
		uwsgi_log("slot %.*s %d\n", current_slot->keylen, current_slot->key, current_slot->hits);
		current_slot = current_slot->next;
	}
	uwsgi_log("****************************\n");
	current_slot = slot[hash_key];
#endif

	while (current_slot) {
		if (!uwsgi_strncmp(key, keylen, current_slot->key, current_slot->keylen)) {
			// auto optimization: swap the slot with its predecessor when it is hit more often
			if (current_slot->prev) {
				if (current_slot->hits > current_slot->prev->hits) {
					struct uwsgi_subscribe_slot *slot_parent = current_slot->prev->prev, *slot_prev = current_slot->prev;
					if (slot_parent) {
						slot_parent->next = current_slot;
					}
					else {
						slot[hash_key] = current_slot;
					}

					if (current_slot->next) {
						current_slot->next->prev = slot_prev;
					}

					slot_prev->prev = current_slot;
					slot_prev->next = current_slot->next;

					current_slot->next = slot_prev;
					current_slot->prev = slot_parent;
				}
			}
			return current_slot;
		}
		current_slot = current_slot->next;
		// check for loopy optimization
		if (current_slot == slot[hash_key])
			break;
	}

	// if we are here and in mountpoints mode, try the domain only variant
	if (uwsgi.subscription_mountpoints && !retried) {
		char *slash = memchr(key, '/', keylen);
		if (slash) {
			keylen = slash - key;
			retried = 1;
			goto retry;
		}
	}

	return NULL;
}

/*
	Choose a node for "key" using the slot's balancing algorithm.

	While walking the node list, nodes that have not announced themselves for
	more than uwsgi.subscription_tolerance seconds are marked as dead, and dead
	nodes with no pending references are removed. The algorithm is called once
	per surviving node (first step) and, if it never returned a node, one last
	time with a NULL node (second step).

	Returns the chosen node, or NULL when the slot does not exist, has been
	removed during the walk, or the algorithm selects nothing.
*/
struct uwsgi_subscribe_node *uwsgi_get_subscribe_node(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen, struct uwsgi_subscription_client *client) {

	if (keylen > 0xff)
		return NULL;

	struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, key, keylen);
	if (!current_slot)
		return NULL;

	// slot found, move up in the list increasing hits
	current_slot->hits++;
	time_t now = uwsgi_now();
	struct uwsgi_subscribe_node *node = current_slot->nodes;
	while (node) {
		// is the node alive ?
		if (now - node->last_check > uwsgi.subscription_tolerance) {
			if (node->death_mark == 0)
				uwsgi_log("[uwsgi-subscription for pid %d] %.*s => marking %.*s as failed (no announce received in %d seconds)\n", (int) uwsgi.mypid, (int) keylen, key, (int) node->len, node->name, uwsgi.subscription_tolerance);
			node->failcnt++;
			node->death_mark = 1;
		}
		// do i need to remove the node ?
		if (node->death_mark && node->reference == 0) {
			// remove the node and move to next
			struct uwsgi_subscribe_node *dead_node = node;
			node = node->next;
			// if the slot has been removed, return NULL;
			if (uwsgi_remove_subscribe_node(slot, dead_node) == 1) {
				return NULL;
			}
			continue;
		}

		struct uwsgi_subscribe_node *choosen_node = current_slot->algo(current_slot, node, client);
		if (choosen_node)
			return choosen_node;

		node = node->next;
	}

	// second step: node is NULL here
	return current_slot->algo(current_slot, node, client);
}

/*
	Return the node of slot "key" whose name equals "val", or NULL when the
	slot or the node does not exist.
*/
struct uwsgi_subscribe_node *uwsgi_get_subscribe_node_by_name(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen, char *val, uint16_t vallen) {

	if (keylen > 0xff)
		return NULL;

	struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, key, keylen);
	if (current_slot) {
		struct uwsgi_subscribe_node *node = current_slot->nodes;
		while (node) {
			if (!uwsgi_strncmp(val, vallen, node->name, node->len)) {
				return node;
			}
			node = node->next;
		}
	}

	return NULL;
}

/*
	Unlink and free "node". When it was the last node of its slot, the slot is
	unlinked from its hash bucket and destroyed too.

	Returns 1 when the slot has been removed, 0 otherwise.
*/
int uwsgi_remove_subscribe_node(struct uwsgi_subscribe_slot **slot, struct uwsgi_subscribe_node *node) {

	int ret = 0;

	struct uwsgi_subscribe_node *a_node;
	struct uwsgi_subscribe_slot *node_slot = node->slot;
	struct uwsgi_subscribe_slot *prev_slot = node_slot->prev;
	struct uwsgi_subscribe_slot *next_slot = node_slot->next;

	int hash_key = node_slot->hash;

	// over-engineering to avoid race conditions
	node->len = 0;

	if (node == node_slot->nodes) {
		node_slot->nodes = node->next;
	}
	else {
		a_node = node_slot->nodes;
		while (a_node) {
			if (a_node->next == node) {
				a_node->next = node->next;
				break;
			}
			a_node = a_node->next;
		}
	}

	free(node);

	// no more nodes, remove the slot too
	if (node_slot->nodes == NULL) {

		ret = 1;

		// first check if i am the only slot in the bucket
		if ((!prev_slot && !next_slot) || next_slot == node_slot) {
			subscription_slot_destroy(node_slot);
			slot[hash_key] = NULL;
			goto end;
		}

		// if i am the main entry point, set the next value
		if (node_slot == slot[hash_key]) {
			slot[hash_key] = next_slot;
		}

		if (prev_slot) {
			prev_slot->next = next_slot;
		}
		if (next_slot) {
			next_slot->prev = prev_slot;
		}

		// same cleanup as the single-slot case (signature context AND SNI item)
		subscription_slot_destroy(node_slot);
	}

end:
	return ret;
}

#ifdef UWSGI_SSL
static int subscription_new_sign_ctx(struct uwsgi_subscribe_slot *, struct uwsgi_subscribe_req *);
static int subscription_is_safe(struct uwsgi_subscribe_req *);
#endif

/*
	Allocate a node for slot "s" and fill it from the subscription request.
	The node is not linked to the slot's list: that is up to the caller.
*/
static struct uwsgi_subscribe_node *subscription_new_node(struct uwsgi_subscribe_slot *s, struct uwsgi_subscribe_req *usr) {
	struct uwsgi_subscribe_node *node = uwsgi_malloc(sizeof(struct uwsgi_subscribe_node));

	node->slot = s;
	node->next = NULL;

	// the caller has already checked that address_len is in the 1..0xff range
	node->len = usr->address_len;
	memcpy(node->name, usr->address, usr->address_len);

	node->modifier1 = usr->modifier1;
	node->modifier2 = usr->modifier2;

	node->requests = 0;
	node->last_requests = 0;
	node->tx = 0;
	node->rx = 0;
	node->reference = 0;
	node->death_mark = 0;
	node->failcnt = 0;

	node->cores = usr->cores;
	node->load = usr->load;
	node->weight = usr->weight;
	// a weight of 0 is not allowed (the algorithms use it as a divider)
	if (!node->weight)
		node->weight = 1;
	node->wrr = 0;
	node->backup_level = usr->backup_level;

	// do not leave the field indeterminate when the request carries no proto
	node->proto = 0;
	if (usr->proto_len > 0) {
		node->proto = usr->proto[0];
	}

	node->unix_check = usr->unix_check;
	node->pid = usr->pid;
	node->uid = usr->uid;
	node->gid = usr->gid;

	node->notify[0] = 0;
	if (usr->notify_len > 0 && usr->notify_len < 102) {
		memcpy(node->notify, usr->notify, usr->notify_len);
		node->notify[usr->notify_len] = 0;
	}

	node->last_check = uwsgi_now();

	return node;
}

/*
	Log the creation of a node and, when the node registered a notify
	address, send it the subscription ack message.
*/
static void subscription_announce_new_node(struct uwsgi_subscribe_req *usr, struct uwsgi_subscribe_node *node) {
	uwsgi_log("[uwsgi-subscription for pid %d] %.*s => new node: %.*s (weight: %d, backup: %d)\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, usr->weight, usr->backup_level);
	if (node->notify[0]) {
		char buf[1024];
		int ret = snprintf(buf, 1024, "[subscription ack] %.*s => new node: %.*s", usr->keylen, usr->key, usr->address_len, usr->address);
		if (ret > 0 && ret < 1024)
			uwsgi_notify_msg(node->notify, buf, ret);
	}
}

/*
	Register (or refresh) the node described by the subscription request.

	- existing slot, existing node: the node is refreshed (death mark removed,
	  cores/load/weight/backup level/proto updated) and returned
	- existing slot, unknown node: a new node is appended to the slot
	- unknown slot: a new slot with a single node is appended to its hash bucket

	Signature and credentials checks are applied when the related options are
	set. Returns the node, or NULL when the request is rejected.
*/
struct uwsgi_subscribe_node *uwsgi_add_subscribe_node(struct uwsgi_subscribe_slot **slot, struct uwsgi_subscribe_req *usr) {

	struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, usr->key, usr->keylen), *old_slot = NULL, *a_slot;
	struct uwsgi_subscribe_node *node, *old_node = NULL;

	if (usr->address_len > 0xff || usr->address_len == 0)
		return NULL;

	// keys longer than 0xff can never be found by uwsgi_get_subscribe_slot(),
	// so refuse to create (and copy them into) a slot
	// NOTE(review): the size of the slot key buffer is not visible here - confirm
	// that it can hold 0xff bytes plus the terminator written below
	if (usr->keylen > 0xff)
		return NULL;

	if (current_slot) {
#ifdef UWSGI_SSL
		if (uwsgi.subscriptions_sign_check_dir && !uwsgi_subscription_sign_check(current_slot, usr)) {
			return NULL;
		}
#endif

		if (uwsgi.subscriptions_credentials_check_dir && !uwsgi_subscription_credentials_check(current_slot, usr)) {
			return NULL;
		}

		node = current_slot->nodes;
		while (node) {
			if (!uwsgi_strncmp(node->name, node->len, usr->address, usr->address_len)) {
#ifdef UWSGI_SSL
				// this should avoid sending sniffed packets...
				// NOTE(review): node->unix_check is not refreshed below, so the comparison
				// is always made against the value of the first announce - confirm it is intended
				if (current_slot->sign_ctx && !subscription_is_safe(usr) && usr->unix_check <= node->unix_check) {
					uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
					return NULL;
				}
				// eventually the packet could be upgraded to sni...
				uwsgi_subscription_sni_check(current_slot, usr);
#endif
				// remove death mark and update cores and load
				node->death_mark = 0;
				node->last_check = uwsgi_now();
				node->cores = usr->cores;
				node->load = usr->load;
				node->weight = usr->weight;
				node->backup_level = usr->backup_level;
				if (usr->proto_len > 0) {
					node->proto = usr->proto[0];
				}
				if (!node->weight)
					node->weight = 1;
				node->last_requests = 0;
				return node;
			}
			old_node = node;
			node = node->next;
		}

#ifdef UWSGI_SSL
		if (current_slot->sign_ctx && !subscription_is_safe(usr) && usr->unix_check < (uwsgi_now() - (time_t) uwsgi.subscriptions_sign_check_tolerance)) {
			uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
			return NULL;
		}
		// check here as we are sure the node will be added
		uwsgi_subscription_sni_check(current_slot, usr);
#endif

		node = subscription_new_node(current_slot, usr);
		// append to the tail of the slot's list
		if (old_node) {
			old_node->next = node;
		}
		else {
			current_slot->nodes = node;
		}

		subscription_announce_new_node(usr, node);
		return node;
	}

	// no slot for this key: create it
	current_slot = uwsgi_malloc(sizeof(struct uwsgi_subscribe_slot));
#ifdef UWSGI_SSL
	current_slot->sign_ctx = NULL;
	if (uwsgi.subscriptions_sign_check_dir && !subscription_new_sign_ctx(current_slot, usr)) {
		free(current_slot);
		return NULL;
	}
#endif
	uint32_t hash = djb33x_hash(usr->key, usr->keylen);
	int hash_key = hash % 0xffff;
	current_slot->hash = hash_key;
	current_slot->keylen = usr->keylen;
	memcpy(current_slot->key, usr->key, usr->keylen);
	if (uwsgi.subscriptions_credentials_check_dir) {
		if (!uwsgi_subscription_credentials_check(current_slot, usr)) {
#ifdef UWSGI_SSL
			// the signature context may have been created just above
			subscription_slot_free_sign_ctx(current_slot);
#endif
			free(current_slot);
			return NULL;
		}
	}

	current_slot->key[usr->keylen] = 0;
	current_slot->hits = 0;

#ifdef UWSGI_SSL
	current_slot->sni_enabled = 0;
	uwsgi_subscription_sni_check(current_slot, usr);
#endif

	current_slot->nodes = subscription_new_node(current_slot, usr);

	// append the slot to the tail of its bucket
	a_slot = slot[hash_key];
	while (a_slot) {
		old_slot = a_slot;
		a_slot = a_slot->next;
	}

	if (old_slot) {
		old_slot->next = current_slot;
	}

	current_slot->prev = old_slot;
	current_slot->next = NULL;

	current_slot->algo = usr->algo;
	if (!current_slot->algo)
		current_slot->algo = uwsgi.subscription_algo;

	if (!slot[hash_key] || current_slot->prev == NULL) {
		slot[hash_key] = current_slot;
	}

	uwsgi_log("[uwsgi-subscription for pid %d] new pool: %.*s (hash key: %d, algo: %s)\n", (int) uwsgi.mypid, usr->keylen, usr->key, current_slot->hash, uwsgi_subscription_algo_name(current_slot->algo));
	subscription_announce_new_node(usr, current_slot->nodes);

	return current_slot->nodes;
}

/*
	Send a subscription packet to "host".

	"host" is treated as an address:port UDP destination when it contains a
	':', as a UNIX datagram socket path otherwise.

	sfd == -1: a temporary socket is created and closed after the send
	sfd == -2: a per-family socket is created on first use and then reused
	otherwise: sfd is used as is

	Errors are only reported in the logs.
*/
static void send_subscription(int sfd, char *host, char *message, uint16_t message_size) {

	int fd = sfd;
	struct sockaddr_in udp_addr;
	struct sockaddr_un un_addr;
	ssize_t ret;

	char *udp_port = strchr(host, ':');

	if (fd == -1) {
		if (udp_port) {
			fd = socket(AF_INET, SOCK_DGRAM, 0);
		}
		else {
			fd = socket(AF_UNIX, SOCK_DGRAM, 0);
		}
		if (fd < 0) {
			uwsgi_error("send_subscription()/socket()");
			return;
		}
		uwsgi_socket_nb(fd);
	}
	else if (fd == -2) {
		// sockets shared by all of the calls made with sfd == -2
		static int unix_fd = -1;
		static int inet_fd = -1;
		if (udp_port) {
			if (inet_fd == -1) {
				inet_fd = socket(AF_INET, SOCK_DGRAM, 0);
				if (inet_fd < 0) {
					uwsgi_error("send_subscription()/socket()");
					return;
				}
				uwsgi_socket_nb(inet_fd);
			}
			fd = inet_fd;
		}
		else {
			if (unix_fd == -1) {
				unix_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
				if (unix_fd < 0) {
					uwsgi_error("send_subscription()/socket()");
					return;
				}
				uwsgi_socket_nb(unix_fd);
			}
			fd = unix_fd;
		}
	}

	if (udp_port) {
		// temporarily split "host" into address and port
		udp_port[0] = 0;
		memset(&udp_addr, 0, sizeof(struct sockaddr_in));
		udp_addr.sin_family = AF_INET;
		udp_addr.sin_port = htons(atoi(udp_port + 1));
		udp_addr.sin_addr.s_addr = inet_addr(host);
		ret = sendto(fd, message, message_size, 0, (struct sockaddr *) &udp_addr, sizeof(udp_addr));
		udp_port[0] = ':';
	}
	else {
		memset(&un_addr, 0, sizeof(struct sockaddr_un));
		un_addr.sun_family = AF_UNIX;
		// use 102 as the magic number
		strncat(un_addr.sun_path, host, 102);
		if (uwsgi.subscriptions_use_credentials) {
			// could be useless as internally the socket could add them automagically
			ret = uwsgi_pass_cred2(fd, message, message_size, (struct sockaddr *) &un_addr, sizeof(un_addr));
		}
		else {
			ret = sendto(fd, message, message_size, 0, (struct sockaddr *) &un_addr, sizeof(un_addr));
		}
	}

	if (ret < 0) {
		uwsgi_error("send_subscription()/sendto()");
	}

	// only the temporary socket is closed
	if (sfd == -1)
		close(fd);
}

/*
	Finalize a subscription packet: when "sign" is set (SSL builds only) append
	the "unix" timestamp (now + cmd) and, if uwsgi_rsa_sign() produces one, the
	"sign" signature of the payload following the 4 header bytes; then write
	the uwsgi header. modifier1 and modifier2 are currently unused.

	Returns 0 on success, -1 on error.
*/
static int uwsgi_subscription_ub_fix(struct uwsgi_buffer *ub, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *sign) {
#ifdef UWSGI_SSL
	if (sign) {
		if (uwsgi_buffer_append_keynum(ub, "unix", 4, (uwsgi_now() + (time_t) cmd)))
			return -1;

		unsigned int signature_len = 0;
		char *signature = uwsgi_rsa_sign(sign, ub->buf + 4, ub->pos - 4, &signature_len);
		if (signature) {
			int failed = 0;
			if (signature_len > 0) {
				failed = uwsgi_buffer_append_keyval(ub, "sign", 4, signature, signature_len);
			}
			// always release the signature, even when it is empty
			free(signature);
			if (failed)
				return -1;
		}
	}
#endif

	// add uwsgi header
	if (uwsgi_buffer_set_uh(ub, 224, cmd))
		return -1;

	return 0;
}

/*
	Build the subscription packet used by uwsgi_send_subscription_from_fd().
	Returns a new buffer (to be destroyed by the caller) or NULL on error.
*/
static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
	struct uwsgi_buffer *ub = uwsgi_buffer_new(4096);

	// make space for uwsgi header
	ub->pos = 4;

	if (uwsgi_buffer_append_keyval(ub, "key", 3, key, keysize))
		goto end;
	if (uwsgi_buffer_append_keyval(ub, "address", 7, socket_name, strlen(socket_name)))
		goto end;

	// the option overrides the modifier1 passed by the caller
	if (uwsgi.subscribe_with_modifier1) {
		modifier1 = atoi(uwsgi.subscribe_with_modifier1);
	}
	if (uwsgi_buffer_append_keynum(ub, "modifier1", 9, modifier1))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "modifier2", 9, modifier2))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "cores", 5, uwsgi.numproc * uwsgi.cores))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "load", 4, uwsgi.shared->load))
		goto end;

	if (uwsgi.auto_weight) {
		if (uwsgi_buffer_append_keynum(ub, "weight", 6, uwsgi.numproc * uwsgi.cores))
			goto end;
	}
	else {
		if (uwsgi_buffer_append_keynum(ub, "weight", 6, uwsgi.weight))
			goto end;
	}

	if (sni_key) {
		if (uwsgi_buffer_append_keyval(ub, "sni_key", 7, sni_key, strlen(sni_key)))
			goto end;
	}

	if (sni_crt) {
		if (uwsgi_buffer_append_keyval(ub, "sni_crt", 7, sni_crt, strlen(sni_crt)))
			goto end;
	}

	if (sni_ca) {
		if (uwsgi_buffer_append_keyval(ub, "sni_ca", 6, sni_ca, strlen(sni_ca)))
			goto end;
	}

	if (uwsgi.subscription_notify_socket) {
		if (uwsgi_buffer_append_keyval(ub, "notify", 6, uwsgi.subscription_notify_socket, strlen(uwsgi.subscription_notify_socket)))
			goto end;
	}
	else if (uwsgi.notify_socket_fd > -1 && uwsgi.notify_socket) {
		if (uwsgi_buffer_append_keyval(ub, "notify", 6, uwsgi.notify_socket, strlen(uwsgi.notify_socket)))
			goto end;
	}

	if (uwsgi_subscription_ub_fix(ub, modifier1, modifier2, cmd, sign))
		goto end;

	return ub;

end:
	uwsgi_buffer_destroy(ub);
	return NULL;
}

/*
	Build a subscription packet and send it to "udp_address" through "fd"
	(see send_subscription() for the meaning of the special fd values).
	When socket_name is NULL the name of the first uwsgi socket is announced;
	nothing is sent if there is none.
*/
void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {

	if (socket_name == NULL && !uwsgi.sockets)
		return;

	if (!socket_name) {
		socket_name = uwsgi.sockets->name;
	}

	struct uwsgi_buffer *ub = uwsgi_subscription_ub(key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
	if (!ub)
		return;

	send_subscription(fd, udp_address, ub->buf, ub->pos);
	uwsgi_buffer_destroy(ub);
}

/*
	Same as uwsgi_send_subscription_from_fd() using a temporary socket.
*/
void uwsgi_send_subscription(char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
	uwsgi_send_subscription_from_fd(-1, udp_address, key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
}

#ifdef UWSGI_SSL
/*
	Return 1 when the (non-zero) uid of the request is listed in
	uwsgi.subscriptions_sign_skip_uid, 0 otherwise.
*/
static int subscription_is_safe(struct uwsgi_subscribe_req *usr) {
	struct uwsgi_string_list *usl = NULL;
	uwsgi_foreach(usl, uwsgi.subscriptions_sign_skip_uid) {
		// the numeric value is parsed lazily and cached in the list item
		if (usl->custom == 0) {
			usl->custom = atoi(usl->value);
		}
		if (usr->uid > 0 && usr->uid == (uid_t) usl->custom) {
			return 1;
		}
	}
	return 0;
}

/*
	Load the public key of the slot from the signature check directory, create
	the verification context and verify the request with it.

	Returns 1 on success (or when the request comes from a "safe" uid, in which
	case nothing is created), 0 otherwise. On failure the slot is left without
	signature objects.
*/
static int subscription_new_sign_ctx(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
	if (subscription_is_safe(usr))
		return 1;

	if (usr->sign_len == 0 || usr->base_len == 0)
		return 0;

	if (usr->unix_check < (uwsgi_now() - (time_t) uwsgi.subscriptions_sign_check_tolerance)) {
		uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
		return 0;
	}

	char *keyfile = uwsgi_sanitize_cert_filename(uwsgi.subscriptions_sign_check_dir, usr->key, usr->keylen);
	FILE *kf = fopen(keyfile, "r");
	free(keyfile);
	if (!kf)
		return 0;
	slot->sign_public_key = PEM_read_PUBKEY(kf, NULL, NULL, NULL);
	fclose(kf);
	if (!slot->sign_public_key) {
		uwsgi_log("unable to load public key for %.*s\n", usr->keylen, usr->key);
		return 0;
	}

	slot->sign_ctx = EVP_MD_CTX_create();
	if (!slot->sign_ctx) {
		uwsgi_log("unable to initialize EVP context for %.*s\n", usr->keylen, usr->key);
		EVP_PKEY_free(slot->sign_public_key);
		slot->sign_public_key = NULL;
		return 0;
	}

	if (!uwsgi_subscription_sign_check(slot, usr)) {
		// frees both objects and resets the pointers: the slot may outlive this
		// call and must not keep references to released memory
		subscription_slot_free_sign_ctx(slot);
		return 0;
	}

	return 1;
}

/*
	Verify the signature of the request against the public key of the slot,
	creating the verification context on first use.

	Returns 1 when the signature is valid (or the request comes from a "safe"
	uid), 0 otherwise.
*/
int uwsgi_subscription_sign_check(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
	if (subscription_is_safe(usr))
		return 1;

	if (usr->sign_len == 0 || usr->base_len == 0)
		return 0;

	if (!slot->sign_ctx) {
		if (!subscription_new_sign_ctx(slot, usr))
			return 0;
	}

	if (EVP_VerifyInit_ex(slot->sign_ctx, uwsgi.subscriptions_sign_check_md, NULL) == 0) {
		ERR_print_errors_fp(stderr);
		return 0;
	}

	if (EVP_VerifyUpdate(slot->sign_ctx, usr->base, usr->base_len) == 0) {
		ERR_print_errors_fp(stderr);
		return 0;
	}

	if (EVP_VerifyFinal(slot->sign_ctx, (unsigned char *) usr->sign, usr->sign_len, slot->sign_public_key) != 1) {
#ifdef UWSGI_DEBUG
		ERR_print_errors_fp(stderr);
#endif
		return 0;
	}

	return 1;
}
#endif

/*
	Return 1 when the hash table has no slots at all, 0 otherwise.
*/
int uwsgi_no_subscriptions(struct uwsgi_subscribe_slot **slot) {
	int i;
	for (i = 0; i < UMAX16; i++) {
		if (slot[i])
			return 0;
	}
	return 1;
}

/*
	Send one entry of a subscription keys file. The entry is a NUL-terminated
	"key" or "key,modifier1" string; it is split in place at the first ','.
*/
static void subscription_send_file_key(char *udp_address, char *key, int keysize, uint8_t cmd, char *socket_name, char *sign) {
	int modifier1_len = 0;
	char *modifier1 = strchr(key, ',');
	if (modifier1) {
		modifier1[0] = 0;
		modifier1++;
		modifier1_len = strlen(modifier1);
		keysize = strlen(key);
	}
	uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
}

/*
	Parse a subscription string and send the related packet(s).

	Accepted forms, as parsed below:

		[socket_name=|=shared_socket_num=]address:port:key[,modifier1[,sign]]
		[socket_name=|=shared_socket_num=]/unix/path:key[,modifier1[,sign]]

	When the key starts with '@' the rest is the path of a file containing one
	"key[,modifier1]" entry per line (lines starting with '#' are skipped).

	The string is temporarily modified during the parsing and restored before
	returning. Malformed strings are silently ignored.
*/
void uwsgi_subscribe(char *subscription, uint8_t cmd) {

	size_t subfile_size;
	size_t i;
	char *key = NULL;
	int keysize = 0;
	char *modifier1 = NULL;
	int modifier1_len = 0;
	char *socket_name = NULL;
	char *udp_address = subscription;
	char *udp_port = NULL;
	char *subscription_key = NULL;
	char *sign = NULL;

	// check for explicit socket_name
	char *equal = strchr(subscription, '=');
	if (equal) {
		socket_name = subscription;
		if (socket_name[0] == '=') {
			equal = strchr(socket_name + 1, '=');
			if (!equal)
				return;
			*equal = '\0';
			struct uwsgi_socket *us = uwsgi_get_shared_socket_by_num(atoi(socket_name + 1));
			if (!us) {
				// do not leave the string truncated: it is parsed again on the next call
				*equal = '=';
				return;
			}
			socket_name = us->name;
		}
		*equal = '\0';
		udp_address = equal + 1;
	}

	// check for unix socket
	if (udp_address[0] != '/') {
		udp_port = strchr(udp_address, ':');
		if (!udp_port) {
			if (equal)
				*equal = '=';
			return;
		}
		subscription_key = strchr(udp_port + 1, ':');
	}
	else {
		subscription_key = strchr(udp_address + 1, ':');
	}

	if (!subscription_key) {
		if (equal)
			*equal = '=';
		return;
	}

	// private copy of the server address (everything before the key separator)
	udp_address = uwsgi_concat2n(udp_address, subscription_key - udp_address, "", 0);

	if (subscription_key[1] == '@') {
		if (!uwsgi_file_exists(subscription_key + 2))
			goto clear;
		char *lines = uwsgi_open_and_read(subscription_key + 2, &subfile_size, 1, NULL);
		if (subfile_size > 0) {
			key = lines;
			for (i = 0; i < subfile_size; i++) {
				if (lines[i] == 0) {
					// end of the file: send the last (unterminated) entry
					if (keysize > 0) {
						if (key[0] != '#' && key[0] != '\n') {
							subscription_send_file_key(udp_address, key, keysize, cmd, socket_name, sign);
						}
					}
					break;
				}
				else if (lines[i] == '\n') {
					if (keysize > 0) {
						if (key[0] != '#' && key[0] != '\n') {
							// terminate the entry for the duration of the send
							lines[i] = 0;
							subscription_send_file_key(udp_address, key, keysize, cmd, socket_name, sign);
							lines[i] = '\n';
						}
					}
					key = lines + i + 1;
					keysize = 0;
					continue;
				}
				keysize++;
			}
		}
		free(lines);
	}
	else {
		modifier1 = strchr(subscription_key + 1, ',');
		if (modifier1) {
			modifier1[0] = 0;
			modifier1++;
			// an empty modifier1 means we are at the end of the string:
			// there is nothing to scan after it
			if (modifier1[0] != 0) {
				sign = strchr(modifier1 + 1, ',');
			}
			if (sign) {
				*sign = 0;
				sign++;
			}
			modifier1_len = strlen(modifier1);
		}

		uwsgi_send_subscription(udp_address, subscription_key + 1, strlen(subscription_key + 1), uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
		// restore the separators
		if (modifier1)
			modifier1[-1] = ',';
		if (sign)
			sign[-1] = ',';
	}

clear:
	if (equal)
		*equal = '=';
	free(udp_address);
}

/*
	Parse a key=value subscription string and send the related packet.

	"server" and "key" are mandatory. The announced address is, in order of
	precedence: the name of the uwsgi socket number "socket" (when it exists),
	"addr", the name of the first uwsgi socket. When "check" is set and the
	file exists, nothing is sent.
*/
void uwsgi_subscribe2(char *arg, uint8_t cmd) {

	char *s2_server = NULL;
	char *s2_key = NULL;
	char *s2_socket = NULL;
	char *s2_addr = NULL;
	char *s2_weight = NULL;
	char *s2_sign = NULL;
	char *s2_modifier1 = NULL;
	char *s2_modifier2 = NULL;
	char *s2_check = NULL;
	char *s2_sni_key = NULL;
	char *s2_sni_crt = NULL;
	char *s2_sni_ca = NULL;
	char *s2_proto = NULL;
	char *s2_algo = NULL;
	char *s2_backup = NULL;
	struct uwsgi_buffer *ub = NULL;

	int weight = 1;
	int backup = 0;
	uint8_t modifier1 = 0;
	uint8_t modifier2 = 0;

	if (uwsgi_kvlist_parse(arg, strlen(arg), ',', '=',
		"server", &s2_server,
		"key", &s2_key,
		"socket", &s2_socket,
		"addr", &s2_addr,
		"weight", &s2_weight,
		"modifier1", &s2_modifier1,
		"modifier2", &s2_modifier2,
		"sign", &s2_sign,
		"check", &s2_check,
		"sni_key", &s2_sni_key,
		"sni_crt", &s2_sni_crt,
		"sni_ca", &s2_sni_ca,
		"proto", &s2_proto,
		"algo", &s2_algo,
		"backup", &s2_backup,
		NULL)) {
		return;
	}

	if (!s2_server || !s2_key)
		goto end;

	if (s2_check) {
		if (uwsgi_file_exists(s2_check))
			goto end;
	}

	if (uwsgi.auto_weight)
		weight = uwsgi.numproc * uwsgi.cores;
	if (s2_weight) {
		weight = atoi(s2_weight);
	}

	if (s2_backup) {
		backup = atoi(s2_backup);
	}

	if (s2_socket) {
		struct uwsgi_socket *us = uwsgi_get_socket_by_num(atoi(s2_socket));
		if (us) {
			// the socket name replaces "addr"
			free(s2_addr);
			s2_addr = uwsgi_str(us->name);
		}
	}

	if (s2_modifier1) {
		modifier1 = atoi(s2_modifier1);
	}

	if (s2_modifier2) {
		modifier2 = atoi(s2_modifier2);
	}

	if (s2_addr == NULL) {
		// no socket... no subscription
		if (!uwsgi.sockets)
			goto end;
		s2_addr = uwsgi_str(uwsgi.sockets->name);
	}

	ub = uwsgi_buffer_new(uwsgi.page_size);
	if (!ub)
		goto end;
	// leave space for the header
	ub->pos = 4;

	if (uwsgi_buffer_append_keyval(ub, "key", 3, s2_key, strlen(s2_key)))
		goto end;
	if (uwsgi_buffer_append_keyval(ub, "address", 7, s2_addr, strlen(s2_addr)))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "modifier1", 9, modifier1))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "modifier2", 9, modifier2))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "cores", 5, uwsgi.numproc * uwsgi.cores))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "load", 4, uwsgi.shared->load))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "weight", 6, weight))
		goto end;
	if (uwsgi_buffer_append_keynum(ub, "backup", 6, backup))
		goto end;

	if (s2_sni_key) {
		if (uwsgi_buffer_append_keyval(ub, "sni_key", 7, s2_sni_key, strlen(s2_sni_key)))
			goto end;
	}

	if (s2_sni_crt) {
		if (uwsgi_buffer_append_keyval(ub, "sni_crt", 7, s2_sni_crt, strlen(s2_sni_crt)))
			goto end;
	}

	if (s2_sni_ca) {
		if (uwsgi_buffer_append_keyval(ub, "sni_ca", 6, s2_sni_ca, strlen(s2_sni_ca)))
			goto end;
	}

	if (s2_proto) {
		if (uwsgi_buffer_append_keyval(ub, "proto", 5, s2_proto, strlen(s2_proto)))
			goto end;
	}

	if (s2_algo) {
		if (uwsgi_buffer_append_keyval(ub, "algo", 4, s2_algo, strlen(s2_algo)))
			goto end;
	}

	if (uwsgi.subscription_notify_socket) {
		if (uwsgi_buffer_append_keyval(ub, "notify", 6, uwsgi.subscription_notify_socket, strlen(uwsgi.subscription_notify_socket)))
			goto end;
	}
	else if (uwsgi.notify_socket_fd > -1 && uwsgi.notify_socket) {
		if (uwsgi_buffer_append_keyval(ub, "notify", 6, uwsgi.notify_socket, strlen(uwsgi.notify_socket)))
			goto end;
	}

	if (uwsgi_subscription_ub_fix(ub, modifier1, modifier2, cmd, s2_sign))
		goto end;

	send_subscription(-1, s2_server, ub->buf, ub->pos);

end:
	if (ub)
		uwsgi_buffer_destroy(ub);

	// free(NULL) is a no-op, no need to check the pointers
	free(s2_server);
	free(s2_key);
	free(s2_socket);
	free(s2_addr);
	free(s2_weight);
	free(s2_modifier1);
	free(s2_modifier2);
	free(s2_sign);
	free(s2_check);
	free(s2_sni_crt);
	free(s2_sni_key);
	free(s2_sni_ca);
	free(s2_proto);
	free(s2_algo);
	free(s2_backup);
}

/*
	Send all of the configured subscriptions (both the --subscribe and the
	--subscribe2 lists) with the given command, unless subscriptions are
	blocked. A non-zero cmd is logged as an unsubscription.
*/
void uwsgi_subscribe_all(uint8_t cmd, int verbose) {

	if (uwsgi.subscriptions_blocked)
		return;

	// -- subscribe
	struct uwsgi_string_list *subscriptions = uwsgi.subscriptions;
	while (subscriptions) {
		if (verbose) {
			uwsgi_log("%s %s\n", cmd ? "unsubscribing from" : "subscribing to", subscriptions->value);
		}
		uwsgi_subscribe(subscriptions->value, cmd);
		subscriptions = subscriptions->next;
	}

	// --subscribe2
	subscriptions = uwsgi.subscriptions2;
	while (subscriptions) {
		if (verbose) {
			uwsgi_log("%s %s\n", cmd ? "unsubscribing from" : "subscribing to", subscriptions->value);
		}
		uwsgi_subscribe2(subscriptions->value, cmd);
		subscriptions = subscriptions->next;
	}
}

// iphash
/*
	Choose the node by hashing the client address over the number of alive
	nodes: the same address maps to the same node as long as the list of alive
	nodes does not change.
*/
static struct uwsgi_subscribe_node *uwsgi_subscription_algo_iphash(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node, struct uwsgi_subscription_client *client) {
	// if node is NULL we are in the second step (in iphash mode we do not use the first step)
	if (node)
		return NULL;

	// iphash does not support requests without client data
	if (!client)
		return NULL;
	if (!client->sockaddr)
		return NULL;

	uint64_t count = 0;
	// first step is counting the number of nodes
	node = current_slot->nodes;
	while (node) {
		if (!node->death_mark)
			count++;
		node = node->next;
	}
	if (count == 0)
		return NULL;

	// unsupported address families always map to the first alive node
	uint64_t hash = 0;

	// hash the ip
	if (client->sockaddr->sa.sa_family == AF_INET) {
		hash = client->sockaddr->sa_in.sin_addr.s_addr % count;
	}
#ifdef AF_INET6
	else if (client->sockaddr->sa.sa_family == AF_INET6) {
		hash = djb33x_hash((char *) client->sockaddr->sa_in6.sin6_addr.s6_addr, 16) % count;
	}
#endif

	// now re-iterate until count matches;
	count = 0;
	struct uwsgi_subscribe_node *choosen_node = NULL;
	node = current_slot->nodes;
	while (node) {
		if (!node->death_mark) {
			if (count == hash) {
				choosen_node = node;
				break;
			}
			count++;
		}
		node = node->next;
	}

	if (choosen_node) {
		choosen_node->reference++;
	}

	return choosen_node;
}

// least reference count
/*
	Choose the alive node with the lowest reference count among the nodes of
	the lowest backup level that has alive nodes.
*/
static struct uwsgi_subscribe_node *uwsgi_subscription_algo_lrc(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node, struct uwsgi_subscription_client *client) {
	uint64_t backup_level = 0;
	uint64_t has_backup = 0;

	// if node is NULL we are in the second step (in lrc mode we do not use the first step)
	if (node)
		return NULL;

	struct uwsgi_subscribe_node *choosen_node = NULL;
retry:
	node = current_slot->nodes;
	// reset like the wlrc and wrr algorithms do
	has_backup = 0;
	uint64_t min_rc = 0;
	while (node) {
		if (!node->death_mark) {
			if (node->backup_level == backup_level) {
				if (min_rc == 0 || node->reference < min_rc) {
					min_rc = node->reference;
					choosen_node = node;
					// an idle node is good enough, unless the next one looks even less used
					if (min_rc == 0 && !(node->next && node->next->reference <= node->reference && node->next->last_requests <= node->last_requests))
						break;
				}
			}
			else if (node->backup_level > backup_level && (!has_backup || has_backup > node->backup_level)) {
				// remember the nearest higher backup level
				has_backup = node->backup_level;
			}
		}
		node = node->next;
	}

	if (choosen_node) {
		choosen_node->reference++;
	}
	else if (has_backup) {
		backup_level = has_backup;
		goto retry;
	}

	return choosen_node;
}

// weighted least reference count
/*
	Same as lrc, but the reference count of each node is divided by its weight.
*/
static struct uwsgi_subscribe_node *uwsgi_subscription_algo_wlrc(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node, struct uwsgi_subscription_client *client) {
	uint64_t backup_level = 0;
	uint64_t has_backup = 0;

	// if node is NULL we are in the second step (in wlrc mode we do not use the first step)
	if (node)
		return NULL;

	struct uwsgi_subscribe_node *choosen_node = NULL;
retry:
	node = current_slot->nodes;
	has_backup = 0;
	double min_rc = 0;
	while (node) {
		if (!node->death_mark) {
			if (node->backup_level == backup_level) {
				// node->weight is always >= 1, we can safely use it as divider
				double ref = (double) node->reference / (double) node->weight;
				double next_node_ref = 0;
				if (node->next)
					next_node_ref = (double) node->next->reference / (double) node->next->weight;

				if (min_rc == 0 || ref < min_rc) {
					min_rc = ref;
					choosen_node = node;
					// an idle node is good enough, unless the next one looks even less used
					if (min_rc == 0 && !(node->next && next_node_ref <= ref && node->next->last_requests <= node->last_requests))
						break;
				}
			}
			else if (node->backup_level > backup_level && (!has_backup || has_backup > node->backup_level)) {
				// remember the nearest higher backup level
				has_backup = node->backup_level;
			}
		}
		node = node->next;
	}

	if (choosen_node) {
		choosen_node->reference++;
	}
	else if (has_backup) {
		backup_level = has_backup;
		goto retry;
	}

	return choosen_node;
}

// weighted round robin algo (with backup support)
/*
	First step (node != NULL): use the node if it is alive and still has wrr
	credits. Second step (node == NULL): no node had credits left, so refill
	them (weight / lowest alive weight) for the lowest backup level that has
	alive nodes and use the last node of that level.
*/
static struct uwsgi_subscribe_node *uwsgi_subscription_algo_wrr(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node, struct uwsgi_subscription_client *client) {
	uint64_t backup_level = 0;
	uint64_t has_backup = 0;

	// if node is NULL we are in the second step
	if (node) {
		if (node->death_mark == 0 && node->wrr > 0) {
			node->wrr--;
			node->reference++;
			return node;
		}
		return NULL;
	}

	// no wrr > 0 node found, reset them
	node = current_slot->nodes;
	uint64_t min_weight = 0;
	while (node) {
		if (!node->death_mark) {
			if (min_weight == 0 || node->weight < min_weight)
				min_weight = node->weight;
		}
		node = node->next;
	}

	// now set wrr
retry:
	node = current_slot->nodes;
	has_backup = 0;
	struct uwsgi_subscribe_node *choosen_node = NULL;
	while (node) {
		if (!node->death_mark) {
			if (node->backup_level == backup_level) {
				// min_weight is >= 1 here: it has been computed on the alive nodes
				node->wrr = node->weight / min_weight;
				choosen_node = node;
			}
			else if (node->backup_level > backup_level && (!has_backup || has_backup > node->backup_level)) {
				// remember the nearest higher backup level
				has_backup = node->backup_level;
			}
		}
		node = node->next;
	}

	if (choosen_node) {
		choosen_node->wrr--;
		choosen_node->reference++;
	}
	else if (has_backup) {
		backup_level = has_backup;
		goto retry;
	}

	return choosen_node;
}

/*
	Register the builtin balancing algorithms.
*/
void uwsgi_subscription_init_algos() {

	uwsgi_register_subscription_algo("wrr", uwsgi_subscription_algo_wrr);
	uwsgi_register_subscription_algo("lrc", uwsgi_subscription_algo_lrc);
	uwsgi_register_subscription_algo("wlrc", uwsgi_subscription_algo_wlrc);
	uwsgi_register_subscription_algo("iphash", uwsgi_subscription_algo_iphash);
}

/*
	Set the default balancing algorithm by name. A NULL or unknown name
	selects wrr.
*/
void uwsgi_subscription_set_algo(char *algo) {

	if (!uwsgi.subscription_algos) {
		uwsgi_subscription_init_algos();
	}
	if (!algo)
		goto wrr;

	uwsgi.subscription_algo = uwsgi_subscription_algo_get(algo, strlen(algo));
	if (uwsgi.subscription_algo)
		return;

wrr:
	uwsgi.subscription_algo = uwsgi_subscription_algo_wrr;
}

// we are lazy for subscription algos, we initialize them only if needed
/*
	Allocate an empty subscriptions hash table (UMAX16 buckets).
*/
struct uwsgi_subscribe_slot **uwsgi_subscription_init_ht() {
	if (!uwsgi.subscription_algo) {
		uwsgi_subscription_set_algo(NULL);
	}
	return uwsgi_calloc(sizeof(struct uwsgi_subscribe_slot *) * UMAX16);
}

/*
	Return the balancing function registered as "name", or NULL.
*/
struct uwsgi_subscribe_node *(*uwsgi_subscription_algo_get(char *name, size_t len))(struct uwsgi_subscribe_slot *, struct uwsgi_subscribe_node *, struct uwsgi_subscription_client *) {
	struct uwsgi_string_list *usl = NULL;
	uwsgi_foreach(usl, uwsgi.subscription_algos) {
		if (!uwsgi_strncmp(usl->value, usl->len, name, len)) {
			return (struct uwsgi_subscribe_node *(*)(struct uwsgi_subscribe_slot *, struct uwsgi_subscribe_node *, struct uwsgi_subscription_client *)) usl->custom_ptr;
		}
	}
	return NULL;
}

/*
	Register a balancing function under "name".
*/
void uwsgi_register_subscription_algo(char *name, struct uwsgi_subscribe_node *(*func)(struct uwsgi_subscribe_slot *, struct uwsgi_subscribe_node *, struct uwsgi_subscription_client *)) {
	struct uwsgi_string_list *usl = uwsgi_string_new_list(&uwsgi.subscription_algos, name);
	usl->custom_ptr = func;
}