@@ -43,7 +43,7 @@ static struct uuid server_uuid;
static struct jsonrpc_session *session;
static unsigned int session_seqno = UINT_MAX;
-static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
+static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *);
static void add_monitored_table(struct ovsdb_table_schema *table,
struct json *monitor_requests);
@@ -100,16 +100,27 @@ enum ovsdb_replication_state {
static enum ovsdb_replication_state state;
+struct replication_db {
+ struct ovsdb *db;
+ bool schema_version_higher;
+ /* Points to the schema received from the active server if
+ * the local db schema version is higher. NULL otherwise. */
+ struct ovsdb_schema *active_db_schema;
+};
+
+static bool check_replication_possible(struct replication_db *,
+ struct ovsdb_schema *);
+
/* All DBs known to ovsdb-server. The actual replication dbs are stored
* in 'replication dbs', which is a subset of all dbs and remote dbs whose
* schema matches. */
static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
static struct shash *replication_dbs;
-static struct shash *replication_db_clone(struct shash *dbs);
+static struct shash *replication_dbs_create(void);
static void replication_dbs_destroy(void);
/* Find 'struct ovsdb' by name within 'replication_dbs' */
-static struct ovsdb* find_db(const char *db_name);
+static struct replication_db *find_db(const char *db_name);
void
@@ -152,8 +163,8 @@ send_schema_requests(const struct json *result)
if (name->type == JSON_STRING) {
/* Send one schema request for each remote DB. */
const char *db_name = json_string(name);
- struct ovsdb *db = find_db(db_name);
- if (db) {
+ struct replication_db *rdb = find_db(db_name);
+ if (rdb) {
struct jsonrpc_msg *request =
jsonrpc_create_request(
"get_schema",
@@ -161,7 +172,7 @@ send_schema_requests(const struct json *result)
json_string_create(db_name)),
NULL);
- request_ids_add(request->id, db);
+ request_ids_add(request->id, rdb->db);
jsonrpc_session_send(session, request);
}
}
@@ -206,11 +217,11 @@ replication_run(void)
&& msg->params->array.n == 2
&& msg->params->array.elems[0]->type == JSON_STRING) {
char *db_name = msg->params->array.elems[0]->string;
- struct ovsdb *db = find_db(db_name);
- if (db) {
+ struct replication_db *rdb = find_db(db_name);
+ if (rdb) {
struct ovsdb_error *error;
error = process_notification(msg->params->array.elems[1],
- db);
+ rdb->db);
if (error) {
ovsdb_error_assert(error);
state = RPL_S_ERR;
@@ -218,6 +229,7 @@ replication_run(void)
}
}
} else if (msg->type == JSONRPC_REPLY) {
+ struct replication_db *rdb;
struct ovsdb *db;
if (!request_ids_lookup_and_free(msg->id, &db)) {
VLOG_WARN("received unexpected reply");
@@ -256,7 +268,7 @@ replication_run(void)
jsonrpc_session_send(session, request);
replication_dbs_destroy();
- replication_dbs = replication_db_clone(&local_dbs);
+ replication_dbs = replication_dbs_create();
state = RPL_S_DB_REQUESTED;
break;
}
@@ -284,17 +296,37 @@ replication_run(void)
state = RPL_S_ERR;
}
- if (db != find_db(schema->name)) {
+ rdb = find_db(schema->name);
+ if (!rdb) {
/* Unexpected schema. */
VLOG_WARN("unexpected schema %s", schema->name);
state = RPL_S_ERR;
- } else if (!ovsdb_schema_equal(schema, db->schema)) {
+ } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) {
/* Schmea version mismatch. */
- VLOG_INFO("Schema version mismatch, %s not replicated",
+ VLOG_INFO("Schema version mismatch, checking if %s can "
+ "still be replicated or not.",
schema->name);
- shash_find_and_delete(replication_dbs, schema->name);
+ if (check_replication_possible(rdb, schema)) {
+ VLOG_INFO("%s can be replicated.", schema->name);
+ rdb->schema_version_higher = true;
+ if (rdb->active_db_schema) {
+ ovsdb_schema_destroy(rdb->active_db_schema);
+ }
+ rdb->active_db_schema = schema;
+ } else {
+ VLOG_INFO("%s cannot be replicated.", schema->name);
+ struct replication_db *r =
+ shash_find_and_delete(replication_dbs,
+ schema->name);
+ if (r->active_db_schema) {
+ ovsdb_schema_destroy(r->active_db_schema);
+ }
+ free(r);
+ ovsdb_schema_destroy(schema);
+ }
+ } else {
+ ovsdb_schema_destroy(schema);
}
- ovsdb_schema_destroy(schema);
/* After receiving schemas, reset the local databases that
* will be monitored and send out monitor requests for them. */
@@ -306,11 +338,13 @@ replication_run(void)
state = RPL_S_ERR;
} else {
SHASH_FOR_EACH (node, replication_dbs) {
- db = node->data;
+ rdb = node->data;
struct jsonrpc_msg *request =
- create_monitor_request(db);
+ create_monitor_request(
+ rdb->schema_version_higher ?
+ rdb->active_db_schema : rdb->db->schema);
- request_ids_add(request->id, db);
+ request_ids_add(request->id, rdb->db);
jsonrpc_session_send(session, request);
VLOG_DBG("Send monitor requests");
state = RPL_S_MONITOR_REQUESTED;
@@ -509,7 +543,7 @@ replication_destroy(void)
shash_destroy(&local_dbs);
}
-static struct ovsdb *
+static struct replication_db *
find_db(const char *db_name)
{
return shash_find_data(replication_dbs, db_name);
@@ -541,11 +575,10 @@ reset_database(struct ovsdb *db)
* Caller is responsible for disposing 'request'.
*/
static struct jsonrpc_msg *
-create_monitor_request(struct ovsdb *db)
+create_monitor_request(struct ovsdb_schema *schema)
{
struct jsonrpc_msg *request;
struct json *monitor;
- struct ovsdb_schema *schema = db->schema;
const char *db_name = schema->name;
struct json *monitor_request = json_object_create();
@@ -779,14 +812,18 @@ request_ids_clear(void)
}
static struct shash *
-replication_db_clone(struct shash *dbs)
+replication_dbs_create(void)
{
struct shash *new = xmalloc(sizeof *new);
shash_init(new);
struct shash_node *node;
- SHASH_FOR_EACH (node, dbs) {
- shash_add(new, node->name, node->data);
+ SHASH_FOR_EACH (node, &local_dbs) {
+ struct replication_db *repl_db = xmalloc(sizeof *repl_db);
+ repl_db->db = node->data;
+ repl_db->schema_version_higher = false;
+ repl_db->active_db_schema = NULL;
+ shash_add(new, node->name, repl_db);
}
return new;
@@ -795,7 +832,24 @@ replication_db_clone(struct shash *dbs)
static void
replication_dbs_destroy(void)
{
- shash_destroy(replication_dbs);
+ if (!replication_dbs) {
+ return;
+ }
+
+ struct shash_node *node, *next;
+
+ SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
+ hmap_remove(&replication_dbs->map, &node->node);
+ struct replication_db *rdb = node->data;
+ if (rdb->active_db_schema) {
+ ovsdb_schema_destroy(rdb->active_db_schema);
+ }
+ free(rdb);
+ free(node->name);
+ free(node);
+ }
+
+ hmap_destroy(&replication_dbs->map);
free(replication_dbs);
replication_dbs = NULL;
}
@@ -877,6 +931,67 @@ replication_status(void)
return ds_steal_cstr(&ds);
}
+/* Checks if its possible to replicate to the local db from the active db
+ * schema.
+ *
+ * Returns true, if
+ * - local db schema version is greater than or equal to the schema version
+ * in the 'active_db_schema'.
+ * - The tables of 'active_db_schema' are present in the local db schema.
+ * False, otherwise.
+ */
+
+static bool
+check_replication_possible(struct replication_db *rdb,
+ struct ovsdb_schema *active_db_schema)
+{
+ char *local_db_schema_ver = xstrdup(rdb->db->schema->version);
+ char *active_db_schema_ver = xstrdup(active_db_schema->version);
+ char *save_ptr = NULL;
+ bool repl_possible = false;
+
+ char *token = local_db_schema_ver;
+ char *ldb_ver_x = strtok_r(token, ".", &save_ptr);
+
+ if (!ldb_ver_x) {
+ goto exit;
+ }
+
+ char *ldb_ver_y = strtok_r(NULL, ".", &save_ptr);
+ if (!ldb_ver_y) {
+ goto exit;
+ }
+
+ token = active_db_schema_ver;
+ save_ptr = NULL;
+ char *active_ver_x = strtok_r(token, ".", &save_ptr);
+
+ if (!active_ver_x || strcmp(active_ver_x, ldb_ver_x)) {
+ goto exit;
+ }
+
+ char *active_ver_y = strtok_r(NULL, ".", &save_ptr);
+
+ if (!active_ver_y || (atoi(ldb_ver_y) < atoi(active_ver_y))) {
+ goto exit;
+ }
+
+ repl_possible = true;
+ /* Verify that the local db schema has all tables of 'active_db_schema'. */
+ struct shash_node *node;
+ SHASH_FOR_EACH (node, &active_db_schema->tables) {
+ if (!shash_find(&rdb->db->schema->tables, node->name)) {
+ repl_possible = false;
+ break;
+ }
+ }
+
+exit:
+ free(local_db_schema_ver);
+ free(active_db_schema_ver);
+ return repl_possible;
+}
+
void
replication_usage(void)
{