diff mbox

[RFC,v2,06/10] net/colo-proxy: add socket used by forward func

Message ID 1450780978-19123-7-git-send-email-zhangchen.fnst@cn.fujitsu.com
State New
Headers show

Commit Message

Zhang Chen Dec. 22, 2015, 10:42 a.m. UTC
From: zhangchen <zhangchen.fnst@cn.fujitsu.com>

Colo need to forward packets
we start socket server in secondary and primary
connect to secondary in startup
the packet recv by primary forward to secondary
the packet send by secondary forward to primary

Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>
---
 net/colo-proxy.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)

Comments

Dr. David Alan Gilbert Feb. 19, 2016, 8:01 p.m. UTC | #1
* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> From: zhangchen <zhangchen.fnst@cn.fujitsu.com>
> 
> Colo need to forward packets
> we start socket server in secondary and primary
> connect to secondary in startup
> the packet recv by primary forward to secondary
> the packet send by secondary forward to primary
> 
> Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>

I found one problem with the socket setup is that the
packets from the primary and secondary aren't tied to the
checkpoint they are part of; so for example a packet from the secondary
may reach the primary at the start of the next checkpoint, causing a
miscomparison.
I added a counter to discard old packets.

Dave

> ---
>  net/colo-proxy.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 114 insertions(+)
> 
> diff --git a/net/colo-proxy.c b/net/colo-proxy.c
> index ba2bbe7..2347bbf 100644
> --- a/net/colo-proxy.c
> +++ b/net/colo-proxy.c
> @@ -172,6 +172,69 @@ bool colo_proxy_query_checkpoint(void)
>      return colo_do_checkpoint;
>  }
>  
> +/*
> + * send a packet to peer
> + * >=0: success
> + * <0: fail
> + */
> +static ssize_t colo_proxy_sock_send(NetFilterState *nf,
> +                                         const struct iovec *iov,
> +                                         int iovcnt)
> +{
> +    COLOProxyState *s = FILTER_COLO_PROXY(nf);
> +    ssize_t ret = 0;
> +    ssize_t size = 0;
> +    struct iovec sizeiov = {
> +        .iov_base = &size,
> +        .iov_len = sizeof(size)
> +    };
> +    size = iov_size(iov, iovcnt);
> +    if (!size) {
> +        return 0;
> +    }
> +
> +    ret = iov_send(s->sockfd, &sizeiov, 1, 0, sizeof(size));
> +    if (ret < 0) {
> +        return ret;
> +    }
> +    ret = iov_send(s->sockfd, iov, iovcnt, 0, size);
> +    return ret;
> +}
> +
> +/*
> + * receive a packet from peer
> + * in primary: enqueue packet to secondary_list
> + * in secondary: pass packet to next
> + */
> +static void colo_proxy_sock_receive(void *opaque)
> +{
> +    NetFilterState *nf = opaque;
> +    COLOProxyState *s = FILTER_COLO_PROXY(nf);
> +    ssize_t len = 0;
> +    struct iovec sizeiov = {
> +        .iov_base = &len,
> +        .iov_len = sizeof(len)
> +    };
> +
> +    iov_recv(s->sockfd, &sizeiov, 1, 0, sizeof(len));
> +    if (len > 0 && len < NET_BUFSIZE) {
> +        char *buf = g_malloc0(len);
> +        struct iovec iov = {
> +            .iov_base = buf,
> +            .iov_len = len
> +        };
> +
> +        iov_recv(s->sockfd, &iov, 1, 0, len);
> +        if (s->colo_mode == COLO_MODE_PRIMARY) {
> +            colo_proxy_enqueue_secondary_packet(nf, buf, len);
> +            /* buf will be release when pakcet destroy */
> +        } else {
> +            qemu_net_queue_send(s->incoming_queue, nf->netdev,
> +                            0, (const uint8_t *)buf, len, NULL);
> +        }
> +    }
> +}
> +
>  static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
>                                           NetClientState *sender,
>                                           unsigned flags,
> @@ -208,6 +271,57 @@ static void colo_proxy_cleanup(NetFilterState *nf)
>      qemu_event_destroy(&s->need_compare_ev);
>  }
>  
> +/* wait for peer connecting
> + * NOTE: this function will block the caller
> + * 0 on success, otherwise returns -1
> + */
> +static int colo_wait_incoming(COLOProxyState *s)
> +{
> +    struct sockaddr_in addr;
> +    socklen_t addrlen = sizeof(addr);
> +    int accept_sock, err;
> +    int fd = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL);
> +
> +    if (fd < 0) {
> +        error_report("colo proxy listen failed");
> +        return -1;
> +    }
> +
> +    do {
> +        accept_sock = qemu_accept(fd, (struct sockaddr *)&addr, &addrlen);
> +        err = socket_error();
> +    } while (accept_sock < 0 && err == EINTR);
> +    closesocket(fd);
> +
> +    if (accept_sock < 0) {
> +        error_report("colo proxy accept failed(%s)", strerror(err));
> +        return -1;
> +    }
> +    s->sockfd = accept_sock;
> +
> +    qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s);
> +
> +    return 0;
> +}
> +
> +/* try to connect listening server
> + * 0 on success, otherwise something wrong
> + */
> +static ssize_t colo_proxy_connect(COLOProxyState *s)
> +{
> +    int sock;
> +    sock = inet_connect(s->addr, NULL);
> +
> +    if (sock < 0) {
> +        error_report("colo proxy inet_connect failed");
> +        return -1;
> +    }
> +    s->sockfd = sock;
> +    qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s);
> +
> +    return 0;
> +}
> +
>  static void colo_proxy_notify_checkpoint(void)
>  {
>      trace_colo_proxy("colo_proxy_notify_checkpoint");
> -- 
> 1.9.1
> 
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Zhang Chen Feb. 22, 2016, 5:51 a.m. UTC | #2
On 02/20/2016 04:01 AM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> From: zhangchen <zhangchen.fnst@cn.fujitsu.com>
>>
>> Colo need to forward packets
>> we start socket server in secondary and primary
>> connect to secondary in startup
>> the packet recv by primary forward to secondary
>> the packet send by secondary forward to primary
>>
>> Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>
> I found one problem with the socket setup is that the
> packets from the primary and secondary aren't tied to the
> checkpoint they are part of; so for example a packet from the secondary
> may reach the primary at the start of the next checkpoint, causing a
> miscomparison.
> I added a counter to discard old packets.
>
> Dave

I will fix it in colo-compare module.

Thanks
zhangchen

>
>> ---
>>   net/colo-proxy.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   1 file changed, 114 insertions(+)
>>
>> diff --git a/net/colo-proxy.c b/net/colo-proxy.c
>> index ba2bbe7..2347bbf 100644
>> --- a/net/colo-proxy.c
>> +++ b/net/colo-proxy.c
>> @@ -172,6 +172,69 @@ bool colo_proxy_query_checkpoint(void)
>>       return colo_do_checkpoint;
>>   }
>>   
>> +/*
>> + * send a packet to peer
>> + * >=0: success
>> + * <0: fail
>> + */
>> +static ssize_t colo_proxy_sock_send(NetFilterState *nf,
>> +                                         const struct iovec *iov,
>> +                                         int iovcnt)
>> +{
>> +    COLOProxyState *s = FILTER_COLO_PROXY(nf);
>> +    ssize_t ret = 0;
>> +    ssize_t size = 0;
>> +    struct iovec sizeiov = {
>> +        .iov_base = &size,
>> +        .iov_len = sizeof(size)
>> +    };
>> +    size = iov_size(iov, iovcnt);
>> +    if (!size) {
>> +        return 0;
>> +    }
>> +
>> +    ret = iov_send(s->sockfd, &sizeiov, 1, 0, sizeof(size));
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +    ret = iov_send(s->sockfd, iov, iovcnt, 0, size);
>> +    return ret;
>> +}
>> +
>> +/*
>> + * receive a packet from peer
>> + * in primary: enqueue packet to secondary_list
>> + * in secondary: pass packet to next
>> + */
>> +static void colo_proxy_sock_receive(void *opaque)
>> +{
>> +    NetFilterState *nf = opaque;
>> +    COLOProxyState *s = FILTER_COLO_PROXY(nf);
>> +    ssize_t len = 0;
>> +    struct iovec sizeiov = {
>> +        .iov_base = &len,
>> +        .iov_len = sizeof(len)
>> +    };
>> +
>> +    iov_recv(s->sockfd, &sizeiov, 1, 0, sizeof(len));
>> +    if (len > 0 && len < NET_BUFSIZE) {
>> +        char *buf = g_malloc0(len);
>> +        struct iovec iov = {
>> +            .iov_base = buf,
>> +            .iov_len = len
>> +        };
>> +
>> +        iov_recv(s->sockfd, &iov, 1, 0, len);
>> +        if (s->colo_mode == COLO_MODE_PRIMARY) {
>> +            colo_proxy_enqueue_secondary_packet(nf, buf, len);
>> +            /* buf will be release when pakcet destroy */
>> +        } else {
>> +            qemu_net_queue_send(s->incoming_queue, nf->netdev,
>> +                            0, (const uint8_t *)buf, len, NULL);
>> +        }
>> +    }
>> +}
>> +
>>   static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
>>                                            NetClientState *sender,
>>                                            unsigned flags,
>> @@ -208,6 +271,57 @@ static void colo_proxy_cleanup(NetFilterState *nf)
>>       qemu_event_destroy(&s->need_compare_ev);
>>   }
>>   
>> +/* wait for peer connecting
>> + * NOTE: this function will block the caller
>> + * 0 on success, otherwise returns -1
>> + */
>> +static int colo_wait_incoming(COLOProxyState *s)
>> +{
>> +    struct sockaddr_in addr;
>> +    socklen_t addrlen = sizeof(addr);
>> +    int accept_sock, err;
>> +    int fd = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL);
>> +
>> +    if (fd < 0) {
>> +        error_report("colo proxy listen failed");
>> +        return -1;
>> +    }
>> +
>> +    do {
>> +        accept_sock = qemu_accept(fd, (struct sockaddr *)&addr, &addrlen);
>> +        err = socket_error();
>> +    } while (accept_sock < 0 && err == EINTR);
>> +    closesocket(fd);
>> +
>> +    if (accept_sock < 0) {
>> +        error_report("colo proxy accept failed(%s)", strerror(err));
>> +        return -1;
>> +    }
>> +    s->sockfd = accept_sock;
>> +
>> +    qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s);
>> +
>> +    return 0;
>> +}
>> +
>> +/* try to connect listening server
>> + * 0 on success, otherwise something wrong
>> + */
>> +static ssize_t colo_proxy_connect(COLOProxyState *s)
>> +{
>> +    int sock;
>> +    sock = inet_connect(s->addr, NULL);
>> +
>> +    if (sock < 0) {
>> +        error_report("colo proxy inet_connect failed");
>> +        return -1;
>> +    }
>> +    s->sockfd = sock;
>> +    qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s);
>> +
>> +    return 0;
>> +}
>> +
>>   static void colo_proxy_notify_checkpoint(void)
>>   {
>>       trace_colo_proxy("colo_proxy_notify_checkpoint");
>> -- 
>> 1.9.1
>>
>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>
diff mbox

Patch

diff --git a/net/colo-proxy.c b/net/colo-proxy.c
index ba2bbe7..2347bbf 100644
--- a/net/colo-proxy.c
+++ b/net/colo-proxy.c
@@ -172,6 +172,69 @@  bool colo_proxy_query_checkpoint(void)
     return colo_do_checkpoint;
 }
 
+/*
+ * send a packet to peer
+ * >=0: success
+ * <0: fail
+ */
+static ssize_t colo_proxy_sock_send(NetFilterState *nf,
+                                         const struct iovec *iov,
+                                         int iovcnt)
+{
+    COLOProxyState *s = FILTER_COLO_PROXY(nf);
+    ssize_t ret = 0;
+    ssize_t size = 0;
+    struct iovec sizeiov = {
+        .iov_base = &size,
+        .iov_len = sizeof(size)
+    };
+    size = iov_size(iov, iovcnt);
+    if (!size) {
+        return 0;
+    }
+
+    ret = iov_send(s->sockfd, &sizeiov, 1, 0, sizeof(size));
+    if (ret < 0) {
+        return ret;
+    }
+    ret = iov_send(s->sockfd, iov, iovcnt, 0, size);
+    return ret;
+}
+
+/*
+ * receive a packet from peer
+ * in primary: enqueue packet to secondary_list
+ * in secondary: pass packet to next
+ */
+static void colo_proxy_sock_receive(void *opaque)
+{
+    NetFilterState *nf = opaque;
+    COLOProxyState *s = FILTER_COLO_PROXY(nf);
+    ssize_t len = 0;
+    struct iovec sizeiov = {
+        .iov_base = &len,
+        .iov_len = sizeof(len)
+    };
+
+    iov_recv(s->sockfd, &sizeiov, 1, 0, sizeof(len));
+    if (len > 0 && len < NET_BUFSIZE) {
+        char *buf = g_malloc0(len);
+        struct iovec iov = {
+            .iov_base = buf,
+            .iov_len = len
+        };
+
+        iov_recv(s->sockfd, &iov, 1, 0, len);
+        if (s->colo_mode == COLO_MODE_PRIMARY) {
+            colo_proxy_enqueue_secondary_packet(nf, buf, len);
+            /* buf will be release when pakcet destroy */
+        } else {
+            qemu_net_queue_send(s->incoming_queue, nf->netdev,
+                            0, (const uint8_t *)buf, len, NULL);
+        }
+    }
+}
+
 static ssize_t colo_proxy_receive_iov(NetFilterState *nf,
                                          NetClientState *sender,
                                          unsigned flags,
@@ -208,6 +271,57 @@  static void colo_proxy_cleanup(NetFilterState *nf)
     qemu_event_destroy(&s->need_compare_ev);
 }
 
+/* wait for peer connecting
+ * NOTE: this function will block the caller
+ * 0 on success, otherwise returns -1
+ */
+static int colo_wait_incoming(COLOProxyState *s)
+{
+    struct sockaddr_in addr;
+    socklen_t addrlen = sizeof(addr);
+    int accept_sock, err;
+    int fd = inet_listen(s->addr, NULL, 256, SOCK_STREAM, 0, NULL);
+
+    if (fd < 0) {
+        error_report("colo proxy listen failed");
+        return -1;
+    }
+
+    do {
+        accept_sock = qemu_accept(fd, (struct sockaddr *)&addr, &addrlen);
+        err = socket_error();
+    } while (accept_sock < 0 && err == EINTR);
+    closesocket(fd);
+
+    if (accept_sock < 0) {
+        error_report("colo proxy accept failed(%s)", strerror(err));
+        return -1;
+    }
+    s->sockfd = accept_sock;
+
+    qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s);
+
+    return 0;
+}
+
+/* try to connect listening server
+ * 0 on success, otherwise something wrong
+ */
+static ssize_t colo_proxy_connect(COLOProxyState *s)
+{
+    int sock;
+    sock = inet_connect(s->addr, NULL);
+
+    if (sock < 0) {
+        error_report("colo proxy inet_connect failed");
+        return -1;
+    }
+    s->sockfd = sock;
+    qemu_set_fd_handler(s->sockfd, colo_proxy_sock_receive, NULL, (void *)s);
+
+    return 0;
+}
+
 static void colo_proxy_notify_checkpoint(void)
 {
     trace_colo_proxy("colo_proxy_notify_checkpoint");