Message ID | 1450780978-19123-7-git-send-email-zhangchen.fnst@cn.fujitsu.com |
---|---|
State | New |
Headers | show |
* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote: > From: zhangchen <zhangchen.fnst@cn.fujitsu.com> > > Colo need to forward packets > we start socket server in secondary and primary > connect to secondary in startup > the packet recv by primary forward to secondary > the packet send by secondary forward to primary > > Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com> > Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com> I found one problem with the socket setup is that the packets from the primary and secondary aren't tied to the checkpoint they are part of; so for example a packet from the secondary may reach the primary at the start of the next checkpoint, causing a miscomparison. I added a counter to discard old packets. Dave > --- > net/colo-proxy.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ > 1 file changed, 114 insertions(+) > > diff --git a/net/colo-proxy.c b/net/colo-proxy.c > index ba2bbe7..2347bbf 100644 > --- a/net/colo-proxy.c > +++ b/net/colo-proxy.c > @@ -172,6 +172,69 @@ bool colo_proxy_query_checkpoint(void) > return colo_do_checkpoint; > } > > +/* > + * send a packet to peer > + * >=0: success > + * <0: fail > + */ > +static ssize_t colo_proxy_sock_send(NetFilterState *nf, > + const struct iovec *iov, > + int iovcnt) > +{ > + COLOProxyState *s = FILTER_COLO_PROXY(nf); > + ssize_t ret = 0; > + ssize_t size = 0; > + struct iovec sizeiov = { > + .iov_base = &size, > + .iov_len = sizeof(size) > + }; > + size = iov_size(iov, iovcnt); > + if (!size) { > + return 0; > + } > + > + ret = iov_send(s->sockfd, &sizeiov, 1, 0, sizeof(size)); > + if (ret < 0) { > + return ret; > + } > + ret = iov_send(s->sockfd, iov, iovcnt, 0, size); > + return ret; > +} > + > +/* > + * receive a packet from peer > + * in primary: enqueue packet to secondary_list > + * in secondary: pass packet to next > + */ > +static void colo_proxy_sock_receive(void *opaque) > +{ > + NetFilterState *nf = opaque; > + COLOProxyState *s = FILTER_COLO_PROXY(nf); > + ssize_t len = 0; > + struct iovec sizeiov = { > + .iov_base = &len, > + .iov_len = sizeof(len) > + }; > + > + iov_recv(s->sockfd, &sizeiov, 1, 0, sizeof(len)); > + if (len > 0 && len < NET_BUFSIZE) { > + char *buf = g_malloc0(len); > + struct iovec iov = { > + .iov_base = buf, > + .iov_len = len > + }; > + > + iov_recv(s->sockfd, &iov, 1, 0, len); > + if (s->colo_mode == COLO_MODE_PRIMARY) { > + colo_proxy_enqueue_secondary_packet(nf, buf, len); > + /* buf will be release when pakcet destroy */ > + } else { > + qemu_net_queue_send(s->incoming_queue, nf->netdev, > + 0, (const uint8_t *)buf, len, NULL); > + } > + } > +} > + > static ssize_t colo_proxy_receive_iov(NetFilterState *nf, > NetClientState *sender, > unsigned flags, > @@ -208,6 +271,57 @@ static void colo_proxy_cleanup(NetFilterState *nf) > qemu_event_destroy(&s->need_compare_ev); > } > > +/* wait for peer connecting > + * NOTE: this function will block the caller > + * 0 on success, otherwise returns -1 > + */ > +static int colo_wait_incoming(COLOProxyState *s) > +{ > + struct sockaddr_in addr; > + socklen_t addrlen = sizeof(addr); > + int accept_sock, err; > + int fd = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL); > + > + if (fd < 0) { > + error_report("colo proxy listen failed"); > + return -1; > + } > + > + do { > + accept_sock = qemu_accept(fd, (struct sockaddr *)&addr, &addrlen); > + err = socket_error(); > + } while (accept_sock < 0 && err == EINTR); > + closesocket(fd); > + > + if (accept_sock < 0) { > + error_report("colo proxy accept failed(%s)", strerror(err)); > + return -1; > + } > + s->sockfd = accept_sock; > + > + qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s); > + > + return 0; > +} > + > +/* try to connect listening server > + * 0 on success, otherwise something wrong > + */ > +static ssize_t colo_proxy_connect(COLOProxyState *s) > +{ > + int sock; > + sock = inet_connect(s->addr, NULL); > + > + if (sock < 0) { > + error_report("colo proxy inet_connect failed"); > + return -1; > + } > + s->sockfd = sock; > + qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s); > + > + return 0; > +} > + > static void colo_proxy_notify_checkpoint(void) > { > trace_colo_proxy("colo_proxy_notify_checkpoint"); > -- > 1.9.1 > > > > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On 02/20/2016 04:01 AM, Dr. David Alan Gilbert wrote: > * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote: >> From: zhangchen <zhangchen.fnst@cn.fujitsu.com> >> >> Colo need to forward packets >> we start socket server in secondary and primary >> connect to secondary in startup >> the packet recv by primary forward to secondary >> the packet send by secondary forward to primary >> >> Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com> >> Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com> > I found one problem with the socket setup is that the > packets from the primary and secondary aren't tied to the > checkpoint they are part of; so for example a packet from the secondary > may reach the primary at the start of the next checkpoint, causing a > miscomparison. > I added a counter to discard old packets. > > Dave I will fix it in colo-compare module. Thanks zhangchen > >> --- >> net/colo-proxy.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ >> 1 file changed, 114 insertions(+) >> >> diff --git a/net/colo-proxy.c b/net/colo-proxy.c >> index ba2bbe7..2347bbf 100644 >> --- a/net/colo-proxy.c >> +++ b/net/colo-proxy.c >> @@ -172,6 +172,69 @@ bool colo_proxy_query_checkpoint(void) >> return colo_do_checkpoint; >> } >> >> +/* >> + * send a packet to peer >> + * >=0: success >> + * <0: fail >> + */ >> +static ssize_t colo_proxy_sock_send(NetFilterState *nf, >> + const struct iovec *iov, >> + int iovcnt) >> +{ >> + COLOProxyState *s = FILTER_COLO_PROXY(nf); >> + ssize_t ret = 0; >> + ssize_t size = 0; >> + struct iovec sizeiov = { >> + .iov_base = &size, >> + .iov_len = sizeof(size) >> + }; >> + size = iov_size(iov, iovcnt); >> + if (!size) { >> + return 0; >> + } >> + >> + ret = iov_send(s->sockfd, &sizeiov, 1, 0, sizeof(size)); >> + if (ret < 0) { >> + return ret; >> + } >> + ret = iov_send(s->sockfd, iov, iovcnt, 0, size); >> + return ret; >> +} >> + >> +/* >> + * receive a packet from peer >> + * in primary: enqueue packet to secondary_list >> + * in secondary: pass packet to next >> + */ >> +static void colo_proxy_sock_receive(void *opaque) >> +{ >> + NetFilterState *nf = opaque; >> + COLOProxyState *s = FILTER_COLO_PROXY(nf); >> + ssize_t len = 0; >> + struct iovec sizeiov = { >> + .iov_base = &len, >> + .iov_len = sizeof(len) >> + }; >> + >> + iov_recv(s->sockfd, &sizeiov, 1, 0, sizeof(len)); >> + if (len > 0 && len < NET_BUFSIZE) { >> + char *buf = g_malloc0(len); >> + struct iovec iov = { >> + .iov_base = buf, >> + .iov_len = len >> + }; >> + >> + iov_recv(s->sockfd, &iov, 1, 0, len); >> + if (s->colo_mode == COLO_MODE_PRIMARY) { >> + colo_proxy_enqueue_secondary_packet(nf, buf, len); >> + /* buf will be release when pakcet destroy */ >> + } else { >> + qemu_net_queue_send(s->incoming_queue, nf->netdev, >> + 0, (const uint8_t *)buf, len, NULL); >> + } >> + } >> +} >> + >> static ssize_t colo_proxy_receive_iov(NetFilterState *nf, >> NetClientState *sender, >> unsigned flags, >> @@ -208,6 +271,57 @@ static void colo_proxy_cleanup(NetFilterState *nf) >> qemu_event_destroy(&s->need_compare_ev); >> } >> >> +/* wait for peer connecting >> + * NOTE: this function will block the caller >> + * 0 on success, otherwise returns -1 >> + */ >> +static int colo_wait_incoming(COLOProxyState *s) >> +{ >> + struct sockaddr_in addr; >> + socklen_t addrlen = sizeof(addr); >> + int accept_sock, err; >> + int fd = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL); >> + >> + if (fd < 0) { >> + error_report("colo proxy listen failed"); >> + return -1; >> + } >> + >> + do { >> + accept_sock = qemu_accept(fd, (struct sockaddr *)&addr, &addrlen); >> + err = socket_error(); >> + } while (accept_sock < 0 && err == EINTR); >> + closesocket(fd); >> + >> + if (accept_sock < 0) { >> + error_report("colo proxy accept failed(%s)", strerror(err)); >> + return -1; >> + } >> + s->sockfd = accept_sock; >> + >> + qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s); >> + >> + return 0; >> +} >> + >> +/* try to connect listening server >> + * 0 on success, otherwise something wrong >> + */ >> +static ssize_t colo_proxy_connect(COLOProxyState *s) >> +{ >> + int sock; >> + sock = inet_connect(s->addr, NULL); >> + >> + if (sock < 0) { >> + error_report("colo proxy inet_connect failed"); >> + return -1; >> + } >> + s->sockfd = sock; >> + qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s); >> + >> + return 0; >> +} >> + >> static void colo_proxy_notify_checkpoint(void) >> { >> trace_colo_proxy("colo_proxy_notify_checkpoint"); >> -- >> 1.9.1 >> >> >> >> > -- > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK > > > . >
diff --git a/net/colo-proxy.c b/net/colo-proxy.c index ba2bbe7..2347bbf 100644 --- a/net/colo-proxy.c +++ b/net/colo-proxy.c @@ -172,6 +172,69 @@ bool colo_proxy_query_checkpoint(void) return colo_do_checkpoint; } +/* + * send a packet to peer + * >=0: success + * <0: fail + */ +static ssize_t colo_proxy_sock_send(NetFilterState *nf, + const struct iovec *iov, + int iovcnt) +{ + COLOProxyState *s = FILTER_COLO_PROXY(nf); + ssize_t ret = 0; + ssize_t size = 0; + struct iovec sizeiov = { + .iov_base = &size, + .iov_len = sizeof(size) + }; + size = iov_size(iov, iovcnt); + if (!size) { + return 0; + } + + ret = iov_send(s->sockfd, &sizeiov, 1, 0, sizeof(size)); + if (ret < 0) { + return ret; + } + ret = iov_send(s->sockfd, iov, iovcnt, 0, size); + return ret; +} + +/* + * receive a packet from peer + * in primary: enqueue packet to secondary_list + * in secondary: pass packet to next + */ +static void colo_proxy_sock_receive(void *opaque) +{ + NetFilterState *nf = opaque; + COLOProxyState *s = FILTER_COLO_PROXY(nf); + ssize_t len = 0; + struct iovec sizeiov = { + .iov_base = &len, + .iov_len = sizeof(len) + }; + + iov_recv(s->sockfd, &sizeiov, 1, 0, sizeof(len)); + if (len > 0 && len < NET_BUFSIZE) { + char *buf = g_malloc0(len); + struct iovec iov = { + .iov_base = buf, + .iov_len = len + }; + + iov_recv(s->sockfd, &iov, 1, 0, len); + if (s->colo_mode == COLO_MODE_PRIMARY) { + colo_proxy_enqueue_secondary_packet(nf, buf, len); + /* buf will be release when pakcet destroy */ + } else { + qemu_net_queue_send(s->incoming_queue, nf->netdev, + 0, (const uint8_t *)buf, len, NULL); + } + } +} + static ssize_t colo_proxy_receive_iov(NetFilterState *nf, NetClientState *sender, unsigned flags, @@ -208,6 +271,57 @@ static void colo_proxy_cleanup(NetFilterState *nf) qemu_event_destroy(&s->need_compare_ev); } +/* wait for peer connecting + * NOTE: this function will block the caller + * 0 on success, otherwise returns -1 + */ +static int colo_wait_incoming(COLOProxyState *s) +{ + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + int accept_sock, err; + int fd = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL); + + if (fd < 0) { + error_report("colo proxy listen failed"); + return -1; + } + + do { + accept_sock = qemu_accept(fd, (struct sockaddr *)&addr, &addrlen); + err = socket_error(); + } while (accept_sock < 0 && err == EINTR); + closesocket(fd); + + if (accept_sock < 0) { + error_report("colo proxy accept failed(%s)", strerror(err)); + return -1; + } + s->sockfd = accept_sock; + + qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s); + + return 0; +} + +/* try to connect listening server + * 0 on success, otherwise something wrong + */ +static ssize_t colo_proxy_connect(COLOProxyState *s) +{ + int sock; + sock = inet_connect(s->addr, NULL); + + if (sock < 0) { + error_report("colo proxy inet_connect failed"); + return -1; + } + s->sockfd = sock; + qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s); + + return 0; +} + static void colo_proxy_notify_checkpoint(void) { trace_colo_proxy("colo_proxy_notify_checkpoint");