Message ID | d7d42367-b963-42c4-a1cf-4f547f1a5683@witekio.com |
---|---|
State | Changes Requested |
Headers | show |
Series | [1/3] progress: acknowledge client registration | expand |
Hi Frederic, On 11.10.24 15:16, 'Frederic Hoerni' via swupdate wrote: > From: Frederic Hoerni <fhoerni@witekio.com> > > The client that connected to the progress socket had no clear > confirmation that > the daemon was ready to send progress events. > > This patch makes the daemon send an acknowledgement when the > subscription is > duly registered. > > On the client side, the function progress_ipc_connect() waits for the > acknowledgement before returning. > > Signed-off-by: Frederic Hoerni <fhoerni@witekio.com> > --- > core/progress_thread.c | 18 +++++++ > include/progress_ipc.h | 13 ++++- > ipc/progress_ipc.c | 120 ++++++++++++++++++++++++++++++++++++++++- > 3 files changed, 149 insertions(+), 2 deletions(-) > > diff --git a/core/progress_thread.c b/core/progress_thread.c > index e17d931c..8d700db6 100644 > --- a/core/progress_thread.c > +++ b/core/progress_thread.c > @@ -253,6 +253,22 @@ void swupdate_progress_done(const char *info) > pthread_mutex_unlock(&pprog->lock); > } > > +static void progress_send_connect_ack(int connfd) > +{ > + int n; > + struct progress_connect_ack ack; > + memset(&ack, 0, sizeof(ack)); > + ack.apiversion = PROGRESS_API_VERSION; > + strncpy(ack.magic, PROGRESS_CONNECT_ACK_MAGIC, > strlen(PROGRESS_CONNECT_ACK_MAGIC)); > + do { > + n = send(connfd, &ack, sizeof(ack), MSG_NOSIGNAL | MSG_DONTWAIT); > + } while (n == -1 && errno == EINTR); > + > + if (n != sizeof(ack)) { > + fprintf(stderr, "progress_send_connect_ack error: n=%d, > errno=%d\n", n, errno); > + } > +} > + > void *progress_bar_thread (void __attribute__ ((__unused__)) *data) > { > int listen, connfd; > @@ -297,6 +313,8 @@ void *progress_bar_thread (void __attribute__ > ((__unused__)) *data) > conn->sockfd = connfd; > pthread_mutex_lock(&pprog->lock); > SIMPLEQ_INSERT_TAIL(&pprog->conns, conn, next); > + /* Send an ACK to the client to indicate that it is duly > registered */ > + progress_send_connect_ack(connfd); > pthread_mutex_unlock(&pprog->lock); > } while(1); > } > diff --git a/include/progress_ipc.h b/include/progress_ipc.h > index f0b4b816..e79d476d 100644 > --- a/include/progress_ipc.h > +++ b/include/progress_ipc.h > @@ -38,13 +38,18 @@ extern char* SOCKET_PROGRESS_PATH; > * - changes in major mean an incompatibility and clients do not work > anymore > */ > > -#define PROGRESS_API_MAJOR 1 > +#define PROGRESS_API_MAJOR 2 > #define PROGRESS_API_MINOR 0 > #define PROGRESS_API_PATCH 0 > > #define PROGRESS_API_VERSION ((PROGRESS_API_MAJOR & 0xFFFF) << 16 | \ > (PROGRESS_API_MINOR & 0xFF) << 8 | \ > (PROGRESS_API_PATCH & 0xFF)) > + > +inline int progress_is_major_version_compatible(int other_version) > +{ > + return PROGRESS_API_MAJOR == ((other_version >> 16) & 0xFFFF); > +} > /* > * Message sent via progress socket. > * Data is sent in LE if required. > @@ -64,6 +69,12 @@ struct progress_msg { > char info[PRINFOSIZE]; /* additional information about > install */ > }; > > +#define PROGRESS_CONNECT_ACK_MAGIC "ACK" > +struct progress_connect_ack { > + unsigned int apiversion; /* API Version for compatibility check */ > + char magic[4]; /* null-terminated string */ > +}; > + > char *get_prog_socket(void); > > /* Standard function to connect to progress interface */ > diff --git a/ipc/progress_ipc.c b/ipc/progress_ipc.c > index c5ac9b95..dc8b4db3 100644 > --- a/ipc/progress_ipc.c > +++ b/ipc/progress_ipc.c > @@ -8,12 +8,13 @@ > #include <sys/socket.h> > #include <sys/un.h> > #include <errno.h> > +#include <poll.h> > #include <string.h> > #include <stdio.h> > #include <stdlib.h> > +#include <time.h> > #include <unistd.h> > #include <stdbool.h> > - > #include <progress_ipc.h> > > #ifdef CONFIG_SOCKET_PROGRESS_PATH > @@ -40,10 +41,120 @@ char *get_prog_socket(void) { > return SOCKET_PROGRESS_PATH; > } > > +/* Decrease timeout depending on elapsed time */ > +static int update_timeout(const struct timespec *initial_time, int > *timeout_ms) > +{ > + struct timespec current_time; > + int diff_timeout_ms; > + struct timespec elapsed; > + int err = clock_gettime(CLOCK_MONOTONIC, ¤t_time); > + if (err) { > + perror("update_timeout: clock_gettime"); > + return -1; > + } > + elapsed.tv_sec = current_time.tv_sec - initial_time->tv_sec; > + elapsed.tv_nsec = current_time.tv_nsec - initial_time->tv_nsec; > + > + diff_timeout_ms = *timeout_ms - (elapsed.tv_sec*1000 + > elapsed.tv_nsec/1E6); > + > + *timeout_ms = diff_timeout_ms; > + return 0; > +} > + > +static int progress_ipc_recv_ack(int fd, struct progress_connect_ack *ack) > +{ > + struct timespec initial_time; > + int err; > + int timeout_ms = 5000; /* 5 s should be enough in most cases as the > socket is local */ > + err = clock_gettime(CLOCK_MONOTONIC, &initial_time); > + if (err) { > + perror("progress_ipc_recv_ack: clock_gettime"); > + return -1; > + } > + > + unsigned int size_to_read = sizeof(struct progress_connect_ack); > + unsigned int offset = 0; > + > + while (size_to_read > 0) { > + int err_poll; > + struct pollfd pfds[1]; > + pfds[0].fd = fd; > + pfds[0].events = POLLIN; > + do { > + err_poll = poll(pfds, 1, timeout_ms); > + int err_update_timeout = update_timeout(&initial_time, > &timeout_ms); > + if (err_update_timeout) return -1; > + } while (err_poll < 0 && errno == EINTR); > + > + if (err_poll == -1) { > + fprintf(stderr, "progress_ipc_recv_ack: poll error\n"); > + break; > + } else if (err_poll == 0) { > + /* Timeout */ > + fprintf(stderr, "progress_ipc_recv_ack error: timeout\n"); > + break; > + } else if (pfds[0].revents & POLLHUP) { > + /* The peer closed its end of the channel */ > + /* (note that some operating systems also set POLLIN in > this case) */ > + fprintf(stderr, "progress_ipc_recv_ack error: peer closed\n"); > + break; > + } else if (pfds[0].revents & POLLIN) { > + /* There is a message to read */ > + int n = read(fd, (void*)ack + offset, size_to_read); > + if (n == -1 && errno == EINTR) { > + continue; /* redo poll() and timeout management */ > + } else if (n <= 0) { > + /* read error, as at least 1 byte should be pending */ > + fprintf(stderr, "progress_ipc_recv_ack: read error: > n=%d, errno=%d\n", n, errno); > + break; > + } > + size_to_read -= n; > + offset += n; > + > + } else { > + /* unexpected error */ > + fprintf(stderr, "progress_ipc_recv_ack: poll returned %d > with revents=0x%x\n", err_poll, pfds[0].revents); > + } > + } > + if (size_to_read == 0) { > + /* The expected number of bytes was received */ > + return 0; > + } else { > + return -1; > + } > +} > + > +/* Wait for the daemon to send an ACK > + * > + * Returns: > + * -1 error (timeout, peer closed, invalid ACK, ...) > + * 0 success > + */ > +static int progress_ipc_wait_for_ack(int fd) > +{ > + struct progress_connect_ack ack; > + int err = progress_ipc_recv_ack(fd, &ack); > + if (err) { > + return -1; > + } > + if (! progress_is_major_version_compatible(ack.apiversion)) { > + fprintf(stderr, "progress_ipc_wait_for_ack: recv incompatible > version: %x\n", > + ack.apiversion); > + return -1; > + } > + if (0 != strcmp(ack.magic, PROGRESS_CONNECT_ACK_MAGIC)) { > + fprintf(stderr, "progress_ipc_wait_for_ack: recv invalid magic: > %s (expected %s)\n", > + ack.magic, PROGRESS_CONNECT_ACK_MAGIC); > + return -1; > + } > + return 0; > +} > + > static int _progress_ipc_connect(const char *socketpath, bool reconnect) > { > struct sockaddr_un servaddr; > int fd = socket(AF_LOCAL, SOCK_STREAM, 0); > + int ret; > bzero(&servaddr, sizeof(servaddr)); > servaddr.sun_family = AF_LOCAL; > strncpy(servaddr.sun_path, socketpath, sizeof(servaddr.sun_path) - > 1); > @@ -67,6 +178,13 @@ static int _progress_ipc_connect(const char > *socketpath, bool reconnect) > usleep(10000); > } while (true); > > + /* Connected. Wait for ACK */ > + ret = progress_ipc_wait_for_ack(fd); > + if (ret == -1) { > + close(fd); > + return -1; > + } > + > return fd; > } > I cannot apply them again, Applying patch #1996122 using "git am" Description: [1/3] progress: acknowledge client registration Applying: progress: acknowledge client registration error: patch failed: core/progress_thread.c:253 error: core/progress_thread.c: patch does not apply error: patch failed: include/progress_ipc.h:38 error: include/progress_ipc.h: patch does not apply error: patch failed: ipc/progress_ipc.c:8 error: ipc/progress_ipc.c: patch does not apply Patch failed at 0001 progress: acknowledge client registration
diff --git a/core/progress_thread.c b/core/progress_thread.c index e17d931c..8d700db6 100644 --- a/core/progress_thread.c +++ b/core/progress_thread.c @@ -253,6 +253,22 @@ void swupdate_progress_done(const char *info) pthread_mutex_unlock(&pprog->lock); } +static void progress_send_connect_ack(int connfd) +{ + int n; + struct progress_connect_ack ack; + memset(&ack, 0, sizeof(ack)); + ack.apiversion = PROGRESS_API_VERSION; + strncpy(ack.magic, PROGRESS_CONNECT_ACK_MAGIC, strlen(PROGRESS_CONNECT_ACK_MAGIC)); + do { + n = send(connfd, &ack, sizeof(ack), MSG_NOSIGNAL | MSG_DONTWAIT); + } while (n == -1 && errno == EINTR); + + if (n != sizeof(ack)) { + fprintf(stderr, "progress_send_connect_ack error: n=%d, errno=%d\n", n, errno); + } +} + void *progress_bar_thread (void __attribute__ ((__unused__)) *data) { int listen, connfd; @@ -297,6 +313,8 @@ void *progress_bar_thread (void __attribute__ ((__unused__)) *data) conn->sockfd = connfd; pthread_mutex_lock(&pprog->lock); SIMPLEQ_INSERT_TAIL(&pprog->conns, conn, next); + /* Send an ACK to the client to indicate that it is duly registered */ + progress_send_connect_ack(connfd); pthread_mutex_unlock(&pprog->lock); } while(1); } diff --git a/include/progress_ipc.h b/include/progress_ipc.h index f0b4b816..e79d476d 100644 --- a/include/progress_ipc.h +++ b/include/progress_ipc.h @@ -38,13 +38,18 @@ extern char* SOCKET_PROGRESS_PATH; * - changes in major mean an incompatibility and clients do not work anymore */ -#define PROGRESS_API_MAJOR 1 +#define PROGRESS_API_MAJOR 2 #define PROGRESS_API_MINOR 0 #define PROGRESS_API_PATCH 0 #define PROGRESS_API_VERSION ((PROGRESS_API_MAJOR & 0xFFFF) << 16 | \ (PROGRESS_API_MINOR & 0xFF) << 8 | \ (PROGRESS_API_PATCH & 0xFF)) + +inline int progress_is_major_version_compatible(int other_version) +{ + return PROGRESS_API_MAJOR == ((other_version >> 16) & 0xFFFF); +} /* * Message sent via progress socket. * Data is sent in LE if required. @@ -64,6 +69,12 @@ struct progress_msg { char info[PRINFOSIZE]; /* additional information about install */ }; +#define PROGRESS_CONNECT_ACK_MAGIC "ACK" +struct progress_connect_ack { + unsigned int apiversion; /* API Version for compatibility check */ + char magic[4]; /* null-terminated string */ +}; + char *get_prog_socket(void); /* Standard function to connect to progress interface */ diff --git a/ipc/progress_ipc.c b/ipc/progress_ipc.c index c5ac9b95..dc8b4db3 100644 --- a/ipc/progress_ipc.c +++ b/ipc/progress_ipc.c @@ -8,12 +8,13 @@ #include <sys/socket.h> #include <sys/un.h> #include <errno.h> +#include <poll.h> #include <string.h> #include <stdio.h> #include <stdlib.h> +#include <time.h> #include <unistd.h> #include <stdbool.h> - #include <progress_ipc.h> #ifdef CONFIG_SOCKET_PROGRESS_PATH @@ -40,10 +41,120 @@ char *get_prog_socket(void) { return SOCKET_PROGRESS_PATH; } +/* Decrease timeout depending on elapsed time */ +static int update_timeout(const struct timespec *initial_time, int *timeout_ms) +{ + struct timespec current_time; + int diff_timeout_ms; + struct timespec elapsed; + int err = clock_gettime(CLOCK_MONOTONIC, ¤t_time); + if (err) { + perror("update_timeout: clock_gettime"); + return -1; + } + elapsed.tv_sec = current_time.tv_sec - initial_time->tv_sec; + elapsed.tv_nsec = current_time.tv_nsec - initial_time->tv_nsec; + + diff_timeout_ms = *timeout_ms - (elapsed.tv_sec*1000 + elapsed.tv_nsec/1E6); + + *timeout_ms = diff_timeout_ms; + return 0; +} + +static int progress_ipc_recv_ack(int fd, struct progress_connect_ack *ack) +{ + struct timespec initial_time; + int err; + int timeout_ms = 5000; /* 5 s should be enough in most cases as the socket is local */ + err = clock_gettime(CLOCK_MONOTONIC, &initial_time); + if (err) { + perror("progress_ipc_recv_ack: clock_gettime"); + return -1; + } + + unsigned int size_to_read = sizeof(struct progress_connect_ack); + unsigned int offset = 0; + + while (size_to_read > 0) { + int err_poll; + struct pollfd pfds[1]; + pfds[0].fd = fd; + pfds[0].events = POLLIN; + do { + err_poll = poll(pfds, 1, timeout_ms); + int err_update_timeout = update_timeout(&initial_time, &timeout_ms); + if (err_update_timeout) return -1; + } while (err_poll < 0 && errno == EINTR); + + if (err_poll == -1) { + fprintf(stderr, "progress_ipc_recv_ack: poll error\n"); + break; + } else if (err_poll == 0) { + /* Timeout */ + fprintf(stderr, "progress_ipc_recv_ack error: timeout\n"); + break; + } else if (pfds[0].revents & POLLHUP) { + /* The peer closed its end of the channel */ + /* (note that some operating systems also set POLLIN in this case) */ + fprintf(stderr, "progress_ipc_recv_ack error: peer closed\n"); + break; + } else if (pfds[0].revents & POLLIN) { + /* There is a message to read */ + int n = read(fd, (void*)ack + offset, size_to_read); + if (n == -1 && errno == EINTR) { + continue; /* redo poll() and timeout management */ + } else if (n <= 0) { + /* read error, as at least 1 byte should be pending */ + fprintf(stderr, "progress_ipc_recv_ack: read error: n=%d, errno=%d\n", n, errno); + break; + } + size_to_read -= n; + offset += n; + + } else { + /* unexpected error */ + fprintf(stderr, "progress_ipc_recv_ack: poll returned %d with revents=0x%x\n", err_poll, pfds[0].revents); + } + } + if (size_to_read == 0) { + /* The expected number of bytes was received */ + return 0; + } else { + return -1; + } +} + +/* Wait for the daemon to send an ACK + * + * Returns: + * -1 error (timeout, peer closed, invalid ACK, ...) + * 0 success + */ +static int progress_ipc_wait_for_ack(int fd) +{ + struct progress_connect_ack ack; + int err = progress_ipc_recv_ack(fd, &ack); + if (err) { + return -1; + } + if (! progress_is_major_version_compatible(ack.apiversion)) { + fprintf(stderr, "progress_ipc_wait_for_ack: recv incompatible version: %x\n", + ack.apiversion); + return -1; + } + if (0 != strcmp(ack.magic, PROGRESS_CONNECT_ACK_MAGIC)) { + fprintf(stderr, "progress_ipc_wait_for_ack: recv invalid magic: %s (expected %s)\n", + ack.magic, PROGRESS_CONNECT_ACK_MAGIC); + return -1; + } + return 0; +} + static int _progress_ipc_connect(const char *socketpath, bool reconnect) { struct sockaddr_un servaddr; int fd = socket(AF_LOCAL, SOCK_STREAM, 0); + int ret; bzero(&servaddr, sizeof(servaddr)); servaddr.sun_family = AF_LOCAL; strncpy(servaddr.sun_path, socketpath, sizeof(servaddr.sun_path) - 1); @@ -67,6 +178,13 @@ static int _progress_ipc_connect(const char *socketpath, bool reconnect) usleep(10000); } while (true); + /* Connected. Wait for ACK */ + ret = progress_ipc_wait_for_ack(fd); + if (ret == -1) { + close(fd); + return -1; + } + return fd; }