diff mbox series

[1/3] progress: acknowledge client registration

Message ID d7d42367-b963-42c4-a1cf-4f547f1a5683@witekio.com
State Changes Requested
Headers show
Series [1/3] progress: acknowledge client registration | expand

Commit Message

fhoerni.opensource Oct. 11, 2024, 1:16 p.m. UTC
From: Frederic Hoerni <fhoerni@witekio.com>

The client that connected to the progress socket had no clear confirmation that
the daemon was ready to send progress events.

This patch makes the daemon send an acknowledgement when the subscription is
duly registered.

On the client side, the function progress_ipc_connect() waits for the
acknowledgement before returning.

Signed-off-by: Frederic Hoerni <fhoerni@witekio.com>
---
  core/progress_thread.c |  18 +++++++
  include/progress_ipc.h |  13 ++++-
  ipc/progress_ipc.c     | 120 ++++++++++++++++++++++++++++++++++++++++-
  3 files changed, 149 insertions(+), 2 deletions(-)

Comments

Stefano Babic Oct. 11, 2024, 1:29 p.m. UTC | #1
Hi Frederic,

On 11.10.24 15:16, 'Frederic Hoerni' via swupdate wrote:
> From: Frederic Hoerni <fhoerni@witekio.com>
>
> The client that connected to the progress socket had no clear
> confirmation that
> the daemon was ready to send progress events.
>
> This patch makes the daemon send an acknowledgement when the
> subscription is
> duly registered.
>
> On the client side, the function progress_ipc_connect() waits for the
> acknowledgement before returning.
>
> Signed-off-by: Frederic Hoerni <fhoerni@witekio.com>
> ---
>   core/progress_thread.c |  18 +++++++
>   include/progress_ipc.h |  13 ++++-
>   ipc/progress_ipc.c     | 120 ++++++++++++++++++++++++++++++++++++++++-
>   3 files changed, 149 insertions(+), 2 deletions(-)
>
> diff --git a/core/progress_thread.c b/core/progress_thread.c
> index e17d931c..8d700db6 100644
> --- a/core/progress_thread.c
> +++ b/core/progress_thread.c
> @@ -253,6 +253,22 @@ void swupdate_progress_done(const char *info)
>       pthread_mutex_unlock(&pprog->lock);
>   }
>
> +static void progress_send_connect_ack(int connfd)
> +{
> +    int n;
> +    struct progress_connect_ack ack;
> +    memset(&ack, 0, sizeof(ack));
> +    ack.apiversion = PROGRESS_API_VERSION;
> +    strncpy(ack.magic, PROGRESS_CONNECT_ACK_MAGIC,
> strlen(PROGRESS_CONNECT_ACK_MAGIC));
> +    do {
> +        n = send(connfd, &ack, sizeof(ack), MSG_NOSIGNAL | MSG_DONTWAIT);
> +    } while (n == -1 && errno == EINTR);
> +
> +    if (n != sizeof(ack)) {
> +        fprintf(stderr, "progress_send_connect_ack error: n=%d,
> errno=%d\n", n, errno);
> +    }
> +}
> +
>   void *progress_bar_thread (void __attribute__ ((__unused__)) *data)
>   {
>       int listen, connfd;
> @@ -297,6 +313,8 @@ void *progress_bar_thread (void __attribute__
> ((__unused__)) *data)
>           conn->sockfd = connfd;
>           pthread_mutex_lock(&pprog->lock);
>           SIMPLEQ_INSERT_TAIL(&pprog->conns, conn, next);
> +        /* Send an ACK to the client to indicate that it is duly
> registered */
> +        progress_send_connect_ack(connfd);
>           pthread_mutex_unlock(&pprog->lock);
>       } while(1);
>   }
> diff --git a/include/progress_ipc.h b/include/progress_ipc.h
> index f0b4b816..e79d476d 100644
> --- a/include/progress_ipc.h
> +++ b/include/progress_ipc.h
> @@ -38,13 +38,18 @@ extern char* SOCKET_PROGRESS_PATH;
>    * - changes in major mean an incompatibility and clients do not work
> anymore
>    */
>
> -#define PROGRESS_API_MAJOR    1
> +#define PROGRESS_API_MAJOR    2
>   #define PROGRESS_API_MINOR    0
>   #define PROGRESS_API_PATCH    0
>
>   #define PROGRESS_API_VERSION     ((PROGRESS_API_MAJOR & 0xFFFF) << 16 | \
>                   (PROGRESS_API_MINOR & 0xFF) << 8 | \
>                   (PROGRESS_API_PATCH & 0xFF))
> +
> +inline int progress_is_major_version_compatible(int other_version)
> +{
> +    return  PROGRESS_API_MAJOR == ((other_version >> 16) & 0xFFFF);
> +}
>   /*
>    * Message sent via progress socket.
>    * Data is sent in LE if required.
> @@ -64,6 +69,12 @@ struct progress_msg {
>       char        info[PRINFOSIZE]; /* additional information about
> install */
>   };
>
> +#define PROGRESS_CONNECT_ACK_MAGIC "ACK"
> +struct progress_connect_ack {
> +    unsigned int apiversion; /* API Version for compatibility check */
> +    char magic[4];           /* null-terminated string */
> +};
> +
>   char *get_prog_socket(void);
>
>   /* Standard function to connect to progress interface */
> diff --git a/ipc/progress_ipc.c b/ipc/progress_ipc.c
> index c5ac9b95..dc8b4db3 100644
> --- a/ipc/progress_ipc.c
> +++ b/ipc/progress_ipc.c
> @@ -8,12 +8,13 @@
>   #include <sys/socket.h>
>   #include <sys/un.h>
>   #include <errno.h>
> +#include <poll.h>
>   #include <string.h>
>   #include <stdio.h>
>   #include <stdlib.h>
> +#include <time.h>
>   #include <unistd.h>
>   #include <stdbool.h>
> -
>   #include <progress_ipc.h>
>
>   #ifdef CONFIG_SOCKET_PROGRESS_PATH
> @@ -40,10 +41,120 @@ char *get_prog_socket(void) {
>       return SOCKET_PROGRESS_PATH;
>   }
>
> +/* Decrease timeout depending on elapsed time */
> +static int update_timeout(const struct timespec *initial_time, int
> *timeout_ms)
> +{
> +    struct timespec current_time;
> +    int diff_timeout_ms;
> +    struct timespec elapsed;
> +    int err = clock_gettime(CLOCK_MONOTONIC, &current_time);
> +    if (err) {
> +        perror("update_timeout: clock_gettime");
> +        return -1;
> +    }
> +    elapsed.tv_sec = current_time.tv_sec - initial_time->tv_sec;
> +    elapsed.tv_nsec = current_time.tv_nsec - initial_time->tv_nsec;
> +
> +    diff_timeout_ms = *timeout_ms - (elapsed.tv_sec*1000 +
> elapsed.tv_nsec/1E6);
> +
> +    *timeout_ms = diff_timeout_ms;
> +    return 0;
> +}
> +
> +static int progress_ipc_recv_ack(int fd, struct progress_connect_ack *ack)
> +{
> +    struct timespec initial_time;
> +    int err;
> +    int timeout_ms = 5000; /* 5 s should be enough in most cases as the
> socket is local */
> +    err = clock_gettime(CLOCK_MONOTONIC, &initial_time);
> +    if (err) {
> +        perror("progress_ipc_recv_ack: clock_gettime");
> +        return -1;
> +    }
> +
> +    unsigned int size_to_read = sizeof(struct progress_connect_ack);
> +    unsigned int offset = 0;
> +
> +    while (size_to_read > 0) {
> +        int err_poll;
> +        struct pollfd pfds[1];
> +        pfds[0].fd = fd;
> +        pfds[0].events = POLLIN;
> +        do {
> +            err_poll = poll(pfds, 1, timeout_ms);
> +            int err_update_timeout = update_timeout(&initial_time,
> &timeout_ms);
> +            if (err_update_timeout) return -1;
> +        } while (err_poll < 0 && errno == EINTR);
> +
> +        if (err_poll == -1) {
> +            fprintf(stderr, "progress_ipc_recv_ack: poll error\n");
> +            break;
> +        } else if (err_poll == 0) {
> +            /* Timeout */
> +            fprintf(stderr, "progress_ipc_recv_ack error: timeout\n");
> +            break;
> +        } else if (pfds[0].revents & POLLHUP) {
> +            /* The peer closed its end of the channel */
> +            /* (note that some operating systems also set POLLIN in
> this case) */
> +            fprintf(stderr, "progress_ipc_recv_ack error: peer closed\n");
> +            break;
> +        } else if (pfds[0].revents & POLLIN) {
> +            /* There is a message to read */
> +            int n = read(fd, (void*)ack + offset, size_to_read);
> +            if (n == -1 && errno == EINTR) {
> +                continue; /* redo poll() and timeout management */
> +            } else if (n <= 0) {
> +                /* read error, as at least 1 byte should be pending */
> +                fprintf(stderr, "progress_ipc_recv_ack: read error:
> n=%d, errno=%d\n", n, errno);
> +                break;
> +            }
> +            size_to_read -= n;
> +            offset += n;
> +
> +        } else {
> +            /* unexpected error */
> +            fprintf(stderr, "progress_ipc_recv_ack: poll returned %d
> with revents=0x%x\n", err_poll, pfds[0].revents);
> +        }
> +    }
> +    if (size_to_read == 0) {
> +        /* The expected number of bytes was received */
> +        return 0;
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +/* Wait for the daemon to send an ACK
> + *
> + * Returns:
> + *     -1 error (timeout, peer closed, invalid ACK, ...)
> + *      0 success
> + */
> +static int progress_ipc_wait_for_ack(int fd)
> +{
> +    struct progress_connect_ack ack;
> +    int err = progress_ipc_recv_ack(fd, &ack);
> +    if (err) {
> +        return -1;
> +    }
> +    if (! progress_is_major_version_compatible(ack.apiversion)) {
> +        fprintf(stderr, "progress_ipc_wait_for_ack: recv incompatible
> version: %x\n",
> +                ack.apiversion);
> +        return -1;
> +    }
> +    if (0 != strcmp(ack.magic, PROGRESS_CONNECT_ACK_MAGIC)) {
> +        fprintf(stderr, "progress_ipc_wait_for_ack: recv invalid magic:
> %s (expected %s)\n",
> +                ack.magic, PROGRESS_CONNECT_ACK_MAGIC);
> +        return -1;
> +    }
> +    return 0;
> +}
> +
>   static int _progress_ipc_connect(const char *socketpath, bool reconnect)
>   {
>       struct sockaddr_un servaddr;
>       int fd = socket(AF_LOCAL, SOCK_STREAM, 0);
> +    int ret;
>       bzero(&servaddr, sizeof(servaddr));
>       servaddr.sun_family = AF_LOCAL;
>       strncpy(servaddr.sun_path, socketpath, sizeof(servaddr.sun_path) -
> 1);
> @@ -67,6 +178,13 @@ static int _progress_ipc_connect(const char
> *socketpath, bool reconnect)
>           usleep(10000);
>       } while (true);
>
> +    /* Connected. Wait for ACK */
> +    ret = progress_ipc_wait_for_ack(fd);
> +    if (ret == -1) {
> +        close(fd);
> +        return -1;
> +    }
> +
>       return fd;
>   }
>



I cannot apply them again,

Applying patch #1996122 using "git am"
Description: [1/3] progress: acknowledge client registration
Applying: progress: acknowledge client registration
error: patch failed: core/progress_thread.c:253
error: core/progress_thread.c: patch does not apply
error: patch failed: include/progress_ipc.h:38
error: include/progress_ipc.h: patch does not apply
error: patch failed: ipc/progress_ipc.c:8
error: ipc/progress_ipc.c: patch does not apply
Patch failed at 0001 progress: acknowledge client registration
diff mbox series

Patch

diff --git a/core/progress_thread.c b/core/progress_thread.c
index e17d931c..8d700db6 100644
--- a/core/progress_thread.c
+++ b/core/progress_thread.c
@@ -253,6 +253,22 @@  void swupdate_progress_done(const char *info)
  	pthread_mutex_unlock(&pprog->lock);
  }
  
+static void progress_send_connect_ack(int connfd)
+{
+	int n;
+	struct progress_connect_ack ack;
+	memset(&ack, 0, sizeof(ack));
+	ack.apiversion = PROGRESS_API_VERSION;
+	strncpy(ack.magic, PROGRESS_CONNECT_ACK_MAGIC, strlen(PROGRESS_CONNECT_ACK_MAGIC));
+	do {
+		n = send(connfd, &ack, sizeof(ack), MSG_NOSIGNAL | MSG_DONTWAIT);
+	} while (n == -1 && errno == EINTR);
+
+	if (n != sizeof(ack)) {
+		fprintf(stderr, "progress_send_connect_ack error: n=%d, errno=%d\n", n, errno);
+	}
+}
+
  void *progress_bar_thread (void __attribute__ ((__unused__)) *data)
  {
  	int listen, connfd;
@@ -297,6 +313,8 @@  void *progress_bar_thread (void __attribute__ ((__unused__)) *data)
  		conn->sockfd = connfd;
  		pthread_mutex_lock(&pprog->lock);
  		SIMPLEQ_INSERT_TAIL(&pprog->conns, conn, next);
+		/* Send an ACK to the client to indicate that it is duly registered */
+		progress_send_connect_ack(connfd);
  		pthread_mutex_unlock(&pprog->lock);
  	} while(1);
  }
diff --git a/include/progress_ipc.h b/include/progress_ipc.h
index f0b4b816..e79d476d 100644
--- a/include/progress_ipc.h
+++ b/include/progress_ipc.h
@@ -38,13 +38,18 @@  extern char* SOCKET_PROGRESS_PATH;
   * - changes in major mean an incompatibility and clients do not work anymore
   */
  
-#define PROGRESS_API_MAJOR	1
+#define PROGRESS_API_MAJOR	2
  #define PROGRESS_API_MINOR	0
  #define PROGRESS_API_PATCH	0
  
  #define PROGRESS_API_VERSION 	((PROGRESS_API_MAJOR & 0xFFFF) << 16 | \
  				(PROGRESS_API_MINOR & 0xFF) << 8 | \
  				(PROGRESS_API_PATCH & 0xFF))
+
+inline int progress_is_major_version_compatible(int other_version)
+{
+	return  PROGRESS_API_MAJOR == ((other_version >> 16) & 0xFFFF);
+}
  /*
   * Message sent via progress socket.
   * Data is sent in LE if required.
@@ -64,6 +69,12 @@  struct progress_msg {
  	char		info[PRINFOSIZE]; /* additional information about install */
  };
  
+#define PROGRESS_CONNECT_ACK_MAGIC "ACK"
+struct progress_connect_ack {
+	unsigned int apiversion; /* API Version for compatibility check */
+	char magic[4];           /* null-terminated string */
+};
+
  char *get_prog_socket(void);
  
  /* Standard function to connect to progress interface */
diff --git a/ipc/progress_ipc.c b/ipc/progress_ipc.c
index c5ac9b95..dc8b4db3 100644
--- a/ipc/progress_ipc.c
+++ b/ipc/progress_ipc.c
@@ -8,12 +8,13 @@ 
  #include <sys/socket.h>
  #include <sys/un.h>
  #include <errno.h>
+#include <poll.h>
  #include <string.h>
  #include <stdio.h>
  #include <stdlib.h>
+#include <time.h>
  #include <unistd.h>
  #include <stdbool.h>
-
  #include <progress_ipc.h>
  
  #ifdef CONFIG_SOCKET_PROGRESS_PATH
@@ -40,10 +41,120 @@  char *get_prog_socket(void) {
  	return SOCKET_PROGRESS_PATH;
  }
  
+/* Decrease timeout depending on elapsed time */
+static int update_timeout(const struct timespec *initial_time, int *timeout_ms)
+{
+	struct timespec current_time;
+	int diff_timeout_ms;
+	struct timespec elapsed;
+	int err = clock_gettime(CLOCK_MONOTONIC, &current_time);
+	if (err) {
+		perror("update_timeout: clock_gettime");
+		return -1;
+	}
+	elapsed.tv_sec = current_time.tv_sec - initial_time->tv_sec;
+	elapsed.tv_nsec = current_time.tv_nsec - initial_time->tv_nsec;
+
+	diff_timeout_ms = *timeout_ms - (elapsed.tv_sec*1000 + elapsed.tv_nsec/1E6);
+
+	*timeout_ms = diff_timeout_ms;
+	return 0;
+}
+
+static int progress_ipc_recv_ack(int fd, struct progress_connect_ack *ack)
+{
+	struct timespec initial_time;
+	int err;
+	int timeout_ms = 5000; /* 5 s should be enough in most cases as the socket is local */
+	err = clock_gettime(CLOCK_MONOTONIC, &initial_time);
+	if (err) {
+		perror("progress_ipc_recv_ack: clock_gettime");
+		return -1;
+	}
+
+	unsigned int size_to_read = sizeof(struct progress_connect_ack);
+	unsigned int offset = 0;
+
+	while (size_to_read > 0) {
+		int err_poll;
+		struct pollfd pfds[1];
+		pfds[0].fd = fd;
+		pfds[0].events = POLLIN;
+		do {
+			err_poll = poll(pfds, 1, timeout_ms);
+			int err_update_timeout = update_timeout(&initial_time, &timeout_ms);
+			if (err_update_timeout) return -1;
+		} while (err_poll < 0 && errno == EINTR);
+
+		if (err_poll == -1) {
+			fprintf(stderr, "progress_ipc_recv_ack: poll error\n");
+			break;
+		} else if (err_poll == 0) {
+			/* Timeout */
+			fprintf(stderr, "progress_ipc_recv_ack error: timeout\n");
+			break;
+		} else if (pfds[0].revents & POLLHUP) {
+			/* The peer closed its end of the channel */
+			/* (note that some operating systems also set POLLIN in this case) */
+			fprintf(stderr, "progress_ipc_recv_ack error: peer closed\n");
+			break;
+		} else if (pfds[0].revents & POLLIN) {
+			/* There is a message to read */
+			int n = read(fd, (void*)ack + offset, size_to_read);
+			if (n == -1 && errno == EINTR) {
+				continue; /* redo poll() and timeout management */
+			} else if (n <= 0) {
+				/* read error, as at least 1 byte should be pending */
+				fprintf(stderr, "progress_ipc_recv_ack: read error: n=%d, errno=%d\n", n, errno);
+				break;
+			}
+			size_to_read -= n;
+			offset += n;
+
+		} else {
+			/* unexpected error */
+			fprintf(stderr, "progress_ipc_recv_ack: poll returned %d with revents=0x%x\n", err_poll, pfds[0].revents);
+		}
+	}
+	if (size_to_read == 0) {
+		/* The expected number of bytes was received */
+		return 0;
+	} else {
+		return -1;
+	}
+}
+
+/* Wait for the daemon to send an ACK
+ *
+ * Returns:
+ *     -1 error (timeout, peer closed, invalid ACK, ...)
+ *      0 success
+ */
+static int progress_ipc_wait_for_ack(int fd)
+{
+	struct progress_connect_ack ack;
+	int err = progress_ipc_recv_ack(fd, &ack);
+	if (err) {
+		return -1;
+	}
+	if (! progress_is_major_version_compatible(ack.apiversion)) {
+		fprintf(stderr, "progress_ipc_wait_for_ack: recv incompatible version: %x\n",
+		        ack.apiversion);
+		return -1;
+	}
+	if (0 != strcmp(ack.magic, PROGRESS_CONNECT_ACK_MAGIC)) {
+		fprintf(stderr, "progress_ipc_wait_for_ack: recv invalid magic: %s (expected %s)\n",
+		        ack.magic, PROGRESS_CONNECT_ACK_MAGIC);
+		return -1;
+	}
+	return 0;
+}
+
  static int _progress_ipc_connect(const char *socketpath, bool reconnect)
  {
  	struct sockaddr_un servaddr;
  	int fd = socket(AF_LOCAL, SOCK_STREAM, 0);
+	int ret;
  	bzero(&servaddr, sizeof(servaddr));
  	servaddr.sun_family = AF_LOCAL;
  	strncpy(servaddr.sun_path, socketpath, sizeof(servaddr.sun_path) - 1);
@@ -67,6 +178,13 @@  static int _progress_ipc_connect(const char *socketpath, bool reconnect)
  		usleep(10000);
  	} while (true);
  
+	/* Connected. Wait for ACK */
+	ret = progress_ipc_wait_for_ack(fd);
+	if (ret == -1) {
+		close(fd);
+		return -1;
+	}
+
  	return fd;
  }