diff mbox series

[21/21] mongoose: Replace deprecated mg_mkpipe() with mg_wakeup()

Message ID 20240615191941.40301-22-Michael.Glembotzki@iris-sensing.com
State Changes Requested
Headers show
Series Update Mongoose to 7.14 | expand

Commit Message

Michael Glembotzki June 15, 2024, 7:11 p.m. UTC
Use the new mg_wakeup() [1] mechansim to send data from the broadcast(s) to
the parent event manager thread.

Based on mongoose multi-threaded example [2].

[1] https://mongoose.ws/documentation/#mg_wakeup
[2] https://github.com/cesanta/mongoose/blob/master/tutorials/core/multi-threaded-12m/main.c

Signed-off-by: Michael Glembotzki <Michael.Glembotzki@iris-sensing.com>
---
 mongoose/mongoose_interface.c | 104 ++++++++++++++++++++--------------
 1 file changed, 61 insertions(+), 43 deletions(-)
diff mbox series

Patch

diff --git a/mongoose/mongoose_interface.c b/mongoose/mongoose_interface.c
index f67b1089..260ed8f9 100644
--- a/mongoose/mongoose_interface.c
+++ b/mongoose/mongoose_interface.c
@@ -64,6 +64,11 @@  struct file_upload_state {
 	uint64_t last_io_time;
 };
 
+struct parent_connection_info {
+	struct mg_mgr *mgr;
+	unsigned long conn_id;
+};
+
 static bool run_postupdate;
 static unsigned int watchdog_conn = 0;
 static struct mg_http_serve_opts s_http_server_opts;
@@ -74,8 +79,6 @@  static bool ssl;
 static struct mg_tls_opts tls_opts;
 #endif
 
-static int ws_pipe;
-
 static int s_signo = 0;
 static void signal_handler(int signo) {
 	s_signo = signo;
@@ -358,30 +361,6 @@  static void restart_handler(struct mg_connection *nc, void *ev_data)
 	mg_http_reply(nc, 201, "", "%s", "Device will reboot now.\n");
 }
 
-static void broadcast_callback(struct mg_connection *nc, int ev,
-		void __attribute__ ((__unused__)) *ev_data)
-{
-	static uint64_t last_io_time = 0;
-	if (ev == MG_EV_READ) {
-		struct mg_connection *t;
-		for (t = nc->mgr->conns; t != NULL; t = t->next) {
-			if (!t->is_websocket) continue;
-			mg_ws_send(t,(char *)nc->recv.buf, nc->recv.len, WEBSOCKET_OP_TEXT);
-		}
-		mg_iobuf_del(&nc->recv, 0, nc->recv.len);
-		last_io_time = mg_millis();
-	} else if (ev == MG_EV_POLL) {
-		struct mg_connection *t;
-		uint64_t now = *((uint64_t *)ev_data);
-		if (now < last_io_time + 20000) return;
-		for (t = nc->mgr->conns; t != NULL; t = t->next) {
-			if (!t->is_websocket) continue;
-			mg_ws_send(t, "", 0, WEBSOCKET_OP_PING);
-		}
-		last_io_time = now;
-	}
-}
-
 static int level_to_rfc_5424(int level)
 {
 	switch(level) {
@@ -398,14 +377,18 @@  static int level_to_rfc_5424(int level)
 	}
 }
 
-static void broadcast(char *str)
+static void broadcast(struct parent_connection_info *p, char *str)
 {
-	send(ws_pipe, str, strlen(str), 0);
+	mg_wakeup(p->mgr, p->conn_id, str, strlen(str));
 }
 
-static void *broadcast_message_thread(void __attribute__ ((__unused__)) *data)
+static void *broadcast_message_thread(void *data)
 {
 	int fd = -1;
+	struct parent_connection_info *p = (struct parent_connection_info *) data;
+
+	if(!p)
+		return NULL;
 
 	for (;;) {
 		ipc_message msg;
@@ -423,7 +406,7 @@  static void *broadcast_message_thread(void __attribute__ ((__unused__)) *data)
 
 		ret = ipc_notify_receive(&fd, &msg);
 		if (ret != sizeof(msg))
-			return NULL;
+			break;
 
 		if (strlen(msg.data.notify.msg) != 0 &&
 				msg.data.status.current != PROGRESS) {
@@ -441,18 +424,24 @@  static void *broadcast_message_thread(void __attribute__ ((__unused__)) *data)
 					 level_to_rfc_5424(msg.data.notify.level), /* RFC 5424 */
 					 text);
 
-			broadcast(str);
+			broadcast(p, str);
 		}
 	}
+	free(p);
+	return NULL;
 }
 
-static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
+static void *broadcast_progress_thread(void *data)
 {
 	RECOVERY_STATUS status = -1;
 	sourcetype source = -1;
 	unsigned int step = 0;
 	uint8_t percent = 0;
 	int fd = -1;
+	struct parent_connection_info *p = (struct parent_connection_info *) data;
+
+	if(!p)
+		return NULL;
 
 	for (;;) {
 		struct progress_msg msg;
@@ -472,7 +461,7 @@  static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
 
 		ret = progress_ipc_receive(&fd, &msg);
 		if (ret != sizeof(msg))
-			return NULL;
+			break;
 
 		if (msg.status != PROGRESS &&
 		    (msg.status != status || msg.status == FAILURE)) {
@@ -486,7 +475,7 @@  static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
 				"\t\"status\": \"%s\"\r\n"
 				"}\r\n",
 				escaped);
-			broadcast(str);
+			broadcast(p, str);
 		}
 
 		if (msg.source != source) {
@@ -498,7 +487,7 @@  static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
 				"\t\"source\": \"%s\"\r\n"
 				"}\r\n",
 				get_source_string(msg.source));
-			broadcast(str);
+			broadcast(p, str);
 		}
 
 		if (msg.status == SUCCESS && msg.source == SOURCE_WEBSERVER && run_postupdate) {
@@ -516,7 +505,7 @@  static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
 				"\t\"source\": \"%s\"\r\n"
 				"}\r\n",
 				escaped);
-			broadcast(str);
+			broadcast(p, str);
 		}
 
 		if ((msg.cur_step != step || msg.cur_percent != percent) &&
@@ -538,9 +527,11 @@  static void *broadcast_progress_thread(void __attribute__ ((__unused__)) *data)
 				msg.cur_step,
 				escaped,
 				msg.cur_percent);
-			broadcast(str);
+			broadcast(p, str);
 		}
 	}
+	free(p);
+	return NULL;
 }
 
 static void timer_ev_handler(void *fn_data)
@@ -682,11 +673,20 @@  static void websocket_handler(struct mg_connection *nc, void *ev_data)
 {
 	struct mg_http_message *hm = (struct mg_http_message *) ev_data;
 	mg_ws_upgrade(nc, hm, NULL);
+	nc->data[0] = 'W'; // Set unique Websocket marker
 }
 
 static void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
 {
-	if (nc->data[0] != 'M' && ev == MG_EV_HTTP_MSG) {
+	static uint64_t last_io_time = 0;
+	if (ev == MG_EV_OPEN && nc->is_listening) {
+		struct parent_connection_info *data = calloc(2, sizeof(struct parent_connection_info));
+		data[0].mgr = nc->mgr;
+		data[0].conn_id = nc->id;
+		memcpy(&data[1], &data[0], sizeof(struct parent_connection_info));
+		start_thread(broadcast_message_thread, &data[0]);
+		start_thread(broadcast_progress_thread, &data[1]);
+	} else if (nc->data[0] != 'M' && nc->data[0] != 'W' && ev == MG_EV_HTTP_MSG) {
 		struct mg_http_message *hm = (struct mg_http_message *) ev_data;
 		if (!mg_http_is_authorized(hm, global_auth_domain, global_auth_file))
 			mg_http_send_digest_auth_request(nc, global_auth_domain);
@@ -696,7 +696,7 @@  static void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
 			restart_handler(nc, ev_data);
 		else
 			mg_http_serve_dir(nc, ev_data, &s_http_server_opts);
-	} else if (nc->data[0] != 'M' && ev == MG_EV_READ) {
+	} else if (nc->data[0] != 'M' && nc->data[0] != 'W' && ev == MG_EV_READ) {
 		struct mg_http_message hm;
 		int hlen = mg_http_parse((char *) nc->recv.buf, nc->recv.len, &hm);
 		if (hlen > 0) {
@@ -727,6 +727,27 @@  static void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
 		ERROR("%p %s", nc->fd, (char *) ev_data);
 	} else if (ev == MG_EV_WS_MSG) {
 		mg_iobuf_del(&nc->recv, 0, nc->recv.len);
+	} else if (ev == MG_EV_WAKEUP) {
+		// forward broadcast messages
+		struct mg_str *data = (struct mg_str *) ev_data;
+		struct mg_connection *t;
+		for (t = nc->mgr->conns; t != NULL; t = t->next) {
+			if (t->data[0] == 'W') {
+				mg_ws_send(t, data->buf, data->len, WEBSOCKET_OP_TEXT);
+			}
+		}
+		last_io_time = mg_millis();
+	} else if (ev == MG_EV_POLL) {
+		// websocket heartbeat every 20s
+		struct mg_connection *t;
+		uint64_t now = *((uint64_t *)ev_data);
+		if (now < last_io_time + 20000) return;
+		for (t = nc->mgr->conns; t != NULL; t = t->next) {
+			if (t->data[0] == 'W') {
+				mg_ws_send(t, "", 0, WEBSOCKET_OP_PING);
+			}
+		}
+		last_io_time = now;
 	}
 }
 
@@ -908,8 +929,6 @@  int start_mongoose(const char *cfgfname, int argc, char *argv[])
 	signal(SIGTERM, signal_handler);
 	mg_mgr_init(&mgr);
 
-	ws_pipe = mg_mkpipe(&mgr, broadcast_callback, NULL, true);
-
 	/* Parse url with port only fallback */
 	if (opts.port) {
 		if (mg_url_port(opts.port) != 0) {
@@ -934,8 +953,7 @@  int start_mongoose(const char *cfgfname, int argc, char *argv[])
 		exit(EXIT_FAILURE);
 	}
 
-	start_thread(broadcast_message_thread, NULL);
-	start_thread(broadcast_progress_thread, NULL);
+	mg_wakeup_init(&mgr);
 
 	INFO("Mongoose web server v%s with PID %d listening on %s and serving %s",
 		MG_VERSION, getpid(), url, s_http_server_opts.root_dir);