diff mbox

[RFC,7/9] net/colo-proxy: add packet enqueue and handle function

Message ID 1448627251-11186-8-git-send-email-zhangchen.fnst@cn.fujitsu.com
State New
Headers show

Commit Message

Zhang Chen Nov. 27, 2015, 12:27 p.m. UTC
From: zhangchen <zhangchen.fnst@cn.fujitsu.com>

Add a common packet handling function and enqueue
packets per connection, so that we can look up
the packets of one connection to compare them.

Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
---
 net/colo-proxy.c | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 166 insertions(+), 1 deletion(-)

Comments

Dr. David Alan Gilbert Dec. 1, 2015, 4:12 p.m. UTC | #1
* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> From: zhangchen <zhangchen.fnst@cn.fujitsu.com>
> 
> Add common packet handle function and enqueue
> packet distinguished connection,then we can
> lookup one connection packet to compare
> 
> Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
> ---
>  net/colo-proxy.c | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 166 insertions(+), 1 deletion(-)
> 
> diff --git a/net/colo-proxy.c b/net/colo-proxy.c
> index 08a852f..a664e6d 100644
> --- a/net/colo-proxy.c
> +++ b/net/colo-proxy.c
> @@ -24,6 +24,170 @@
>  
>  static char *mode;
>  static bool colo_do_checkpoint;
> +static void packet_destroy(void *opaque, void *user_data);
> +
> +static uint32_t connection_key_hash(const void *opaque)
> +{
> +    const Connection_key *key = opaque;
> +    uint32_t a, b, c;
> +
> +    /* Jenkins hash */
> +    a = b = c = JHASH_INITVAL + sizeof(*key);
> +    a += key->src;
> +    b += key->dst;
> +    c += key->ports;
> +    __jhash_mix(a, b, c);
> +
> +    a += key->ip_proto;
> +    __jhash_final(a, b, c);
> +
> +    return c;
> +}
> +
> +static int connection_key_equal(const void *opaque1, const void *opaque2)
> +{
> +    return memcmp(opaque1, opaque2, sizeof(Connection_key)) == 0;
> +}
> +
> +static void connection_destroy(void *opaque)
> +{
> +    Connection *connection = opaque;
> +    g_queue_foreach(&connection->primary_list, packet_destroy, NULL);
> +    g_queue_free(&connection->primary_list);
> +    g_queue_foreach(&connection->secondary_list, packet_destroy, NULL);
> +    g_queue_free(&connection->secondary_list);
> +    g_slice_free(Connection, connection);
> +}
> +
> +static Connection *connection_new(void)
> +{
> +    Connection *connection = g_slice_new(Connection);
> +
> +    g_queue_init(&connection->primary_list);
> +    g_queue_init(&connection->secondary_list);
> +    connection->processing = false;
> +
> +    return connection;
> +}
> +
> +/* Return 0 on success, or return -1 if the pkt is corrpted */
> +static int parse_packet_early(Packet *pkt, Connection_key *key)
> +{
> +    int network_length;
> +    uint8_t *data = pkt->data;
> +
> +    pkt->network_layer = data + ETH_HLEN;
> +    if (ntohs(*(uint16_t *)(data + 12)) != ETH_P_IP) {
> +        if (ntohs(*(uint16_t *)(data + 12)) == ETH_P_ARP) {
> +            return -1;
> +        }
> +        return 0;
> +    }

Can you use some of the functions/macros in include/net/eth.h to
make this easier? Maybe eth_get_l3_proto ?
Do you plan to do IPv6 at some point?

> +    network_length = pkt->ip->ip_hl * 4;
> +    pkt->transport_layer = pkt->network_layer + network_length;
> +    key->ip_proto = pkt->ip->ip_p;
> +    key->src = pkt->ip->ip_src;
> +    key->dst = pkt->ip->ip_dst;
> +
> +    switch (key->ip_proto) {
> +    case IPPROTO_TCP:
> +    case IPPROTO_UDP:
> +    case IPPROTO_DCCP:
> +    case IPPROTO_ESP:
> +    case IPPROTO_SCTP:
> +    case IPPROTO_UDPLITE:
> +        key->ports = *(uint32_t *)(pkt->transport_layer);
> +        break;
> +    case IPPROTO_AH:
> +        key->ports = *(uint32_t *)(pkt->transport_layer + 4);

Interesting; I don't see any other code in QEMU to handle AH,
and I don't know much about it.

> +        break;
> +    default:
> +        break;
> +    }
> +
> +    return 0;
> +}
> +
> +static Packet *packet_new(ColoProxyState *s, const void *data,
> +                          int size, Connection_key *key, NetClientState *sender)
> +{
> +    Packet *pkt = g_slice_new(Packet);
> +
> +    pkt->data = g_malloc(size);
> +    memcpy(pkt->data, data, size);

g_memdup might be useful for these:
https://developer.gnome.org/glib/stable/glib-Memory-Allocation.html#g-memdup

> +    pkt->size = size;
> +    pkt->s = s;
> +    pkt->sender = sender;
> +    pkt->should_be_sent = false;
> +
> +    if (parse_packet_early(pkt, key)) {
> +        packet_destroy(pkt, NULL);
> +        pkt = NULL;
> +    }
> +
> +    return pkt;
> +}
> +
> +static void packet_destroy(void *opaque, void *user_data)
> +{
> +    Packet *pkt = opaque;
> +    g_free(pkt->data);
> +    g_slice_free(Packet, pkt);
> +}
> +
> +static Connection *colo_proxy_enqueue_packet(GHashTable *unprocessed_packets,
> +                                          Connection_key *key,
> +                                          Packet *pkt, packet_type type)
> +{
> +    Connection *connection;
> +    Packet *tmppkt;
> +    connection = g_hash_table_lookup(unprocessed_packets, key);
> +    if (connection == NULL) {
> +        Connection_key *new_key = g_malloc(sizeof(*key));
> +
> +        connection = connection_new();
> +        memcpy(new_key, key, sizeof(*key));
> +        key = new_key;
> +
> +        g_hash_table_insert(unprocessed_packets, key, connection);

Is 'unprocessed_packets' a good name for this hashtable? I'm not quite
sure I understand, but it looks to me like it's your connection-tracking equivalent,
which then has a queue for each connection with unprocessed packets?

Also, do we do anything to stop this hash growing really huge? If there
are lots-and-lots of connections can we limit it somehow? (what does Linux do?)

> +    }
> +    switch (type) {
> +    case PRIMARY_OUTPUT:
> +        if (g_queue_get_length(&connection->secondary_list) > 0) {

Please add some more comments; I think this is when a packet comes in
on the primary, and then we find we've already got a packet from the secondary
waiting?

> +            tmppkt = g_queue_pop_head(&connection->secondary_list);
> +            DEBUG("g_queue_get_length(&connection->primary_list)=%d\n",
> +                        g_queue_get_length(&connection->primary_list));
> +            DEBUG("g_queue_get_length(&connection->secondary_list)=%d\n",
> +                        g_queue_get_length(&connection->secondary_list));

> +            if (colo_packet_compare(pkt, tmppkt)) {
> +                DEBUG("packet same and release packet\n");
> +                pkt->should_be_sent = true;
> +                break;
> +            } else {
> +                DEBUG("packet different\n");
> +                colo_proxy_notify_checkpoint();
> +                pkt->should_be_sent = false;
> +                break;
> +            }
> +        } else {
> +            g_queue_push_tail(&connection->primary_list, pkt);
> +            pkt->should_be_sent = false;
> +        }
> +
> +        break;
> +    case SECONDARY_OUTPUT:
> +        g_queue_push_tail(&connection->secondary_list, pkt);
> +        DEBUG("secondary pkt data=%s,  pkt->ip->ipsrc=%x,pkt->ip->ipdst=%x\n",
> +                    (char *)pkt->data, pkt->ip->ip_src, pkt->ip->ip_dst);
> +        break;
> +    default:
> +        abort();
> +    }
> +
> +    return connection;
> +}
> +
>  
>  /*
>   * Packets to be sent by colo forward to
> @@ -165,7 +329,8 @@ static ssize_t colo_proxy_primary_handler(NetFilterState *nf,
>      }
>  
>      if (direction == NET_FILTER_DIRECTION_RX) {
> -        /* TODO: enqueue_primary_packet */
> +        ret = colo_enqueue_primary_packet(nf, sender, flags, iov,
> +                    iovcnt, sent_cb);

The routine above is 'colo_enqueue_packet' rather than colo_enqueue_primary_packet?

>      } else {
>          ret = colo_forward2another(nf, sender, flags, iov, iovcnt,
>                      sent_cb, COLO_PRIMARY_MODE);
> -- 
> 1.9.1

Dave

> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Zhang Chen Dec. 3, 2015, 6:35 a.m. UTC | #2
Hi,Dave

On 12/02/2015 12:12 AM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> From: zhangchen <zhangchen.fnst@cn.fujitsu.com>
>>
>> Add common packet handle function and enqueue
>> packet distinguished connection,then we can
>> lookup one connection packet to compare
>>
>> Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
>> ---
>>   net/colo-proxy.c | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   1 file changed, 166 insertions(+), 1 deletion(-)
>>
>> diff --git a/net/colo-proxy.c b/net/colo-proxy.c
>> index 08a852f..a664e6d 100644
>> --- a/net/colo-proxy.c
>> +++ b/net/colo-proxy.c
>> @@ -24,6 +24,170 @@
>>   
>>   static char *mode;
>>   static bool colo_do_checkpoint;
>> +static void packet_destroy(void *opaque, void *user_data);
>> +
>> +static uint32_t connection_key_hash(const void *opaque)
>> +{
>> +    const Connection_key *key = opaque;
>> +    uint32_t a, b, c;
>> +
>> +    /* Jenkins hash */
>> +    a = b = c = JHASH_INITVAL + sizeof(*key);
>> +    a += key->src;
>> +    b += key->dst;
>> +    c += key->ports;
>> +    __jhash_mix(a, b, c);
>> +
>> +    a += key->ip_proto;
>> +    __jhash_final(a, b, c);
>> +
>> +    return c;
>> +}
>> +
>> +static int connection_key_equal(const void *opaque1, const void *opaque2)
>> +{
>> +    return memcmp(opaque1, opaque2, sizeof(Connection_key)) == 0;
>> +}
>> +
>> +static void connection_destroy(void *opaque)
>> +{
>> +    Connection *connection = opaque;
>> +    g_queue_foreach(&connection->primary_list, packet_destroy, NULL);
>> +    g_queue_free(&connection->primary_list);
>> +    g_queue_foreach(&connection->secondary_list, packet_destroy, NULL);
>> +    g_queue_free(&connection->secondary_list);
>> +    g_slice_free(Connection, connection);
>> +}
>> +
>> +static Connection *connection_new(void)
>> +{
>> +    Connection *connection = g_slice_new(Connection);
>> +
>> +    g_queue_init(&connection->primary_list);
>> +    g_queue_init(&connection->secondary_list);
>> +    connection->processing = false;
>> +
>> +    return connection;
>> +}
>> +
>> +/* Return 0 on success, or return -1 if the pkt is corrpted */
>> +static int parse_packet_early(Packet *pkt, Connection_key *key)
>> +{
>> +    int network_length;
>> +    uint8_t *data = pkt->data;
>> +
>> +    pkt->network_layer = data + ETH_HLEN;
>> +    if (ntohs(*(uint16_t *)(data + 12)) != ETH_P_IP) {
>> +        if (ntohs(*(uint16_t *)(data + 12)) == ETH_P_ARP) {
>> +            return -1;
>> +        }
>> +        return 0;
>> +    }
> Can you use some of the functions/macros in include/net/eth.h to
> make this easier? Maybe eth_get_l3_proto ?
> Do you plan to do IPv6 at some point?

I will use include/net/eth.h in the next version.

IPv6 is not supported at the moment; it will wait until the COLO framework is merged.

>> +    network_length = pkt->ip->ip_hl * 4;
>> +    pkt->transport_layer = pkt->network_layer + network_length;
>> +    key->ip_proto = pkt->ip->ip_p;
>> +    key->src = pkt->ip->ip_src;
>> +    key->dst = pkt->ip->ip_dst;
>> +
>> +    switch (key->ip_proto) {
>> +    case IPPROTO_TCP:
>> +    case IPPROTO_UDP:
>> +    case IPPROTO_DCCP:
>> +    case IPPROTO_ESP:
>> +    case IPPROTO_SCTP:
>> +    case IPPROTO_UDPLITE:
>> +        key->ports = *(uint32_t *)(pkt->transport_layer);
>> +        break;
>> +    case IPPROTO_AH:
>> +        key->ports = *(uint32_t *)(pkt->transport_layer + 4);
> Interesting; I don't see any other code in QEMU to handle AH,
> and I don't know much about it.
>
>> +        break;
>> +    default:
>> +        break;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static Packet *packet_new(ColoProxyState *s, const void *data,
>> +                          int size, Connection_key *key, NetClientState *sender)
>> +{
>> +    Packet *pkt = g_slice_new(Packet);
>> +
>> +    pkt->data = g_malloc(size);
>> +    memcpy(pkt->data, data, size);
> g_memdup might be useful for these:
> https://developer.gnome.org/glib/stable/glib-Memory-Allocation.html#g-memdup

I will fix it in the next version.

>> +    pkt->size = size;
>> +    pkt->s = s;
>> +    pkt->sender = sender;
>> +    pkt->should_be_sent = false;
>> +
>> +    if (parse_packet_early(pkt, key)) {
>> +        packet_destroy(pkt, NULL);
>> +        pkt = NULL;
>> +    }
>> +
>> +    return pkt;
>> +}
>> +
>> +static void packet_destroy(void *opaque, void *user_data)
>> +{
>> +    Packet *pkt = opaque;
>> +    g_free(pkt->data);
>> +    g_slice_free(Packet, pkt);
>> +}
>> +
>> +static Connection *colo_proxy_enqueue_packet(GHashTable *unprocessed_packets,
>> +                                          Connection_key *key,
>> +                                          Packet *pkt, packet_type type)
>> +{
>> +    Connection *connection;
>> +    Packet *tmppkt;
>> +    connection = g_hash_table_lookup(unprocessed_packets, key);
>> +    if (connection == NULL) {
>> +        Connection_key *new_key = g_malloc(sizeof(*key));
>> +
>> +        connection = connection_new();
>> +        memcpy(new_key, key, sizeof(*key));
>> +        key = new_key;
>> +
>> +        g_hash_table_insert(unprocessed_packets, key, connection);
> Is 'unprocessed_packets' a good name for this hashtable? I'm not quite
> sure I understand, but it looks to me like it's your connection-tracking equivalent,
> which then has a queue for each connection with unprocessed packets?

I will change the hashtable name to connection_track_table. Is that OK?

> Also, do we do anything to stop this hash growing really huge? If there
> are lots-and-lots of connections can we limit it somehow? (what does Linux do?)

When we find that the PVM's packet differs from the SVM's packet, COLO
will do a checkpoint. That means we will flush all connections' packets.
Even if all packets are the same, COLO will also do a checkpoint
periodically, so the hashtable can't grow really huge.

>> +    }
>> +    switch (type) {
>> +    case PRIMARY_OUTPUT:
>> +        if (g_queue_get_length(&connection->secondary_list) > 0) {
> Please add some more comments; I think this is when a packet comes in
> on the primary, and then we find we've already got a packet from the secondary
> waiting?

Yes, you are right.

I will add more comments in the next version.

>> +            tmppkt = g_queue_pop_head(&connection->secondary_list);
>> +            DEBUG("g_queue_get_length(&connection->primary_list)=%d\n",
>> +                        g_queue_get_length(&connection->primary_list));
>> +            DEBUG("g_queue_get_length(&connection->secondary_list)=%d\n",
>> +                        g_queue_get_length(&connection->secondary_list));
>> +            if (colo_packet_compare(pkt, tmppkt)) {
>> +                DEBUG("packet same and release packet\n");
>> +                pkt->should_be_sent = true;
>> +                break;
>> +            } else {
>> +                DEBUG("packet different\n");
>> +                colo_proxy_notify_checkpoint();
>> +                pkt->should_be_sent = false;
>> +                break;
>> +            }
>> +        } else {
>> +            g_queue_push_tail(&connection->primary_list, pkt);
>> +            pkt->should_be_sent = false;
>> +        }
>> +
>> +        break;
>> +    case SECONDARY_OUTPUT:
>> +        g_queue_push_tail(&connection->secondary_list, pkt);
>> +        DEBUG("secondary pkt data=%s,  pkt->ip->ipsrc=%x,pkt->ip->ipdst=%x\n",
>> +                    (char *)pkt->data, pkt->ip->ip_src, pkt->ip->ip_dst);
>> +        break;
>> +    default:
>> +        abort();
>> +    }
>> +
>> +    return connection;
>> +}
>> +
>>   
>>   /*
>>    * Packets to be sent by colo forward to
>> @@ -165,7 +329,8 @@ static ssize_t colo_proxy_primary_handler(NetFilterState *nf,
>>       }
>>   
>>       if (direction == NET_FILTER_DIRECTION_RX) {
>> -        /* TODO: enqueue_primary_packet */
>> +        ret = colo_enqueue_primary_packet(nf, sender, flags, iov,
>> +                    iovcnt, sent_cb);
> The routine above is 'colo_enqueue_packet' rather than colo_enqueue_primary_packet?

Yes, colo_enqueue_packet is the common packet enqueue routine.

Thanks for review
zhangchen

>>       } else {
>>           ret = colo_forward2another(nf, sender, flags, iov, iovcnt,
>>                       sent_cb, COLO_PRIMARY_MODE);
>> -- 
>> 1.9.1
> Dave
>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>
Dr. David Alan Gilbert Dec. 3, 2015, 9:09 a.m. UTC | #3
* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> 
> Hi,Dave
> 
> On 12/02/2015 12:12 AM, Dr. David Alan Gilbert wrote:
> >* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> >>From: zhangchen <zhangchen.fnst@cn.fujitsu.com>
> >>
> >>Add common packet handle function and enqueue
> >>packet distinguished connection,then we can
> >>lookup one connection packet to compare
> >>
> >>Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
> >>---
> >>  net/colo-proxy.c | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
> >>  1 file changed, 166 insertions(+), 1 deletion(-)
> >>
> >>diff --git a/net/colo-proxy.c b/net/colo-proxy.c
> >>index 08a852f..a664e6d 100644
> >>--- a/net/colo-proxy.c
> >>+++ b/net/colo-proxy.c
> >>@@ -24,6 +24,170 @@
> >>  static char *mode;
> >>  static bool colo_do_checkpoint;
> >>+static void packet_destroy(void *opaque, void *user_data);
> >>+
> >>+static uint32_t connection_key_hash(const void *opaque)
> >>+{
> >>+    const Connection_key *key = opaque;
> >>+    uint32_t a, b, c;
> >>+
> >>+    /* Jenkins hash */
> >>+    a = b = c = JHASH_INITVAL + sizeof(*key);
> >>+    a += key->src;
> >>+    b += key->dst;
> >>+    c += key->ports;
> >>+    __jhash_mix(a, b, c);
> >>+
> >>+    a += key->ip_proto;
> >>+    __jhash_final(a, b, c);
> >>+
> >>+    return c;
> >>+}
> >>+
> >>+static int connection_key_equal(const void *opaque1, const void *opaque2)
> >>+{
> >>+    return memcmp(opaque1, opaque2, sizeof(Connection_key)) == 0;
> >>+}
> >>+
> >>+static void connection_destroy(void *opaque)
> >>+{
> >>+    Connection *connection = opaque;
> >>+    g_queue_foreach(&connection->primary_list, packet_destroy, NULL);
> >>+    g_queue_free(&connection->primary_list);
> >>+    g_queue_foreach(&connection->secondary_list, packet_destroy, NULL);
> >>+    g_queue_free(&connection->secondary_list);
> >>+    g_slice_free(Connection, connection);
> >>+}
> >>+
> >>+static Connection *connection_new(void)
> >>+{
> >>+    Connection *connection = g_slice_new(Connection);
> >>+
> >>+    g_queue_init(&connection->primary_list);
> >>+    g_queue_init(&connection->secondary_list);
> >>+    connection->processing = false;
> >>+
> >>+    return connection;
> >>+}
> >>+
> >>+/* Return 0 on success, or return -1 if the pkt is corrpted */
> >>+static int parse_packet_early(Packet *pkt, Connection_key *key)
> >>+{
> >>+    int network_length;
> >>+    uint8_t *data = pkt->data;
> >>+
> >>+    pkt->network_layer = data + ETH_HLEN;
> >>+    if (ntohs(*(uint16_t *)(data + 12)) != ETH_P_IP) {
> >>+        if (ntohs(*(uint16_t *)(data + 12)) == ETH_P_ARP) {
> >>+            return -1;
> >>+        }
> >>+        return 0;
> >>+    }
> >Can you use some of the functions/macros in include/net/eth.h to
> >make this easier? Maybe eth_get_l3_proto ?
> >Do you plan to do IPv6 at some point?
> 
> I will use include/net/eth.h in next version
> 
> IPv6 currently not support, still colo framework be merged
> 
> >>+    network_length = pkt->ip->ip_hl * 4;
> >>+    pkt->transport_layer = pkt->network_layer + network_length;
> >>+    key->ip_proto = pkt->ip->ip_p;
> >>+    key->src = pkt->ip->ip_src;
> >>+    key->dst = pkt->ip->ip_dst;
> >>+
> >>+    switch (key->ip_proto) {
> >>+    case IPPROTO_TCP:
> >>+    case IPPROTO_UDP:
> >>+    case IPPROTO_DCCP:
> >>+    case IPPROTO_ESP:
> >>+    case IPPROTO_SCTP:
> >>+    case IPPROTO_UDPLITE:
> >>+        key->ports = *(uint32_t *)(pkt->transport_layer);
> >>+        break;
> >>+    case IPPROTO_AH:
> >>+        key->ports = *(uint32_t *)(pkt->transport_layer + 4);
> >Interesting; I don't see any other code in QEMU to handle AH,
> >and I don't know much about it.
> >
> >>+        break;
> >>+    default:
> >>+        break;
> >>+    }
> >>+
> >>+    return 0;
> >>+}
> >>+
> >>+static Packet *packet_new(ColoProxyState *s, const void *data,
> >>+                          int size, Connection_key *key, NetClientState *sender)
> >>+{
> >>+    Packet *pkt = g_slice_new(Packet);
> >>+
> >>+    pkt->data = g_malloc(size);
> >>+    memcpy(pkt->data, data, size);
> >g_memdup might be useful for these:
> >https://developer.gnome.org/glib/stable/glib-Memory-Allocation.html#g-memdup
> 
> I will fix it in next version
> 
> >>+    pkt->size = size;
> >>+    pkt->s = s;
> >>+    pkt->sender = sender;
> >>+    pkt->should_be_sent = false;
> >>+
> >>+    if (parse_packet_early(pkt, key)) {
> >>+        packet_destroy(pkt, NULL);
> >>+        pkt = NULL;
> >>+    }
> >>+
> >>+    return pkt;
> >>+}
> >>+
> >>+static void packet_destroy(void *opaque, void *user_data)
> >>+{
> >>+    Packet *pkt = opaque;
> >>+    g_free(pkt->data);
> >>+    g_slice_free(Packet, pkt);
> >>+}
> >>+
> >>+static Connection *colo_proxy_enqueue_packet(GHashTable *unprocessed_packets,
> >>+                                          Connection_key *key,
> >>+                                          Packet *pkt, packet_type type)
> >>+{
> >>+    Connection *connection;
> >>+    Packet *tmppkt;
> >>+    connection = g_hash_table_lookup(unprocessed_packets, key);
> >>+    if (connection == NULL) {
> >>+        Connection_key *new_key = g_malloc(sizeof(*key));
> >>+
> >>+        connection = connection_new();
> >>+        memcpy(new_key, key, sizeof(*key));
> >>+        key = new_key;
> >>+
> >>+        g_hash_table_insert(unprocessed_packets, key, connection);
> >Is 'unprocessed_packets' a good name for this hashtable? I'm not quite
> >sure I understand, but it looks to me like it's your connection-tracking equivalent,
> >which then has a queue for each connection with unprocessed packets?
> 
> i will change hashtable name to connection_track_table,is it ok?

Yes, thank you.

> >Also, do we do anything to stop this hash growing really huge? If there
> >are lots-and-lots of connections can we limit it somehow? (what does Linux do?)
> 
> when we find PVM's packet different to SVM's packet,colo will do checkpoint.
> that's means we will flush all connection's packets,even though all packets
> are
> same,colo will alse do checkpoint periodically. so hashtable can't growing
> really huge.

I see the flush clears all the packets, but does it also clear the hash?

> >>+    }
> >>+    switch (type) {
> >>+    case PRIMARY_OUTPUT:
> >>+        if (g_queue_get_length(&connection->secondary_list) > 0) {
> >Please add some more comments; I think this is when a packet comes in
> >on the primary, and then we find we've already got a packet from the secondary
> >waiting?
> 
> yes,you are right
> 
> I will add more comments in next version

Thank you.

Dave

> >>+            tmppkt = g_queue_pop_head(&connection->secondary_list);
> >>+            DEBUG("g_queue_get_length(&connection->primary_list)=%d\n",
> >>+                        g_queue_get_length(&connection->primary_list));
> >>+            DEBUG("g_queue_get_length(&connection->secondary_list)=%d\n",
> >>+                        g_queue_get_length(&connection->secondary_list));
> >>+            if (colo_packet_compare(pkt, tmppkt)) {
> >>+                DEBUG("packet same and release packet\n");
> >>+                pkt->should_be_sent = true;
> >>+                break;
> >>+            } else {
> >>+                DEBUG("packet different\n");
> >>+                colo_proxy_notify_checkpoint();
> >>+                pkt->should_be_sent = false;
> >>+                break;
> >>+            }
> >>+        } else {
> >>+            g_queue_push_tail(&connection->primary_list, pkt);
> >>+            pkt->should_be_sent = false;
> >>+        }
> >>+
> >>+        break;
> >>+    case SECONDARY_OUTPUT:
> >>+        g_queue_push_tail(&connection->secondary_list, pkt);
> >>+        DEBUG("secondary pkt data=%s,  pkt->ip->ipsrc=%x,pkt->ip->ipdst=%x\n",
> >>+                    (char *)pkt->data, pkt->ip->ip_src, pkt->ip->ip_dst);
> >>+        break;
> >>+    default:
> >>+        abort();
> >>+    }
> >>+
> >>+    return connection;
> >>+}
> >>+
> >>  /*
> >>   * Packets to be sent by colo forward to
> >>@@ -165,7 +329,8 @@ static ssize_t colo_proxy_primary_handler(NetFilterState *nf,
> >>      }
> >>      if (direction == NET_FILTER_DIRECTION_RX) {
> >>-        /* TODO: enqueue_primary_packet */
> >>+        ret = colo_enqueue_primary_packet(nf, sender, flags, iov,
> >>+                    iovcnt, sent_cb);
> >The routine above is 'colo_enqueue_packet' rather than colo_enqueue_primary_packet?
> 
> yes,colo_enqueue_packet is enqueue packet common
> 
> Thanks for review
> zhangchen
> 
> >>      } else {
> >>          ret = colo_forward2another(nf, sender, flags, iov, iovcnt,
> >>                      sent_cb, COLO_PRIMARY_MODE);
> >>-- 
> >>1.9.1
> >Dave
> >
> >>
> >>
> >--
> >Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> >
> >
> >.
> >
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Zhang Chen Dec. 4, 2015, 3:21 a.m. UTC | #4
Hi,Dave


On 12/03/2015 05:09 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> Hi,Dave
>>
>> On 12/02/2015 12:12 AM, Dr. David Alan Gilbert wrote:
>>> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>>>> From: zhangchen <zhangchen.fnst@cn.fujitsu.com>
>>>>
>>>> Add common packet handle function and enqueue
>>>> packet distinguished connection,then we can
>>>> lookup one connection packet to compare
>>>>
>>>> Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
>>>> ---
>>>>   net/colo-proxy.c | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>>>   1 file changed, 166 insertions(+), 1 deletion(-)
>>>>
>>>> diff --git a/net/colo-proxy.c b/net/colo-proxy.c
>>>> index 08a852f..a664e6d 100644
>>>> --- a/net/colo-proxy.c
>>>> +++ b/net/colo-proxy.c
>>>> @@ -24,6 +24,170 @@
>>>>   static char *mode;
>>>>   static bool colo_do_checkpoint;
>>>> +static void packet_destroy(void *opaque, void *user_data);
>>>> +
>>>> +static uint32_t connection_key_hash(const void *opaque)
>>>> +{
>>>> +    const Connection_key *key = opaque;
>>>> +    uint32_t a, b, c;
>>>> +
>>>> +    /* Jenkins hash */
>>>> +    a = b = c = JHASH_INITVAL + sizeof(*key);
>>>> +    a += key->src;
>>>> +    b += key->dst;
>>>> +    c += key->ports;
>>>> +    __jhash_mix(a, b, c);
>>>> +
>>>> +    a += key->ip_proto;
>>>> +    __jhash_final(a, b, c);
>>>> +
>>>> +    return c;
>>>> +}
>>>> +
>>>> +static int connection_key_equal(const void *opaque1, const void *opaque2)
>>>> +{
>>>> +    return memcmp(opaque1, opaque2, sizeof(Connection_key)) == 0;
>>>> +}
>>>> +
>>>> +static void connection_destroy(void *opaque)
>>>> +{
>>>> +    Connection *connection = opaque;
>>>> +    g_queue_foreach(&connection->primary_list, packet_destroy, NULL);
>>>> +    g_queue_free(&connection->primary_list);
>>>> +    g_queue_foreach(&connection->secondary_list, packet_destroy, NULL);
>>>> +    g_queue_free(&connection->secondary_list);
>>>> +    g_slice_free(Connection, connection);
>>>> +}
>>>> +
>>>> +static Connection *connection_new(void)
>>>> +{
>>>> +    Connection *connection = g_slice_new(Connection);
>>>> +
>>>> +    g_queue_init(&connection->primary_list);
>>>> +    g_queue_init(&connection->secondary_list);
>>>> +    connection->processing = false;
>>>> +
>>>> +    return connection;
>>>> +}
>>>> +
>>>> +/* Return 0 on success, or return -1 if the pkt is corrpted */
>>>> +static int parse_packet_early(Packet *pkt, Connection_key *key)
>>>> +{
>>>> +    int network_length;
>>>> +    uint8_t *data = pkt->data;
>>>> +
>>>> +    pkt->network_layer = data + ETH_HLEN;
>>>> +    if (ntohs(*(uint16_t *)(data + 12)) != ETH_P_IP) {
>>>> +        if (ntohs(*(uint16_t *)(data + 12)) == ETH_P_ARP) {
>>>> +            return -1;
>>>> +        }
>>>> +        return 0;
>>>> +    }
>>> Can you use some of the functions/macros in include/net/eth.h to
>>> make this easier? Maybe eth_get_l3_proto ?
>>> Do you plan to do IPv6 at some point?
>> I will use include/net/eth.h in next version
>>
>> IPv6 currently not support, still colo framework be merged
>>
>>>> +    network_length = pkt->ip->ip_hl * 4;
>>>> +    pkt->transport_layer = pkt->network_layer + network_length;
>>>> +    key->ip_proto = pkt->ip->ip_p;
>>>> +    key->src = pkt->ip->ip_src;
>>>> +    key->dst = pkt->ip->ip_dst;
>>>> +
>>>> +    switch (key->ip_proto) {
>>>> +    case IPPROTO_TCP:
>>>> +    case IPPROTO_UDP:
>>>> +    case IPPROTO_DCCP:
>>>> +    case IPPROTO_ESP:
>>>> +    case IPPROTO_SCTP:
>>>> +    case IPPROTO_UDPLITE:
>>>> +        key->ports = *(uint32_t *)(pkt->transport_layer);
>>>> +        break;
>>>> +    case IPPROTO_AH:
>>>> +        key->ports = *(uint32_t *)(pkt->transport_layer + 4);
>>> Interesting; I don't see any other code in QEMU to handle AH,
>>> and I don't know much about it.
>>>
>>>> +        break;
>>>> +    default:
>>>> +        break;
>>>> +    }
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +static Packet *packet_new(ColoProxyState *s, const void *data,
>>>> +                          int size, Connection_key *key, NetClientState *sender)
>>>> +{
>>>> +    Packet *pkt = g_slice_new(Packet);
>>>> +
>>>> +    pkt->data = g_malloc(size);
>>>> +    memcpy(pkt->data, data, size);
>>> g_memdup might be useful for these:
>>> https://developer.gnome.org/glib/stable/glib-Memory-Allocation.html#g-memdup
>> I will fix it in next version
>>
>>>> +    pkt->size = size;
>>>> +    pkt->s = s;
>>>> +    pkt->sender = sender;
>>>> +    pkt->should_be_sent = false;
>>>> +
>>>> +    if (parse_packet_early(pkt, key)) {
>>>> +        packet_destroy(pkt, NULL);
>>>> +        pkt = NULL;
>>>> +    }
>>>> +
>>>> +    return pkt;
>>>> +}
>>>> +
>>>> +static void packet_destroy(void *opaque, void *user_data)
>>>> +{
>>>> +    Packet *pkt = opaque;
>>>> +    g_free(pkt->data);
>>>> +    g_slice_free(Packet, pkt);
>>>> +}
>>>> +
>>>> +static Connection *colo_proxy_enqueue_packet(GHashTable *unprocessed_packets,
>>>> +                                          Connection_key *key,
>>>> +                                          Packet *pkt, packet_type type)
>>>> +{
>>>> +    Connection *connection;
>>>> +    Packet *tmppkt;
>>>> +    connection = g_hash_table_lookup(unprocessed_packets, key);
>>>> +    if (connection == NULL) {
>>>> +        Connection_key *new_key = g_malloc(sizeof(*key));
>>>> +
>>>> +        connection = connection_new();
>>>> +        memcpy(new_key, key, sizeof(*key));
>>>> +        key = new_key;
>>>> +
>>>> +        g_hash_table_insert(unprocessed_packets, key, connection);
>>> Is 'unprocessed_packets' a good name for this hashtable? I'm not quite
>>> sure I understand, but it looks to me like it's your connection-tracking equivalent,
>>> which then has a queue for each connection with unprocessed packets?
>> i will change hashtable name to connection_track_table,is it ok?
> Yes, thank you.
>
>>> Also, do we do anything to stop this hash growing really huge? If there
>>> are lots-and-lots of connections can we limit it somehow? (what does Linux do?)
>> when we find PVM's packet different to SVM's packet,colo will do checkpoint.
>> that's means we will flush all connection's packets,even though all packets
>> are
>> same,colo will alse do checkpoint periodically. so hashtable can't growing
>> really huge.
> I see the flush clears all the packets, but does it also clear the hash?
>

I read the kernel code: TCP conntrack clears the hash periodically,
once every five days.
As for the hashtable size, the kernel comment says:
     /* Idea from tcp.c: use 1/16384 of memory.  On i386: 32MB
      * machine has 512 buckets. >= 1GB machines have 16384 buckets. */
I will follow what the kernel does to fix colo-proxy in the next version.

Thanks for review
zhangchen

>>>> +    }
>>>> +    switch (type) {
>>>> +    case PRIMARY_OUTPUT:
>>>> +        if (g_queue_get_length(&connection->secondary_list) > 0) {
>>> Please add some more comments; I think this is when a packet comes in
>>> on the primary, and then we find we've already got a packet from the secondary
>>> waiting?
>> yes,you are right
>>
>> I will add more comments in next version
> Thank you.
>
> Dave
>
>>>> +            tmppkt = g_queue_pop_head(&connection->secondary_list);
>>>> +            DEBUG("g_queue_get_length(&connection->primary_list)=%d\n",
>>>> +                        g_queue_get_length(&connection->primary_list));
>>>> +            DEBUG("g_queue_get_length(&connection->secondary_list)=%d\n",
>>>> +                        g_queue_get_length(&connection->secondary_list));
>>>> +            if (colo_packet_compare(pkt, tmppkt)) {
>>>> +                DEBUG("packet same and release packet\n");
>>>> +                pkt->should_be_sent = true;
>>>> +                break;
>>>> +            } else {
>>>> +                DEBUG("packet different\n");
>>>> +                colo_proxy_notify_checkpoint();
>>>> +                pkt->should_be_sent = false;
>>>> +                break;
>>>> +            }
>>>> +        } else {
>>>> +            g_queue_push_tail(&connection->primary_list, pkt);
>>>> +            pkt->should_be_sent = false;
>>>> +        }
>>>> +
>>>> +        break;
>>>> +    case SECONDARY_OUTPUT:
>>>> +        g_queue_push_tail(&connection->secondary_list, pkt);
>>>> +        DEBUG("secondary pkt data=%s,  pkt->ip->ipsrc=%x,pkt->ip->ipdst=%x\n",
>>>> +                    (char *)pkt->data, pkt->ip->ip_src, pkt->ip->ip_dst);
>>>> +        break;
>>>> +    default:
>>>> +        abort();
>>>> +    }
>>>> +
>>>> +    return connection;
>>>> +}
>>>> +
>>>>   /*
>>>>    * Packets to be sent by colo forward to
>>>> @@ -165,7 +329,8 @@ static ssize_t colo_proxy_primary_handler(NetFilterState *nf,
>>>>       }
>>>>       if (direction == NET_FILTER_DIRECTION_RX) {
>>>> -        /* TODO: enqueue_primary_packet */
>>>> +        ret = colo_enqueue_primary_packet(nf, sender, flags, iov,
>>>> +                    iovcnt, sent_cb);
>>> The routine above is 'colo_enqueue_packet' rather than colo_enqueue_primary_packet?
>> Yes, colo_enqueue_packet is the common packet-enqueue routine.
>>
>> Thanks for review
>> zhangchen
>>
>>>>       } else {
>>>>           ret = colo_forward2another(nf, sender, flags, iov, iovcnt,
>>>>                       sent_cb, COLO_PRIMARY_MODE);
>>>> -- 
>>>> 1.9.1
>>> Dave
>>>
>>>>
>>> --
>>> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>>>
>>>
>>> .
>>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>
Dr. David Alan Gilbert Dec. 4, 2015, 9:14 a.m. UTC | #5
* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> Hi,Dave
> 
> 
> On 12/03/2015 05:09 PM, Dr. David Alan Gilbert wrote:
> >* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> >>Hi,Dave
> >>
> >>On 12/02/2015 12:12 AM, Dr. David Alan Gilbert wrote:
> >>>* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> >>>>From: zhangchen <zhangchen.fnst@cn.fujitsu.com>
> >>>>
> >>>>Add common packet handle function and enqueue
> >>>>packet distinguished connection,then we can
> >>>>lookup one connection packet to compare
> >>>>
> >>>>Signed-off-by: zhangchen <zhangchen.fnst@cn.fujitsu.com>
> >>>>---
> >>>>  net/colo-proxy.c | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
> >>>>  1 file changed, 166 insertions(+), 1 deletion(-)
> >>>>
> >>>>diff --git a/net/colo-proxy.c b/net/colo-proxy.c
> >>>>index 08a852f..a664e6d 100644
> >>>>--- a/net/colo-proxy.c
> >>>>+++ b/net/colo-proxy.c
> >>>>@@ -24,6 +24,170 @@
> >>>>  static char *mode;
> >>>>  static bool colo_do_checkpoint;
> >>>>+static void packet_destroy(void *opaque, void *user_data);
> >>>>+
> >>>>+static uint32_t connection_key_hash(const void *opaque)
> >>>>+{
> >>>>+    const Connection_key *key = opaque;
> >>>>+    uint32_t a, b, c;
> >>>>+
> >>>>+    /* Jenkins hash */
> >>>>+    a = b = c = JHASH_INITVAL + sizeof(*key);
> >>>>+    a += key->src;
> >>>>+    b += key->dst;
> >>>>+    c += key->ports;
> >>>>+    __jhash_mix(a, b, c);
> >>>>+
> >>>>+    a += key->ip_proto;
> >>>>+    __jhash_final(a, b, c);
> >>>>+
> >>>>+    return c;
> >>>>+}
> >>>>+
> >>>>+static int connection_key_equal(const void *opaque1, const void *opaque2)
> >>>>+{
> >>>>+    return memcmp(opaque1, opaque2, sizeof(Connection_key)) == 0;
> >>>>+}
> >>>>+
> >>>>+static void connection_destroy(void *opaque)
> >>>>+{
> >>>>+    Connection *connection = opaque;
> >>>>+    g_queue_foreach(&connection->primary_list, packet_destroy, NULL);
> >>>>+    g_queue_free(&connection->primary_list);
> >>>>+    g_queue_foreach(&connection->secondary_list, packet_destroy, NULL);
> >>>>+    g_queue_free(&connection->secondary_list);
> >>>>+    g_slice_free(Connection, connection);
> >>>>+}
> >>>>+
> >>>>+static Connection *connection_new(void)
> >>>>+{
> >>>>+    Connection *connection = g_slice_new(Connection);
> >>>>+
> >>>>+    g_queue_init(&connection->primary_list);
> >>>>+    g_queue_init(&connection->secondary_list);
> >>>>+    connection->processing = false;
> >>>>+
> >>>>+    return connection;
> >>>>+}
> >>>>+
> >>>>+/* Return 0 on success, or return -1 if the pkt is corrupted */
> >>>>+static int parse_packet_early(Packet *pkt, Connection_key *key)
> >>>>+{
> >>>>+    int network_length;
> >>>>+    uint8_t *data = pkt->data;
> >>>>+
> >>>>+    pkt->network_layer = data + ETH_HLEN;
> >>>>+    if (ntohs(*(uint16_t *)(data + 12)) != ETH_P_IP) {
> >>>>+        if (ntohs(*(uint16_t *)(data + 12)) == ETH_P_ARP) {
> >>>>+            return -1;
> >>>>+        }
> >>>>+        return 0;
> >>>>+    }
> >>>Can you use some of the functions/macros in include/net/eth.h to
> >>>make this easier? Maybe eth_get_l3_proto ?
> >>>Do you plan to do IPv6 at some point?
> >>I will use include/net/eth.h in the next version.
> >>
> >>IPv6 is not supported currently; that will wait until the COLO framework is merged.
> >>
> >>>>+    network_length = pkt->ip->ip_hl * 4;
> >>>>+    pkt->transport_layer = pkt->network_layer + network_length;
> >>>>+    key->ip_proto = pkt->ip->ip_p;
> >>>>+    key->src = pkt->ip->ip_src;
> >>>>+    key->dst = pkt->ip->ip_dst;
> >>>>+
> >>>>+    switch (key->ip_proto) {
> >>>>+    case IPPROTO_TCP:
> >>>>+    case IPPROTO_UDP:
> >>>>+    case IPPROTO_DCCP:
> >>>>+    case IPPROTO_ESP:
> >>>>+    case IPPROTO_SCTP:
> >>>>+    case IPPROTO_UDPLITE:
> >>>>+        key->ports = *(uint32_t *)(pkt->transport_layer);
> >>>>+        break;
> >>>>+    case IPPROTO_AH:
> >>>>+        key->ports = *(uint32_t *)(pkt->transport_layer + 4);
> >>>Interesting; I don't see any other code in QEMU to handle AH,
> >>>and I don't know much about it.
> >>>
> >>>>+        break;
> >>>>+    default:
> >>>>+        break;
> >>>>+    }
> >>>>+
> >>>>+    return 0;
> >>>>+}
> >>>>+
> >>>>+static Packet *packet_new(ColoProxyState *s, const void *data,
> >>>>+                          int size, Connection_key *key, NetClientState *sender)
> >>>>+{
> >>>>+    Packet *pkt = g_slice_new(Packet);
> >>>>+
> >>>>+    pkt->data = g_malloc(size);
> >>>>+    memcpy(pkt->data, data, size);
> >>>g_memdup might be useful for these:
> >>>https://developer.gnome.org/glib/stable/glib-Memory-Allocation.html#g-memdup
> >>I will fix it in next version
> >>
> >>>>+    pkt->size = size;
> >>>>+    pkt->s = s;
> >>>>+    pkt->sender = sender;
> >>>>+    pkt->should_be_sent = false;
> >>>>+
> >>>>+    if (parse_packet_early(pkt, key)) {
> >>>>+        packet_destroy(pkt, NULL);
> >>>>+        pkt = NULL;
> >>>>+    }
> >>>>+
> >>>>+    return pkt;
> >>>>+}
> >>>>+
> >>>>+static void packet_destroy(void *opaque, void *user_data)
> >>>>+{
> >>>>+    Packet *pkt = opaque;
> >>>>+    g_free(pkt->data);
> >>>>+    g_slice_free(Packet, pkt);
> >>>>+}
> >>>>+
> >>>>+static Connection *colo_proxy_enqueue_packet(GHashTable *unprocessed_packets,
> >>>>+                                          Connection_key *key,
> >>>>+                                          Packet *pkt, packet_type type)
> >>>>+{
> >>>>+    Connection *connection;
> >>>>+    Packet *tmppkt;
> >>>>+    connection = g_hash_table_lookup(unprocessed_packets, key);
> >>>>+    if (connection == NULL) {
> >>>>+        Connection_key *new_key = g_malloc(sizeof(*key));
> >>>>+
> >>>>+        connection = connection_new();
> >>>>+        memcpy(new_key, key, sizeof(*key));
> >>>>+        key = new_key;
> >>>>+
> >>>>+        g_hash_table_insert(unprocessed_packets, key, connection);
> >>>Is 'unprocessed_packets' a good name for this hashtable? I'm not quite
> >>>sure I understand, but it looks to me like it's your connection-tracking equivalent,
> >>>which then has a queue for each connection with unprocessed packets?
> >>I will change the hashtable name to connection_track_table. Is that OK?
> >Yes, thank you.
> >
> >>>Also, do we do anything to stop this hash growing really huge? If there
> >>>are lots-and-lots of connections can we limit it somehow? (what does Linux do?)
> >>When we find that the PVM's packet differs from the SVM's packet, COLO will
> >>do a checkpoint. That means we will flush all connections' packets. Even if
> >>all packets are the same, COLO will also do a checkpoint periodically, so the
> >>hashtable can't grow really huge.
> >I see the flush clears all the packets, but does it also clear the hash?
> >
> 
> I read the kernel code: TCP conntrack clears the hash periodically, once every
> five days, and the hashtable size is chosen as follows:
>     /* Idea from tcp.c: use 1/16384 of memory.  On i386: 32MB
>      * machine has 512 buckets. >= 1GB machines have 16384 buckets. */
> I will follow the kernel's approach to fix colo-proxy in the next version.

I think it's OK if you just set a size and limit it to that size;
let's keep it simple for now.   I think you'll also have to free entries
when you see the TCP connection closed.


Dave

> 
> Thanks for review
> zhangchen
> 
> >>>>+    }
> >>>>+    switch (type) {
> >>>>+    case PRIMARY_OUTPUT:
> >>>>+        if (g_queue_get_length(&connection->secondary_list) > 0) {
> >>>Please add some more comments; I think this is when a packet comes in
> >>>on the primary, and then we find we've already got a packet from the secondary
> >>>waiting?
> >>yes,you are right
> >>
> >>I will add more comments in next version
> >Thank you.
> >
> >Dave
> >
> >>>>+            tmppkt = g_queue_pop_head(&connection->secondary_list);
> >>>>+            DEBUG("g_queue_get_length(&connection->primary_list)=%d\n",
> >>>>+                        g_queue_get_length(&connection->primary_list));
> >>>>+            DEBUG("g_queue_get_length(&connection->secondary_list)=%d\n",
> >>>>+                        g_queue_get_length(&connection->secondary_list));
> >>>>+            if (colo_packet_compare(pkt, tmppkt)) {
> >>>>+                DEBUG("packet same and release packet\n");
> >>>>+                pkt->should_be_sent = true;
> >>>>+                break;
> >>>>+            } else {
> >>>>+                DEBUG("packet different\n");
> >>>>+                colo_proxy_notify_checkpoint();
> >>>>+                pkt->should_be_sent = false;
> >>>>+                break;
> >>>>+            }
> >>>>+        } else {
> >>>>+            g_queue_push_tail(&connection->primary_list, pkt);
> >>>>+            pkt->should_be_sent = false;
> >>>>+        }
> >>>>+
> >>>>+        break;
> >>>>+    case SECONDARY_OUTPUT:
> >>>>+        g_queue_push_tail(&connection->secondary_list, pkt);
> >>>>+        DEBUG("secondary pkt data=%s,  pkt->ip->ipsrc=%x,pkt->ip->ipdst=%x\n",
> >>>>+                    (char *)pkt->data, pkt->ip->ip_src, pkt->ip->ip_dst);
> >>>>+        break;
> >>>>+    default:
> >>>>+        abort();
> >>>>+    }
> >>>>+
> >>>>+    return connection;
> >>>>+}
> >>>>+
> >>>>  /*
> >>>>   * Packets to be sent by colo forward to
> >>>>@@ -165,7 +329,8 @@ static ssize_t colo_proxy_primary_handler(NetFilterState *nf,
> >>>>      }
> >>>>      if (direction == NET_FILTER_DIRECTION_RX) {
> >>>>-        /* TODO: enqueue_primary_packet */
> >>>>+        ret = colo_enqueue_primary_packet(nf, sender, flags, iov,
> >>>>+                    iovcnt, sent_cb);
> >>>The routine above is 'colo_enqueue_packet' rather than colo_enqueue_primary_packet?
> >>Yes, colo_enqueue_packet is the common packet-enqueue routine.
> >>
> >>Thanks for review
> >>zhangchen
> >>
> >>>>      } else {
> >>>>          ret = colo_forward2another(nf, sender, flags, iov, iovcnt,
> >>>>                      sent_cb, COLO_PRIMARY_MODE);
> >>>>-- 
> >>>>1.9.1
> >>>Dave
> >>>
> >>>>
> >>>--
> >>>Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> >>>
> >>>
> >>>.
> >>>
> >>
> >>
> >--
> >Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> >
> >
> >.
> >
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox

Patch

diff --git a/net/colo-proxy.c b/net/colo-proxy.c
index 08a852f..a664e6d 100644
--- a/net/colo-proxy.c
+++ b/net/colo-proxy.c
@@ -24,6 +24,170 @@ 
 
 static char *mode;
 static bool colo_do_checkpoint;
+static void packet_destroy(void *opaque, void *user_data);
+
+static uint32_t connection_key_hash(const void *opaque)
+{
+    const Connection_key *key = opaque;
+    uint32_t a, b, c;
+
+    /* Jenkins hash */
+    a = b = c = JHASH_INITVAL + sizeof(*key);
+    a += key->src;
+    b += key->dst;
+    c += key->ports;
+    __jhash_mix(a, b, c);
+
+    a += key->ip_proto;
+    __jhash_final(a, b, c);
+
+    return c;
+}
+
+static int connection_key_equal(const void *opaque1, const void *opaque2)
+{
+    return memcmp(opaque1, opaque2, sizeof(Connection_key)) == 0;
+}
+
+static void connection_destroy(void *opaque)
+{
+    Connection *connection = opaque;
+    g_queue_foreach(&connection->primary_list, packet_destroy, NULL);
+    g_queue_free(&connection->primary_list);
+    g_queue_foreach(&connection->secondary_list, packet_destroy, NULL);
+    g_queue_free(&connection->secondary_list);
+    g_slice_free(Connection, connection);
+}
+
+static Connection *connection_new(void)
+{
+    Connection *connection = g_slice_new(Connection);
+
+    g_queue_init(&connection->primary_list);
+    g_queue_init(&connection->secondary_list);
+    connection->processing = false;
+
+    return connection;
+}
+
+/* Return 0 on success, or return -1 if the pkt is corrupted */
+static int parse_packet_early(Packet *pkt, Connection_key *key)
+{
+    int network_length;
+    uint8_t *data = pkt->data;
+
+    pkt->network_layer = data + ETH_HLEN;
+    if (ntohs(*(uint16_t *)(data + 12)) != ETH_P_IP) {
+        if (ntohs(*(uint16_t *)(data + 12)) == ETH_P_ARP) {
+            return -1;
+        }
+        return 0;
+    }
+
+    network_length = pkt->ip->ip_hl * 4;
+    pkt->transport_layer = pkt->network_layer + network_length;
+    key->ip_proto = pkt->ip->ip_p;
+    key->src = pkt->ip->ip_src;
+    key->dst = pkt->ip->ip_dst;
+
+    switch (key->ip_proto) {
+    case IPPROTO_TCP:
+    case IPPROTO_UDP:
+    case IPPROTO_DCCP:
+    case IPPROTO_ESP:
+    case IPPROTO_SCTP:
+    case IPPROTO_UDPLITE:
+        key->ports = *(uint32_t *)(pkt->transport_layer);
+        break;
+    case IPPROTO_AH:
+        key->ports = *(uint32_t *)(pkt->transport_layer + 4);
+        break;
+    default:
+        break;
+    }
+
+    return 0;
+}
+
+static Packet *packet_new(ColoProxyState *s, const void *data,
+                          int size, Connection_key *key, NetClientState *sender)
+{
+    Packet *pkt = g_slice_new(Packet);
+
+    pkt->data = g_malloc(size);
+    memcpy(pkt->data, data, size);
+    pkt->size = size;
+    pkt->s = s;
+    pkt->sender = sender;
+    pkt->should_be_sent = false;
+
+    if (parse_packet_early(pkt, key)) {
+        packet_destroy(pkt, NULL);
+        pkt = NULL;
+    }
+
+    return pkt;
+}
+
+static void packet_destroy(void *opaque, void *user_data)
+{
+    Packet *pkt = opaque;
+    g_free(pkt->data);
+    g_slice_free(Packet, pkt);
+}
+
+static Connection *colo_proxy_enqueue_packet(GHashTable *unprocessed_packets,
+                                          Connection_key *key,
+                                          Packet *pkt, packet_type type)
+{
+    Connection *connection;
+    Packet *tmppkt;
+    connection = g_hash_table_lookup(unprocessed_packets, key);
+    if (connection == NULL) {
+        Connection_key *new_key = g_malloc(sizeof(*key));
+
+        connection = connection_new();
+        memcpy(new_key, key, sizeof(*key));
+        key = new_key;
+
+        g_hash_table_insert(unprocessed_packets, key, connection);
+    }
+    switch (type) {
+    case PRIMARY_OUTPUT:
+        if (g_queue_get_length(&connection->secondary_list) > 0) {
+            tmppkt = g_queue_pop_head(&connection->secondary_list);
+            DEBUG("g_queue_get_length(&connection->primary_list)=%d\n",
+                        g_queue_get_length(&connection->primary_list));
+            DEBUG("g_queue_get_length(&connection->secondary_list)=%d\n",
+                        g_queue_get_length(&connection->secondary_list));
+            if (colo_packet_compare(pkt, tmppkt)) {
+                DEBUG("packet same and release packet\n");
+                pkt->should_be_sent = true;
+                break;
+            } else {
+                DEBUG("packet different\n");
+                colo_proxy_notify_checkpoint();
+                pkt->should_be_sent = false;
+                break;
+            }
+        } else {
+            g_queue_push_tail(&connection->primary_list, pkt);
+            pkt->should_be_sent = false;
+        }
+
+        break;
+    case SECONDARY_OUTPUT:
+        g_queue_push_tail(&connection->secondary_list, pkt);
+        DEBUG("secondary pkt data=%s,  pkt->ip->ipsrc=%x,pkt->ip->ipdst=%x\n",
+                    (char *)pkt->data, pkt->ip->ip_src, pkt->ip->ip_dst);
+        break;
+    default:
+        abort();
+    }
+
+    return connection;
+}
+
 
 /*
  * Packets to be sent by colo forward to
@@ -165,7 +329,8 @@  static ssize_t colo_proxy_primary_handler(NetFilterState *nf,
     }
 
     if (direction == NET_FILTER_DIRECTION_RX) {
-        /* TODO: enqueue_primary_packet */
+        ret = colo_enqueue_primary_packet(nf, sender, flags, iov,
+                    iovcnt, sent_cb);
     } else {
         ret = colo_forward2another(nf, sender, flags, iov, iovcnt,
                     sent_cb, COLO_PRIMARY_MODE);