@@ -148,6 +148,12 @@ struct ovsdb_monitor_changes {
hmap. */
};
+/* Track of tracked txn_ids */
+struct ovsdb_monitor_txn_id {
+ int n_refs;
+ uint64_t transaction;
+};
+
/* A particular table being monitored. */
struct ovsdb_monitor_table {
const struct ovsdb_table *table;
@@ -173,6 +179,11 @@ struct ovsdb_monitor_table {
/* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
struct hmap changes;
bool track_clauses;
+ /* Holds tracked txn_ids, first element is the minimal txn_id
+ * that is being tracked */
+ struct ovsdb_monitor_txn_id *txn_ids;
+ size_t allocated_txn_ids;
+ size_t n_txn_ids;
/* Contains ovsdb_tracked_clauses * */
struct shash trk_clauses;
};
@@ -769,17 +780,74 @@ ovsdb_monitor_table_find_clause_changes(
return changes;
}
+static int
+compare_txn_ids(const void *a_, const void *b_)
+{
+ const struct ovsdb_monitor_txn_id *a = a_;
+ const struct ovsdb_monitor_txn_id *b = b_;
+
+ return a->transaction == b->transaction ? 0 :
+ a->transaction < b->transaction ? -1 : 1;
+}
+
+/* Maintain minimal unflushed transaction by a sorted list. First element in
+ * the list is the lowest unflushed transaction id. */
+static void
+ovsdb_monitor_table_track(struct ovsdb_monitor_table *mt, uint64_t transaction)
+{
+ struct ovsdb_monitor_txn_id *txn_id;
+ int i;
+
+ for (i = mt->n_txn_ids - 1; i >= 0; i--) {
+ txn_id = &mt->txn_ids[i];
+ if (txn_id->transaction == transaction) {
+ txn_id->n_refs++;
+ return;
+ }
+ }
+
+ if (mt->n_txn_ids >= mt->allocated_txn_ids) {
+ mt->txn_ids = x2nrealloc(mt->txn_ids, &mt->allocated_txn_ids,
+ sizeof *mt->txn_ids);
+ }
+
+ txn_id = &mt->txn_ids[mt->n_txn_ids++];
+ txn_id->n_refs = 1;
+ txn_id->transaction = transaction;
+
+ qsort(mt->txn_ids, mt->n_txn_ids, sizeof *mt->txn_ids,
+ compare_txn_ids);
+}
+
+/* Maintain sorted list of tracked transactions. First element in the list
+ * is the lowest unflushed transaction id. */
+static void
+ovsdb_monitor_table_untrack(struct ovsdb_monitor_table *mt, uint64_t transaction)
+{
+ int i;
+
+ for(i = 0; i < mt->n_txn_ids; i++) {
+ struct ovsdb_monitor_txn_id *txn_id = &mt->txn_ids[i];
+ if (txn_id->transaction == transaction) {
+ if (--txn_id->n_refs == 0) {
+ txn_id->transaction = ULONG_LONG_MAX;
+ qsort(mt->txn_ids, mt->n_txn_ids, sizeof *mt->txn_ids,
+ compare_txn_ids);
+ mt->n_txn_ids--;
+ }
+ return;
+ }
+ }
+ OVS_NOT_REACHED();
+}
+
static void
ovsdb_monitor_clauses_track_update(const struct ovsdb_row *old,
const struct ovsdb_row *new,
struct ovsdb_monitor_table *mt)
{
- //struct ovsdb_monitor_txn_id *trk;
- /* First element in list is minimal tracked transaction */
- //INIT_CONTAINER(trk, &mt->tracked_txns, node);
struct shash_node *node;
struct ovsdb_monitor_changes *changes;
- //uint64_t unflushed = trk->txn_id;
/* Insert row to tracked columns */
SHASH_FOR_EACH(node, &mt->trk_clauses) {
@@ -1406,6 +1474,13 @@ ovsdb_monitor_compose_update(
ovsdb_monitor_row_set_old(row, transaction);
if (row->transaction < transaction) {
+ uint64_t lowest_txn_id = mt->txn_ids[0].transaction;
+
+ /* Check for lazy cleanup */
+ if (row->transaction < lowest_txn_id) {
+ hmap_remove(&changes->rows, &row->hmap_node);
+ ovsdb_monitor_row_destroy(mt, row);
+ }
continue;
}
if (uuid_exists(&row_uuids, &row->uuid)) {
@@ -1452,7 +1527,8 @@ ovsdb_monitor_compose_update(
static struct json*
ovsdb_monitor_compose_all_rows_update(
struct ovsdb_monitor *dbmon,
- struct ovsdb_monitor_session_condition *condition)
+ struct ovsdb_monitor_session_condition *condition,
+ uint64_t next_txn)
{
struct shash_node *node;
struct json *json = NULL;
@@ -1477,6 +1553,9 @@ ovsdb_monitor_compose_all_rows_update(
}
}
ovsdb_monitor_table_condition_updated(mt, condition);
+ if (mt->track_clauses) {
+ ovsdb_monitor_table_track(mt, next_txn);
+ }
}
free(changed);
@@ -1526,7 +1605,7 @@ ovsdb_monitor_get_update(
}
} else {
json =
- ovsdb_monitor_compose_all_rows_update(dbmon, condition);
+ ovsdb_monitor_compose_all_rows_update(dbmon, condition, next_txn);
}
}
@@ -1536,7 +1615,13 @@ ovsdb_monitor_get_update(
ovsdb_monitor_table_untrack_changes(mt, prev_txn);
ovsdb_monitor_table_track_changes(mt, next_txn);
+ if (mt->track_clauses) {
+ /* We are tracking columns on this table */
+ ovsdb_monitor_table_untrack(mt, prev_txn);
+ ovsdb_monitor_table_track(mt, next_txn);
+ }
}
+
*unflushed = next_txn;
return json;
@@ -1780,6 +1865,9 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
changes->n_refs++;
}
}
+ if (mt->track_clauses) {
+ ovsdb_monitor_table_track(mt, 0);
+ }
}
}
@@ -1933,6 +2021,8 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
ovsdb_monitor_changes_destroy(changes);
}
hmap_destroy(&mt->changes);
+
+ free(mt->txn_ids);
free(mt->columns);
free(mt->columns_index_map);
free(mt);
Maintain the minimal transaction ID per "==" table. Run lazy cleanup while going over the monitor rows tracked by clauses remove all rows that have a lower transaction than the minimal table transaction ID. Signed-off-by: Liran Schour <lirans@il.ibm.com> --- ovsdb/monitor.c | 102 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 96 insertions(+), 6 deletions(-)