@@ -64,6 +64,11 @@ struct file_upload_state {
uint64_t last_io_time;
};
+struct parent_connection_info {
+ struct mg_mgr *mgr;
+ unsigned long conn_id;
+};
+
static bool run_postupdate;
static unsigned int watchdog_conn = 0;
static struct mg_http_serve_opts s_http_server_opts;
@@ -74,8 +79,6 @@ static bool ssl;
static struct mg_tls_opts tls_opts;
#endif
-static int ws_pipe;
-
static int s_signo = 0;
static void signal_handler(int signo) {
s_signo = signo;
@@ -358,30 +361,6 @@ static void restart_handler(struct mg_connection *nc, void *ev_data)
mg_http_reply(nc, 201, "", "%s", "Device will reboot now.\n");
}
-static void broadcast_callback(struct mg_connection *nc, int ev,
- void __attribute__ ((__unused__)) *ev_data)
-{
- static uint64_t last_io_time = 0;
- if (ev == MG_EV_READ) {
- struct mg_connection *t;
- for (t = nc->mgr->conns; t != NULL; t = t->next) {
- if (!t->is_websocket) continue;
- mg_ws_send(t,(char *)nc->recv.buf, nc->recv.len, WEBSOCKET_OP_TEXT);
- }
- mg_iobuf_del(&nc->recv, 0, nc->recv.len);
- last_io_time = mg_millis();
- } else if (ev == MG_EV_POLL) {
- struct mg_connection *t;
- uint64_t now = *((uint64_t *)ev_data);
- if (now < last_io_time + 20000) return;
- for (t = nc->mgr->conns; t != NULL; t = t->next) {
- if (!t->is_websocket) continue;
- mg_ws_send(t, "", 0, WEBSOCKET_OP_PING);
- }
- last_io_time = now;
- }
-}
-
static int level_to_rfc_5424(int level)
{
switch(level) {
@@ -398,14 +377,18 @@ static int level_to_rfc_5424(int level)
}
}
-static void broadcast(char *str)
+static void broadcast(struct parent_connection_info *p, char *str)
{
- send(ws_pipe, str, strlen(str), 0);
+ mg_wakeup(p->mgr, p->conn_id, str, strlen(str));
}
-static void *broadcast_message_thread(void __attribute__ ((__unused__)) *data)
+static void *broadcast_message_thread(void *data)
{
int fd = -1;
+ struct parent_connection_info *p = (struct parent_connection_info *) data;
+
+ if(!p)
+ return NULL;
for (;;) {
ipc_message msg;
@@ -423,7 +406,7 @@ static void *broadcast_message_thread(void __attribute__ ((__unused__)) *data)
ret = ipc_notify_receive(&fd, &msg);
if (ret != sizeof(msg))
- return NULL;
+ break;
if (strlen(msg.data.notify.msg) != 0 &&
msg.data.status.current != PROGRESS) {
@@ -441,18 +424,24 @@ static void *broadcast_message_thread(void __attribute__ ((__unused__)) *data)
level_to_rfc_5424(msg.data.notify.level), /* RFC 5424 */
text);
- broadcast(str);
+ broadcast(p, str);
}
}
+ free(p);
+ return NULL;
}
-static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
+static void *broadcast_progress_thread(void *data)
{
RECOVERY_STATUS status = -1;
sourcetype source = -1;
unsigned int step = 0;
uint8_t percent = 0;
int fd = -1;
+ struct parent_connection_info *p = (struct parent_connection_info *) data;
+
+ if(!p)
+ return NULL;
for (;;) {
struct progress_msg msg;
@@ -472,7 +461,7 @@ static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
ret = progress_ipc_receive(&fd, &msg);
if (ret != sizeof(msg))
- return NULL;
+ break;
if (msg.status != PROGRESS &&
(msg.status != status || msg.status == FAILURE)) {
@@ -486,7 +475,7 @@ static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
"\t\"status\": \"%s\"\r\n"
"}\r\n",
escaped);
- broadcast(str);
+ broadcast(p, str);
}
if (msg.source != source) {
@@ -498,7 +487,7 @@ static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
"\t\"source\": \"%s\"\r\n"
"}\r\n",
get_source_string(msg.source));
- broadcast(str);
+ broadcast(p, str);
}
if (msg.status == SUCCESS && msg.source == SOURCE_WEBSERVER && run_postupdate) {
@@ -516,7 +505,7 @@ static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
"\t\"source\": \"%s\"\r\n"
"}\r\n",
escaped);
- broadcast(str);
+ broadcast(p, str);
}
if ((msg.cur_step != step || msg.cur_percent != percent) &&
@@ -538,9 +527,11 @@ static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
msg.cur_step,
escaped,
msg.cur_percent);
- broadcast(str);
+ broadcast(p, str);
}
}
+ free(p);
+ return NULL;
}
static void timer_ev_handler(void *fn_data)
@@ -682,11 +673,20 @@ static void websocket_handler(struct mg_connection *nc, void *ev_data)
{
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
mg_ws_upgrade(nc, hm, NULL);
+ nc->data[0] = 'W'; // Set unique Websocket marker
}
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
{
- if (nc->data[0] != 'M' && ev == MG_EV_HTTP_MSG) {
+ static uint64_t last_io_time = 0;
+ if (ev == MG_EV_OPEN && nc->is_listening) {
+ struct parent_connection_info *data = calloc(2, sizeof(struct parent_connection_info));
+ data[0].mgr = nc->mgr;
+ data[0].conn_id = nc->id;
+ memcpy(&data[1], &data[0], sizeof(struct parent_connection_info));
+ start_thread(broadcast_message_thread, &data[0]);
+ start_thread(broadcast_progress_thread, &data[1]);
+ } else if (nc->data[0] != 'M' && nc->data[0] != 'W' && ev == MG_EV_HTTP_MSG) {
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
if (!mg_http_is_authorized(hm, global_auth_domain, global_auth_file))
mg_http_send_digest_auth_request(nc, global_auth_domain);
@@ -696,7 +696,7 @@ static void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
restart_handler(nc, ev_data);
else
mg_http_serve_dir(nc, ev_data, &s_http_server_opts);
- } else if (nc->data[0] != 'M' && ev == MG_EV_READ) {
+ } else if (nc->data[0] != 'M' && nc->data[0] != 'W' && ev == MG_EV_READ) {
struct mg_http_message hm;
int hlen = mg_http_parse((char *) nc->recv.buf, nc->recv.len, &hm);
if (hlen > 0) {
@@ -727,6 +727,27 @@ static void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
ERROR("%p %s", nc->fd, (char *) ev_data);
} else if (ev == MG_EV_WS_MSG) {
mg_iobuf_del(&nc->recv, 0, nc->recv.len);
+ } else if (ev == MG_EV_WAKEUP) {
+ // forward broadcast messages
+ struct mg_str *data = (struct mg_str *) ev_data;
+ struct mg_connection *t;
+ for (t = nc->mgr->conns; t != NULL; t = t->next) {
+ if (t->data[0] == 'W') {
+ mg_ws_send(t, data->buf, data->len, WEBSOCKET_OP_TEXT);
+ }
+ }
+ last_io_time = mg_millis();
+ } else if (ev == MG_EV_POLL) {
+ // websocket heartbeat every 20s
+ struct mg_connection *t;
+ uint64_t now = *((uint64_t *)ev_data);
+ if (now < last_io_time + 20000) return;
+ for (t = nc->mgr->conns; t != NULL; t = t->next) {
+ if (t->data[0] == 'W') {
+ mg_ws_send(t, "", 0, WEBSOCKET_OP_PING);
+ }
+ }
+ last_io_time = now;
}
}
@@ -908,8 +929,6 @@ int start_mongoose(const char *cfgfname, int argc, char *argv[])
signal(SIGTERM, signal_handler);
mg_mgr_init(&mgr);
- ws_pipe = mg_mkpipe(&mgr, broadcast_callback, NULL, true);
-
/* Parse url with port only fallback */
if (opts.port) {
if (mg_url_port(opts.port) != 0) {
@@ -934,8 +953,7 @@ int start_mongoose(const char *cfgfname, int argc, char *argv[])
exit(EXIT_FAILURE);
}
- start_thread(broadcast_message_thread, NULL);
- start_thread(broadcast_progress_thread, NULL);
+ mg_wakeup_init(&mgr);
INFO("Mongoose web server v%s with PID %d listening on %s and serving %s",
MG_VERSION, getpid(), url, s_http_server_opts.root_dir);
Use the new mg_wakeup() [1] mechansim to send data from the broadcast(s) to the parent event manager thread. Based on mongoose multi-threaded example [2]. [1] https://mongoose.ws/documentation/#mg_wakeup [2] https://github.com/cesanta/mongoose/blob/master/tutorials/core/multi-threaded-12m/main.c Signed-off-by: Michael Glembotzki <Michael.Glembotzki@iris-sensing.com> --- mongoose/mongoose_interface.c | 104 ++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 43 deletions(-)