diff mbox

[ovs-dev,monitor_cond,V4,17/17] ovsdb: Lazy cleanup of clause matched row changes

Message ID 1455796015-14898-18-git-send-email-lirans@il.ibm.com
State Superseded
Headers show

Commit Message

Liran Schour Feb. 18, 2016, 11:46 a.m. UTC
Maintain the minimal transaction ID per "==" table.
Run lazy cleanup while going over the monitor rows tracked by clauses
remove all rows that have a lower transaction than the minimal table
transaction ID.

Signed-off-by: Liran Schour <lirans@il.ibm.com>
---
 ovsdb/monitor.c | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 96 insertions(+), 6 deletions(-)
diff mbox

Patch

diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 46d2d13..929b29e 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -148,6 +148,12 @@  struct ovsdb_monitor_changes {
                                     hmap.  */
 };
 
+/* Track of tracked txn_ids */
+struct ovsdb_monitor_txn_id {
+    int n_refs;
+    uint64_t transaction;
+};
+
 /* A particular table being monitored. */
 struct ovsdb_monitor_table {
     const struct ovsdb_table *table;
@@ -173,6 +179,11 @@  struct ovsdb_monitor_table {
     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
     struct hmap changes;
     bool track_clauses;
+    /* Holds tracked txn_ids, first element is the minimal txn_id
+     * that is being tracked */
+    struct ovsdb_monitor_txn_id *txn_ids;
+    size_t allocated_txn_ids;
+    size_t n_txn_ids;
     /* Contains ovsdb_tracked_clauses * */
     struct shash trk_clauses;
 };
@@ -769,17 +780,74 @@  ovsdb_monitor_table_find_clause_changes(
     return changes;
 }
 
+static int
+compare_txn_ids(const void *a_, const void *b_)
+{
+    const struct ovsdb_monitor_txn_id *a = a_;
+    const struct ovsdb_monitor_txn_id *b = b_;
+
+    return a->transaction == b->transaction ? 0 :
+        a->transaction < b->transaction ? -1 : 1;
+}
+
+/* Maintain minimal unflushed transaction by a sorted list. First element in
+ * the list is the lowest unflushed transaction id. */
+static void
+ovsdb_monitor_table_track(struct ovsdb_monitor_table *mt, uint64_t transaction)
+{
+    struct ovsdb_monitor_txn_id *txn_id;
+    int i;
+
+    for (i = mt->n_txn_ids - 1; i >= 0; i--) {
+        txn_id = &mt->txn_ids[i];
+        if (txn_id->transaction == transaction) {
+            txn_id->n_refs++;
+            return;
+        }
+    }
+
+    if (mt->n_txn_ids >= mt->allocated_txn_ids) {
+        mt->txn_ids = x2nrealloc(mt->txn_ids, &mt->allocated_txn_ids,
+                                 sizeof *mt->txn_ids);
+    }
+
+    txn_id = &mt->txn_ids[mt->n_txn_ids++];
+    txn_id->n_refs = 1;
+    txn_id->transaction = transaction;
+
+    qsort(mt->txn_ids, mt->n_txn_ids, sizeof *mt->txn_ids,
+          compare_txn_ids);
+}
+
+/* Maintain sorted list of tracked transactions. First element in the list
+ * is the lowest unflushed transaction id. */
+static void
+ovsdb_monitor_table_untrack(struct ovsdb_monitor_table *mt, uint64_t transaction)
+{
+    int i;
+
+    for(i = 0; i < mt->n_txn_ids; i++) {
+        struct ovsdb_monitor_txn_id *txn_id = &mt->txn_ids[i];
+        if (txn_id->transaction == transaction) {
+            if (--txn_id->n_refs == 0) {
+                txn_id->transaction = ULONG_LONG_MAX;
+                qsort(mt->txn_ids, mt->n_txn_ids, sizeof *mt->txn_ids,
+                      compare_txn_ids);
+                mt->n_txn_ids--;
+            }
+            return;
+        }
+    }
+    OVS_NOT_REACHED();
+}
+
 static void
 ovsdb_monitor_clauses_track_update(const struct ovsdb_row *old,
                                    const struct ovsdb_row *new,
                                    struct ovsdb_monitor_table *mt)
 {
-    //struct ovsdb_monitor_txn_id *trk;
-    /* First element in list is minimal tracked transaction */
-    //INIT_CONTAINER(trk, &mt->tracked_txns, node);
     struct shash_node *node;
     struct ovsdb_monitor_changes *changes;
-    //uint64_t unflushed = trk->txn_id;
 
     /* Insert row to tracked columns */
     SHASH_FOR_EACH(node, &mt->trk_clauses) {
@@ -1406,6 +1474,13 @@  ovsdb_monitor_compose_update(
 
                         ovsdb_monitor_row_set_old(row, transaction);
                         if (row->transaction < transaction) {
+                            uint64_t lowest_txn_id = mt->txn_ids[0].transaction;
+
+                            /* Check for lazy cleanup */
+                            if (row->transaction < lowest_txn_id) {
+                                hmap_remove(&changes->rows, &row->hmap_node);
+                                ovsdb_monitor_row_destroy(mt, row);
+                            }
                             continue;
                         }
                         if (uuid_exists(&row_uuids, &row->uuid)) {
@@ -1452,7 +1527,8 @@  ovsdb_monitor_compose_update(
 static struct json*
 ovsdb_monitor_compose_all_rows_update(
                     struct ovsdb_monitor *dbmon,
-                    struct ovsdb_monitor_session_condition *condition)
+                    struct ovsdb_monitor_session_condition *condition,
+                    uint64_t next_txn)
 {
     struct shash_node *node;
     struct json *json = NULL;
@@ -1477,6 +1553,9 @@  ovsdb_monitor_compose_all_rows_update(
             }
         }
         ovsdb_monitor_table_condition_updated(mt, condition);
+        if (mt->track_clauses) {
+            ovsdb_monitor_table_track(mt, next_txn);
+        }
     }
     free(changed);
 
@@ -1526,7 +1605,7 @@  ovsdb_monitor_get_update(
             }
         } else {
             json =
-                ovsdb_monitor_compose_all_rows_update(dbmon, condition);
+                ovsdb_monitor_compose_all_rows_update(dbmon, condition, next_txn);
         }
     }
 
@@ -1536,7 +1615,13 @@  ovsdb_monitor_get_update(
 
         ovsdb_monitor_table_untrack_changes(mt, prev_txn);
         ovsdb_monitor_table_track_changes(mt, next_txn);
+        if (mt->track_clauses) {
+            /* We are tracking columns on this table */
+            ovsdb_monitor_table_untrack(mt, prev_txn);
+            ovsdb_monitor_table_track(mt, next_txn);
+        }
     }
+
     *unflushed = next_txn;
 
     return json;
@@ -1780,6 +1865,9 @@  ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
                 changes->n_refs++;
             }
         }
+        if (mt->track_clauses) {
+            ovsdb_monitor_table_track(mt, 0);
+        }
     }
 }
 
@@ -1933,6 +2021,8 @@  ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
             ovsdb_monitor_changes_destroy(changes);
         }
         hmap_destroy(&mt->changes);
+
+        free(mt->txn_ids);
         free(mt->columns);
         free(mt->columns_index_map);
         free(mt);