@@ -159,6 +159,9 @@ int64_t xbzrle_cache_resize(int64_t new_size);
void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
void ram_control_load_hook(QEMUFile *f, uint64_t flags);
+void ram_control_add(QEMUFile *f, void *host_addr,
+ ram_addr_t block_offset, uint64_t length);
+void ram_control_remove(QEMUFile *f, ram_addr_t block_offset);
/* Whenever this is found in the data stream, the flags
* will be passed to ram_control_load_hook in the incoming-migration
@@ -171,6 +174,8 @@ void ram_control_load_hook(QEMUFile *f, uint64_t flags);
#define RAM_SAVE_CONTROL_DELAYED -2000
#define RAM_LOAD_CONTROL_NOT_SUPP -3000
#define RAM_LOAD_CONTROL_DELAYED -4000
+#define RAM_COPY_CONTROL_NOT_SUPP -5000
+#define RAM_COPY_CONTROL_DELAYED -6000
#define RDMA_CONTROL_VERSION_CURRENT 1
@@ -182,4 +187,11 @@ int ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
int ram_control_load_page(QEMUFile *f,
void *host_addr,
long size);
+
+int ram_control_copy_page(QEMUFile *f,
+ ram_addr_t block_offset_dest,
+ ram_addr_t offset_dest,
+ ram_addr_t block_offset_source,
+ ram_addr_t offset_source,
+ long size);
#endif
@@ -92,6 +92,41 @@ typedef int (QEMURamLoadFunc)(QEMUFile *f,
void *host_addr,
long size);
+/*
+ * This function allows *local* RDMA copying memory between two registered
+ * RAMBlocks, both real ones as well as private memory areas independently
+ * registered by external callers (such as MC). If RDMA is not available,
+ * then this function does nothing and the caller should just use memcpy().
+ */
+typedef int (QEMURamCopyFunc)(QEMUFile *f, void *opaque,
+ ram_addr_t block_offset_dest,
+ ram_addr_t offset_dest,
+ ram_addr_t block_offset_source,
+ ram_addr_t offset_source,
+ long size);
+
+/*
+ * Inform the underlying transport of a new virtual memory area.
+ * If this area is an actual RAMBlock, then pass the corresponding
+ * parameters of that block.
+ * If this area is an arbitrary virtual memory address, then
+ * pass the same value for both @host_addr and @block_offset.
+ */
+typedef int (QEMURamAddFunc)(QEMUFile *f, void *opaque,
+ void *host_addr,
+ ram_addr_t block_offset,
+ uint64_t length);
+
+/*
+ * Remove an underlying new virtual memory area.
+ * If this area is an actual RAMBlock, then pass the corresponding
+ * parameters of that block.
+ * If this area is an arbitrary virtual memory address, then
+ * pass the same value for both @host_addr and @block_offset.
+ */
+typedef int (QEMURamRemoveFunc)(QEMUFile *f, void *opaque,
+ ram_addr_t block_offset);
+
typedef struct QEMUFileOps {
QEMUFilePutBufferFunc *put_buffer;
QEMUFileGetBufferFunc *get_buffer;
@@ -103,6 +138,9 @@ typedef struct QEMUFileOps {
QEMURamHookFunc *hook_ram_load;
QEMURamSaveFunc *save_page;
QEMURamLoadFunc *load_page;
+ QEMURamCopyFunc *copy_page;
+ QEMURamAddFunc *add;
+ QEMURamRemoveFunc *remove;
} QEMUFileOps;
QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
@@ -27,32 +27,40 @@
#include <string.h>
#include <rdma/rdma_cma.h>
+/*
+ * Ability to runtime-enable debug statements while inside GDB.
+ * Choices are 1, 2, or 3 (so far).
+ */
+static int rdma_debug = 0;
+
//#define DEBUG_RDMA
//#define DEBUG_RDMA_VERBOSE
//#define DEBUG_RDMA_REALLY_VERBOSE
+#define RPRINTF(fmt, ...) printf("rdma: " fmt, ## __VA_ARGS__)
+
#ifdef DEBUG_RDMA
#define DPRINTF(fmt, ...) \
- do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+ do { RPRINTF(fmt, ## __VA_ARGS__); } while (0)
#else
#define DPRINTF(fmt, ...) \
- do { } while (0)
+ do { if (rdma_debug >= 1) RPRINTF(fmt, ## __VA_ARGS__); } while (0)
#endif
#ifdef DEBUG_RDMA_VERBOSE
#define DDPRINTF(fmt, ...) \
- do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+ do { RPRINTF(fmt, ## __VA_ARGS__); } while (0)
#else
#define DDPRINTF(fmt, ...) \
- do { } while (0)
+ do { if (rdma_debug >= 2) RPRINTF(fmt, ## __VA_ARGS__); } while (0)
#endif
#ifdef DEBUG_RDMA_REALLY_VERBOSE
#define DDDPRINTF(fmt, ...) \
- do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
+ do { RPRINTF(fmt, ## __VA_ARGS__); } while (0)
#else
#define DDDPRINTF(fmt, ...) \
- do { } while (0)
+ do { if (rdma_debug >= 3) RPRINTF(fmt, ## __VA_ARGS__); } while (0)
#endif
/*
@@ -60,17 +68,20 @@
*/
#define ERROR(errp, fmt, ...) \
do { \
+ Error **e = errp; \
fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
- if (errp && (*(errp) == NULL)) { \
- error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
+ if (e && ((*e) == NULL)) { \
+ error_setg(e, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
} \
} while (0)
+#define SET_ERROR(rdma, err) do { if (!(rdma)->error_state) { (rdma)->error_state = (err); } } while (0)
+
#define RDMA_RESOLVE_TIMEOUT_MS 10000
/* Do not merge data if larger than this. */
#define RDMA_MERGE_MAX (2 * 1024 * 1024)
-#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
+#define RDMA_SEND_MAX (RDMA_MERGE_MAX / 4096)
#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
@@ -87,18 +98,30 @@
*/
#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
-
-#define RDMA_CONTROL_VERSION_CURRENT 1
/*
* Capabilities for negotiation.
*/
#define RDMA_CAPABILITY_PIN_ALL 0x01
+#define RDMA_CAPABILITY_KEEPALIVE 0x02
+
+/*
+ * Max # missed keepalive before we assume remote side is unavailable.
+ */
+#define RDMA_CONNECTION_INTERVAL_MS 300
+#define RDMA_KEEPALIVE_INTERVAL_MS 300
+#define RDMA_KEEPALIVE_FIRST_MISSED_OFFSET 1000
+#define RDMA_MAX_LOST_KEEPALIVE 10
+#define RDMA_MAX_STARTUP_MISSED_KEEPALIVE 100
/*
* Add the other flags above to this list of known capabilities
* as they are introduced.
*/
-static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
+static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL
+ | RDMA_CAPABILITY_KEEPALIVE
+ ;
+static QEMUTimer *connection_timer = NULL;
+static QEMUTimer *keepalive_timer = NULL;
#define CHECK_ERROR_STATE() \
do { \
@@ -143,14 +166,18 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
*/
enum {
RDMA_WRID_NONE = 0,
- RDMA_WRID_RDMA_WRITE = 1,
+ RDMA_WRID_RDMA_WRITE_REMOTE = 1,
+ RDMA_WRID_RDMA_WRITE_LOCAL = 2,
+ RDMA_WRID_RDMA_KEEPALIVE = 3,
RDMA_WRID_SEND_CONTROL = 2000,
RDMA_WRID_RECV_CONTROL = 4000,
};
const char *wrid_desc[] = {
[RDMA_WRID_NONE] = "NONE",
- [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
+ [RDMA_WRID_RDMA_WRITE_REMOTE] = "WRITE RDMA REMOTE",
+ [RDMA_WRID_RDMA_WRITE_LOCAL] = "WRITE RDMA LOCAL",
+ [RDMA_WRID_RDMA_KEEPALIVE] = "KEEPALIVE",
[RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
[RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};
@@ -216,21 +243,41 @@ typedef struct {
/*
* Negotiate RDMA capabilities during connection-setup time.
*/
-typedef struct {
+typedef struct QEMU_PACKED RDMACapabilities {
uint32_t version;
uint32_t flags;
+ uint32_t keepalive_rkey;
+ uint64_t keepalive_addr;
} RDMACapabilities;
+static uint64_t htonll(uint64_t v)
+{
+ union { uint32_t lv[2]; uint64_t llv; } u;
+ u.lv[0] = htonl(v >> 32);
+ u.lv[1] = htonl(v & 0xFFFFFFFFULL);
+ return u.llv;
+}
+
+static uint64_t ntohll(uint64_t v) {
+ union { uint32_t lv[2]; uint64_t llv; } u;
+ u.llv = v;
+ return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
+}
+
static void caps_to_network(RDMACapabilities *cap)
{
cap->version = htonl(cap->version);
cap->flags = htonl(cap->flags);
+ cap->keepalive_rkey = htonl(cap->keepalive_rkey);
+ cap->keepalive_addr = htonll(cap->keepalive_addr);
}
static void network_to_caps(RDMACapabilities *cap)
{
cap->version = ntohl(cap->version);
cap->flags = ntohl(cap->flags);
+ cap->keepalive_rkey = ntohl(cap->keepalive_rkey);
+ cap->keepalive_addr = ntohll(cap->keepalive_addr);
}
/*
@@ -245,11 +292,15 @@ typedef struct RDMALocalBlock {
uint64_t remote_host_addr; /* remote virtual address */
uint64_t offset;
uint64_t length;
- struct ibv_mr **pmr; /* MRs for chunk-level registration */
- struct ibv_mr *mr; /* MR for non-chunk-level registration */
- uint32_t *remote_keys; /* rkeys for chunk-level registration */
- uint32_t remote_rkey; /* rkeys for non-chunk-level registration */
- int index; /* which block are we */
+ struct ibv_mr **pmr; /* MRs for remote chunk-level registration */
+ struct ibv_mr *mr; /* MR for non-chunk-level registration */
+ struct ibv_mr **pmr_src; /* MRs for copy chunk-level registration */
+ struct ibv_mr *mr_src; /* MR for copy non-chunk-level registration */
+ struct ibv_mr **pmr_dest; /* MRs for copy chunk-level registration */
+ struct ibv_mr *mr_dest; /* MR for copy non-chunk-level registration */
+ uint32_t *remote_keys; /* rkeys for chunk-level registration */
+ uint32_t remote_rkey; /* rkeys for non-chunk-level registration */
+ int index; /* which block are we */
bool is_ram_block;
int nb_chunks;
unsigned long *transit_bitmap;
@@ -271,20 +322,6 @@ typedef struct QEMU_PACKED RDMARemoteBlock {
uint32_t padding;
} RDMARemoteBlock;
-static uint64_t htonll(uint64_t v)
-{
- union { uint32_t lv[2]; uint64_t llv; } u;
- u.lv[0] = htonl(v >> 32);
- u.lv[1] = htonl(v & 0xFFFFFFFFULL);
- return u.llv;
-}
-
-static uint64_t ntohll(uint64_t v) {
- union { uint32_t lv[2]; uint64_t llv; } u;
- u.llv = v;
- return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
-}
-
static void remote_block_to_network(RDMARemoteBlock *rb)
{
rb->remote_host_addr = htonll(rb->remote_host_addr);
@@ -313,10 +350,69 @@ typedef struct RDMALocalBlocks {
} RDMALocalBlocks;
/*
+ * We provide RDMA to QEMU by way of 2 mechanisms:
+ *
+ * 1. Local copy to remote copy
+ * 2. Local copy to local copy - like memcpy().
+ *
+ * Three instances of this structure are maintained inside of RDMAContext
+ * to manage both mechanisms.
+ */
+typedef struct RDMACurrentChunk {
+ /* store info about current buffer so that we can
+ merge it with future sends */
+ uint64_t current_addr;
+ uint64_t current_length;
+ /* index of ram block the current buffer belongs to */
+ int current_block_idx;
+ /* index of the chunk in the current ram block */
+ int current_chunk;
+
+ uint64_t block_offset;
+ uint64_t offset;
+
+ /* parameters for qemu_rdma_write() */
+ uint64_t chunk_idx;
+ uint8_t *chunk_start;
+ uint8_t *chunk_end;
+ RDMALocalBlock *block;
+ uint8_t *addr;
+ uint64_t chunks;
+} RDMACurrentChunk;
+
+/*
+ * Three copies of the following structure are used to hold the infiniband
+ * connection variables for each of the aforementioned mechanisms, one for
+ * remote copy and two for local copy.
+ */
+typedef struct RDMALocalContext {
+ struct ibv_context *verbs;
+ struct ibv_pd *pd;
+ struct ibv_comp_channel *comp_chan;
+ struct ibv_cq *cq;
+ struct ibv_qp_init_attr qp_attr;
+ struct ibv_qp *qp;
+ union ibv_gid gid;
+ struct ibv_port_attr port;
+ uint64_t psn;
+ int port_num;
+ int nb_sent;
+ int64_t start_time;
+ int max_nb_sent;
+ const char * id_str;
+} RDMALocalContext;
+
+/*
* Main data structure for RDMA state.
* While there is only one copy of this structure being allocated right now,
* this is the place where one would start if you wanted to consider
* having more than one RDMA connection open at the same time.
+ *
+ * It is used for performing both local and remote RDMA operations
+ * with a single RDMA connection.
+ *
+ * Local operations are done by allocating separate queue pairs after
+ * the initial RDMA remote connection is initialized.
*/
typedef struct RDMAContext {
char *host;
@@ -333,19 +429,15 @@ typedef struct RDMAContext {
*/
int control_ready_expected;
- /* number of outstanding writes */
+ /* number of posts */
int nb_sent;
- /* store info about current buffer so that we can
- merge it with future sends */
- uint64_t current_addr;
- uint64_t current_length;
- /* index of ram block the current buffer belongs to */
- int current_index;
- /* index of the chunk in the current ram block */
- int current_chunk;
+ RDMACurrentChunk chunk_remote;
+ RDMACurrentChunk chunk_local_src;
+ RDMACurrentChunk chunk_local_dest;
bool pin_all;
+ bool do_keepalive;
/*
* infiniband-specific variables for opening the device
@@ -384,17 +476,200 @@ typedef struct RDMAContext {
* Then use coroutine yield function.
* Source runs in a thread, so we don't care.
*/
- int migration_started_on_destination;
+ bool migration_started;
int total_registrations;
int total_writes;
int unregister_current, unregister_next;
- uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
+ uint64_t unregistrations[RDMA_SEND_MAX];
GHashTable *blockmap;
+
+ uint64_t keepalive;
+ uint64_t last_keepalive;
+ uint64_t nb_missed_keepalive;
+ uint64_t next_keepalive;
+ struct ibv_mr *keepalive_mr;
+ struct ibv_mr *next_keepalive_mr;
+ uint32_t keepalive_rkey;
+ uint64_t keepalive_addr;
+ bool keepalive_startup;
+
+ RDMALocalContext lc_src;
+ RDMALocalContext lc_dest;
+ RDMALocalContext lc_remote;
+
+ /* who are we? */
+ bool source;
+ bool dest;
} RDMAContext;
+static void close_ibv(RDMAContext *rdma, RDMALocalContext *lc)
+{
+
+ if (lc->qp) {
+ struct ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR };
+ ibv_modify_qp(lc->qp, &attr, IBV_QP_STATE);
+ ibv_destroy_qp(lc->qp);
+ lc->qp = NULL;
+ }
+
+ if (lc->cq) {
+ ibv_destroy_cq(lc->cq);
+ lc->cq = NULL;
+ }
+
+ if (lc->comp_chan) {
+ ibv_destroy_comp_channel(lc->comp_chan);
+ lc->comp_chan = NULL;
+ }
+
+ if (lc->pd) {
+ ibv_dealloc_pd(lc->pd);
+ lc->pd = NULL;
+ }
+
+ if (lc->verbs) {
+ ibv_close_device(lc->verbs);
+ lc->verbs = NULL;
+ }
+}
+
+/*
+ * Create protection domain and completion queues
+ */
+static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma, RDMALocalContext *lc)
+{
+    struct rlimit r = { .rlim_cur = RLIM_INFINITY, .rlim_max = RLIM_INFINITY };
+
+    if (getrlimit(RLIMIT_MEMLOCK, &r) < 0) {
+        perror("getrlimit");
+        ERROR(NULL, "getrlimit(RLIMIT_MEMLOCK)");
+        goto err_alloc;
+    }
+
+    DPRINTF("MemLock Limits cur: %" PRIu64 " max: %" PRIu64 "\n",
+            (uint64_t) r.rlim_cur, (uint64_t) r.rlim_max); /* rlim_t is unsigned */
+
+    lc->pd = ibv_alloc_pd(lc->verbs);
+    if (!lc->pd) {
+        ERROR(NULL, "allocate protection domain");
+        goto err_alloc;
+    }
+
+    /* create completion channel */
+    lc->comp_chan = ibv_create_comp_channel(lc->verbs);
+    if (!lc->comp_chan) {
+        ERROR(NULL, "allocate completion channel");
+        goto err_alloc;
+    }
+
+    /*
+     * Completion queue can be filled by both read and write work requests,
+     * so must reflect the sum of both possible queue sizes.
+     */
+    lc->cq = ibv_create_cq(lc->verbs, (RDMA_SEND_MAX * 3), NULL,
+                           lc->comp_chan, 0);
+    if (!lc->cq) {
+        ERROR(NULL, "allocate completion queue");
+        goto err_alloc;
+    }
+
+    return 0;
+
+err_alloc:
+    close_ibv(rdma, lc);
+    return -EINVAL;
+}
+
+static int open_local(RDMAContext *rdma, RDMALocalContext *lc)
+{
+    struct ibv_qp_attr set_attr = {
+        .qp_state = IBV_QPS_INIT,
+        .pkey_index = 0,
+        .qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE,
+    };
+    struct ibv_qp_attr query_attr;
+    struct ibv_qp_init_attr query_init_attr;
+    int ret;
+
+    lc->psn = lrand48() & 0xffffff; /* random initial packet sequence number */
+
+    ret = ibv_query_qp(rdma->lc_remote.qp, &query_attr, IBV_QP_PORT, &query_init_attr); /* reuse remote QP's port */
+
+    if (ret) {
+        ret = EINVAL;
+        ERROR(NULL, "query original QP state");
+        goto err;
+    }
+
+    lc->port_num = query_attr.port_num;
+    set_attr.port_num = lc->port_num;
+
+    lc->verbs = ibv_open_device(rdma->lc_remote.verbs->device);
+
+    if (lc->verbs == NULL) {
+        ret = EINVAL;
+        ERROR(NULL, "open device!");
+        goto err;
+    }
+
+    ret = qemu_rdma_alloc_pd_cq(rdma, lc);
+
+    if (ret) {
+        ret = -ret;
+        ERROR(NULL, "Local ibv structure allocations");
+        goto err;
+    }
+
+    if (rdma->dest) {
+        qemu_set_nonblock(lc->comp_chan->fd);
+    }
+
+    lc->qp_attr.cap.max_send_wr = RDMA_SEND_MAX;
+    lc->qp_attr.cap.max_recv_wr = 3;
+    lc->qp_attr.cap.max_send_sge = 1;
+    lc->qp_attr.cap.max_recv_sge = 1;
+    lc->qp_attr.send_cq = lc->cq;
+    lc->qp_attr.recv_cq = lc->cq;
+    lc->qp_attr.qp_type = IBV_QPT_RC;
+
+    lc->qp = ibv_create_qp(lc->pd, &lc->qp_attr);
+    if (!lc->qp) {
+        ret = EINVAL;
+        ERROR(NULL, "create queue pair!");
+        goto err;
+    }
+
+    ret = ibv_modify_qp(lc->qp, &set_attr,
+        IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
+
+    if (ret) {
+        ERROR(NULL, "verbs to init!");
+        goto err;
+    }
+
+    ret = ibv_query_port(lc->verbs, lc->port_num, &lc->port);
+
+    if (ret) {
+        ERROR(NULL, "query port attributes!");
+        goto err;
+    }
+
+    ret = ibv_query_gid(lc->verbs, lc->port_num, 0, &lc->gid); /* was hardcoded to port 1 */
+
+    if (ret) {
+        ERROR(NULL, "Failed to query gid!");
+        goto err;
+    }
+
+    return 0;
+err:
+    SET_ERROR(rdma, -ret);
+    return rdma->error_state;
+}
+
/*
* Interface to the rest of the migration call stack.
*/
@@ -440,7 +715,7 @@ typedef struct QEMU_PACKED {
uint64_t current_addr; /* offset into the ramblock of the chunk */
uint64_t chunk; /* chunk to lookup if unregistering */
} key;
- uint32_t current_index; /* which ramblock the chunk belongs to */
+ uint32_t current_block_idx; /* which ramblock the chunk belongs to */
uint32_t padding;
uint64_t chunks; /* how many sequential chunks to register */
} RDMARegister;
@@ -448,14 +723,14 @@ typedef struct QEMU_PACKED {
static void register_to_network(RDMARegister *reg)
{
reg->key.current_addr = htonll(reg->key.current_addr);
- reg->current_index = htonl(reg->current_index);
+ reg->current_block_idx = htonl(reg->current_block_idx);
reg->chunks = htonll(reg->chunks);
}
static void network_to_register(RDMARegister *reg)
{
reg->key.current_addr = ntohll(reg->key.current_addr);
- reg->current_index = ntohl(reg->current_index);
+ reg->current_block_idx = ntohl(reg->current_block_idx);
reg->chunks = ntohll(reg->chunks);
}
@@ -578,10 +853,10 @@ static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
- DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
- " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
- local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
- block->length, (uint64_t) (block->local_host_addr + block->length),
+ DDPRINTF("Added Block: %d, addr: %p, offset: %" PRIu64
+ " length: %" PRIu64 " end: %p bits %" PRIu64 " chunks %d\n",
+ local->nb_blocks, block->local_host_addr, block->offset,
+ block->length, block->local_host_addr + block->length,
BITS_TO_LONGS(block->nb_chunks) *
sizeof(unsigned long) * 8, block->nb_chunks);
@@ -621,35 +896,51 @@ static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
return 0;
}
-static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
+static void qemu_rdma_free_pmrs(RDMAContext *rdma, RDMALocalBlock *block,
+ struct ibv_mr ***mrs)
{
- RDMALocalBlocks *local = &rdma->local_ram_blocks;
- RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
- (void *) block_offset);
- RDMALocalBlock *old = local->block;
- int x;
-
- assert(block);
-
- if (block->pmr) {
+ if (*mrs) {
int j;
for (j = 0; j < block->nb_chunks; j++) {
- if (!block->pmr[j]) {
+ if (!(*mrs)[j]) {
continue;
}
- ibv_dereg_mr(block->pmr[j]);
+ ibv_dereg_mr((*mrs)[j]);
rdma->total_registrations--;
}
- g_free(block->pmr);
- block->pmr = NULL;
+ g_free(*mrs);
+
+ *mrs = NULL;
}
+}
- if (block->mr) {
- ibv_dereg_mr(block->mr);
+static void qemu_rdma_free_mr(RDMAContext *rdma, struct ibv_mr **mr)
+{
+ if (*mr) {
+ ibv_dereg_mr(*mr);
rdma->total_registrations--;
- block->mr = NULL;
+ *mr = NULL;
}
+}
+
+static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
+{
+ RDMALocalBlocks *local = &rdma->local_ram_blocks;
+ RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
+ (void *) block_offset);
+ RDMALocalBlock *old = local->block;
+ int x;
+
+ assert(block);
+
+ qemu_rdma_free_pmrs(rdma, block, &block->pmr);
+ qemu_rdma_free_pmrs(rdma, block, &block->pmr_src);
+ qemu_rdma_free_pmrs(rdma, block, &block->pmr_dest);
+
+ qemu_rdma_free_mr(rdma, &block->mr);
+ qemu_rdma_free_mr(rdma, &block->mr_src);
+ qemu_rdma_free_mr(rdma, &block->mr_dest);
g_free(block->transit_bitmap);
block->transit_bitmap = NULL;
@@ -674,7 +965,12 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
}
if (block->index < (local->nb_blocks - 1)) {
- memcpy(local->block + block->index, old + (block->index + 1),
+ RDMALocalBlock * end = old + (block->index + 1);
+ for (x = 0; x < (local->nb_blocks - (block->index + 1)); x++) {
+ end[x].index--;
+ }
+
+ memcpy(local->block + block->index, end,
sizeof(RDMALocalBlock) *
(local->nb_blocks - (block->index + 1)));
}
@@ -683,6 +979,10 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
local->block = NULL;
}
+ g_free(old);
+
+ local->nb_blocks--;
+
DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
" length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
@@ -690,10 +990,6 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
BITS_TO_LONGS(block->nb_chunks) *
sizeof(unsigned long) * 8, block->nb_chunks);
- g_free(old);
-
- local->nb_blocks--;
-
if (local->nb_blocks) {
for (x = 0; x < local->nb_blocks; x++) {
g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
@@ -974,7 +1270,7 @@ route:
goto err_resolve_get_addr;
}
rdma_ack_cm_event(cm_event);
- rdma->verbs = rdma->cm_id->verbs;
+ rdma->lc_remote.verbs = rdma->cm_id->verbs;
qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
return 0;
@@ -988,49 +1284,43 @@ err_resolve_create_id:
return ret;
}
-/*
- * Create protection domain and completion queues
- */
-static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
+static int qemu_rdma_alloc_keepalive(RDMAContext *rdma)
{
- /* allocate pd */
- rdma->pd = ibv_alloc_pd(rdma->verbs);
- if (!rdma->pd) {
- fprintf(stderr, "failed to allocate protection domain\n");
- return -1;
- }
+ rdma->keepalive_mr = ibv_reg_mr(rdma->lc_remote.pd,
+ &rdma->keepalive, sizeof(rdma->keepalive),
+ IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
- /* create completion channel */
- rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
- if (!rdma->comp_channel) {
- fprintf(stderr, "failed to allocate completion channel\n");
- goto err_alloc_pd_cq;
+ if (!rdma->keepalive_mr) {
+ perror("Failed to register keepalive location!");
+ SET_ERROR(rdma, -ENOMEM);
+ goto err_alloc;
}
- /*
- * Completion queue can be filled by both read and write work requests,
- * so must reflect the sum of both possible queue sizes.
- */
- rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
- NULL, rdma->comp_channel, 0);
- if (!rdma->cq) {
- fprintf(stderr, "failed to allocate completion queue\n");
- goto err_alloc_pd_cq;
+ rdma->next_keepalive_mr = ibv_reg_mr(rdma->lc_remote.pd,
+ &rdma->next_keepalive, sizeof(rdma->next_keepalive),
+ IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+
+ if (!rdma->next_keepalive_mr) {
+ perror("Failed to register next keepalive location!");
+ SET_ERROR(rdma, -ENOMEM);
+ goto err_alloc;
}
return 0;
-err_alloc_pd_cq:
- if (rdma->pd) {
- ibv_dealloc_pd(rdma->pd);
+err_alloc:
+
+ if (rdma->keepalive_mr) {
+ ibv_dereg_mr(rdma->keepalive_mr);
+ rdma->keepalive_mr = NULL;
}
- if (rdma->comp_channel) {
- ibv_destroy_comp_channel(rdma->comp_channel);
+
+ if (rdma->next_keepalive_mr) {
+ ibv_dereg_mr(rdma->next_keepalive_mr);
+ rdma->next_keepalive_mr = NULL;
}
- rdma->pd = NULL;
- rdma->comp_channel = NULL;
- return -1;
+ return -1;
}
/*
@@ -1041,41 +1331,68 @@ static int qemu_rdma_alloc_qp(RDMAContext *rdma)
struct ibv_qp_init_attr attr = { 0 };
int ret;
- attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
+ attr.cap.max_send_wr = RDMA_SEND_MAX;
attr.cap.max_recv_wr = 3;
attr.cap.max_send_sge = 1;
attr.cap.max_recv_sge = 1;
- attr.send_cq = rdma->cq;
- attr.recv_cq = rdma->cq;
+ attr.send_cq = rdma->lc_remote.cq;
+ attr.recv_cq = rdma->lc_remote.cq;
attr.qp_type = IBV_QPT_RC;
- ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
+ ret = rdma_create_qp(rdma->cm_id, rdma->lc_remote.pd, &attr);
if (ret) {
return -1;
}
- rdma->qp = rdma->cm_id->qp;
+ rdma->lc_remote.qp = rdma->cm_id->qp;
return 0;
}
+static int qemu_rdma_reg_whole_mr(RDMAContext *rdma,
+                                  struct ibv_pd *pd,
+                                  struct ibv_mr **mr,
+                                  int index)
+{
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+
+    *mr = ibv_reg_mr(pd,
+                     local->block[index].local_host_addr,
+                     local->block[index].length,
+                     IBV_ACCESS_LOCAL_WRITE |
+                     IBV_ACCESS_REMOTE_WRITE
+                     );
+    if (!(*mr)) {
+        perror("Failed to register local dest ram block!"); /* perror appends errno text + newline itself */
+        return -1;
+    }
+    rdma->total_registrations++;
+
+    return 0;
+}
+
static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
{
int i;
RDMALocalBlocks *local = &rdma->local_ram_blocks;
for (i = 0; i < local->nb_blocks; i++) {
- local->block[i].mr =
- ibv_reg_mr(rdma->pd,
- local->block[i].local_host_addr,
- local->block[i].length,
- IBV_ACCESS_LOCAL_WRITE |
- IBV_ACCESS_REMOTE_WRITE
- );
- if (!local->block[i].mr) {
- perror("Failed to register local dest ram block!\n");
+ if (qemu_rdma_reg_whole_mr(rdma, rdma->lc_remote.pd, &local->block[i].mr, i)) {
break;
}
- rdma->total_registrations++;
+
+ /* TODO: make this optional if MC is disabled */
+ if (rdma->source) {
+ if (qemu_rdma_reg_whole_mr(rdma, rdma->lc_src.pd,
+ &local->block[i].mr_src, i)) {
+ break;
+ }
+ } else {
+ if (qemu_rdma_reg_whole_mr(rdma, rdma->lc_dest.pd,
+ &local->block[i].mr_dest, i)) {
+ break;
+ }
+ }
+
}
if (i >= local->nb_blocks) {
@@ -1083,8 +1400,10 @@ static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
}
for (i--; i >= 0; i--) {
- ibv_dereg_mr(local->block[i].mr);
- rdma->total_registrations--;
+ qemu_rdma_free_mr(rdma, &local->block[i].mr);
+ qemu_rdma_free_mr(rdma, rdma->source ?
+ &local->block[i].mr_src :
+ &local->block[i].mr_dest);
}
return -1;
@@ -1129,24 +1448,34 @@ static int qemu_rdma_search_ram_block(RDMAContext *rdma,
* to perform the actual RDMA operation.
*/
static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
- RDMALocalBlock *block, uint8_t *host_addr,
- uint32_t *lkey, uint32_t *rkey, int chunk,
- uint8_t *chunk_start, uint8_t *chunk_end)
+ RDMACurrentChunk *cc,
+ RDMALocalContext *lc,
+ bool copy,
+ uint32_t *lkey,
+ uint32_t *rkey)
{
- if (block->mr) {
+ struct ibv_mr ***pmr = copy ? (rdma->source ? &cc->block->pmr_src :
+ &cc->block->pmr_dest) : &cc->block->pmr;
+ struct ibv_mr **mr = copy ? (rdma->source ? &cc->block->mr_src :
+ &cc->block->mr_dest) : &cc->block->mr;
+
+ /*
+ * Use pre-registered keys for the entire VM, if available.
+ */
+ if (*mr) {
if (lkey) {
- *lkey = block->mr->lkey;
+ *lkey = (*mr)->lkey;
}
if (rkey) {
- *rkey = block->mr->rkey;
+ *rkey = (*mr)->rkey;
}
return 0;
}
/* allocate memory to store chunk MRs */
- if (!block->pmr) {
- block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
- if (!block->pmr) {
+ if (!(*pmr)) {
+ *pmr = g_malloc0(cc->block->nb_chunks * sizeof(struct ibv_mr *));
+ if (!(*pmr)) {
return -1;
}
}
@@ -1154,38 +1483,38 @@ static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
/*
* If 'rkey', then we're the destination, so grant access to the source.
*
- * If 'lkey', then we're the source VM, so grant access only to ourselves.
+ * If 'lkey', then we're the source, so grant access only to ourselves.
*/
- if (!block->pmr[chunk]) {
- uint64_t len = chunk_end - chunk_start;
+ if (!(*pmr)[cc->chunk_idx]) {
+ uint64_t len = cc->chunk_end - cc->chunk_start;
DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
- len, chunk_start);
+ len, cc->chunk_start);
- block->pmr[chunk] = ibv_reg_mr(rdma->pd,
- chunk_start, len,
- (rkey ? (IBV_ACCESS_LOCAL_WRITE |
- IBV_ACCESS_REMOTE_WRITE) : 0));
+ (*pmr)[cc->chunk_idx] = ibv_reg_mr(lc->pd, cc->chunk_start, len,
+ (rkey ? (IBV_ACCESS_LOCAL_WRITE |
+ IBV_ACCESS_REMOTE_WRITE) : 0));
- if (!block->pmr[chunk]) {
+ if (!(*pmr)[cc->chunk_idx]) {
perror("Failed to register chunk!");
- fprintf(stderr, "Chunk details: block: %d chunk index %d"
+ fprintf(stderr, "Chunk details: block: %d chunk index %" PRIu64
" start %" PRIu64 " end %" PRIu64 " host %" PRIu64
" local %" PRIu64 " registrations: %d\n",
- block->index, chunk, (uint64_t) chunk_start,
- (uint64_t) chunk_end, (uint64_t) host_addr,
- (uint64_t) block->local_host_addr,
+ cc->block->index, cc->chunk_idx, (uint64_t) cc->chunk_start,
+ (uint64_t) cc->chunk_end, (uint64_t) cc->addr,
+ (uint64_t) cc->block->local_host_addr,
rdma->total_registrations);
return -1;
}
+
rdma->total_registrations++;
}
if (lkey) {
- *lkey = block->pmr[chunk]->lkey;
+ *lkey = (*pmr)[cc->chunk_idx]->lkey;
}
if (rkey) {
- *rkey = block->pmr[chunk]->rkey;
+ *rkey = (*pmr)[cc->chunk_idx]->rkey;
}
return 0;
}
@@ -1196,7 +1525,7 @@ static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
*/
static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
{
- rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
+ rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->lc_remote.pd,
rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
if (rdma->wr_data[idx].control_mr) {
@@ -1257,11 +1586,11 @@ static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
uint64_t chunk =
(wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
- uint64_t index =
+ uint64_t block_index =
(wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
RDMALocalBlock *block =
- &(rdma->local_ram_blocks.block[index]);
- RDMARegister reg = { .current_index = index };
+ &(rdma->local_ram_blocks.block[block_index]);
+ RDMARegister reg = { .current_block_idx = block_index };
RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
};
RDMAControlHeader head = { .len = sizeof(RDMARegister),
@@ -1275,7 +1604,7 @@ static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
rdma->unregistrations[rdma->unregister_current] = 0;
rdma->unregister_current++;
- if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
+ if (rdma->unregister_current == RDMA_SEND_MAX) {
rdma->unregister_current = 0;
}
@@ -1339,7 +1668,7 @@ static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
uint64_t chunk, uint64_t wr_id)
{
if (rdma->unregistrations[rdma->unregister_next] != 0) {
- fprintf(stderr, "rdma migration: queue is full!\n");
+ ERROR(NULL, "queue is full!");
} else {
RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
@@ -1350,7 +1679,7 @@ static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
rdma->unregistrations[rdma->unregister_next++] =
qemu_rdma_make_wrid(wr_id, index, chunk);
- if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
+ if (rdma->unregister_next == RDMA_SEND_MAX) {
rdma->unregister_next = 0;
}
} else {
@@ -1365,14 +1694,21 @@ static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
* (of any kind) has completed.
* Return the work request ID that completed.
*/
-static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
+static uint64_t qemu_rdma_poll(RDMAContext *rdma,
+ RDMALocalContext *lc,
+ uint64_t *wr_id_out,
uint32_t *byte_len)
{
+ int64_t current_time;
int ret;
struct ibv_wc wc;
uint64_t wr_id;
- ret = ibv_poll_cq(rdma->cq, 1, &wc);
+ if (!lc->start_time) {
+ lc->start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+ }
+
+ ret = ibv_poll_cq(lc->cq, 1, &wc);
if (!ret) {
*wr_id_out = RDMA_WRID_NONE;
@@ -1397,29 +1733,48 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
if (rdma->control_ready_expected &&
(wr_id >= RDMA_WRID_RECV_CONTROL)) {
DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
- " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
- wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
+ " left %d (per qp %d)\n",
+ wrid_desc[RDMA_WRID_RECV_CONTROL],
+ wr_id - RDMA_WRID_RECV_CONTROL, wr_id,
+ rdma->nb_sent, lc->nb_sent);
rdma->control_ready_expected = 0;
}
- if (wr_id == RDMA_WRID_RDMA_WRITE) {
+ if (wr_id == RDMA_WRID_RDMA_WRITE_REMOTE) {
uint64_t chunk =
(wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
- uint64_t index =
+ uint64_t block_idx =
(wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
- RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
-
- DDDPRINTF("completions %s (%" PRId64 ") left %d, "
- "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
- print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
- block->local_host_addr, (void *)block->remote_host_addr);
+ RDMALocalBlock *block = &(rdma->local_ram_blocks.block[block_idx]);
clear_bit(chunk, block->transit_bitmap);
+ if (lc->nb_sent > lc->max_nb_sent) {
+ lc->max_nb_sent = lc->nb_sent;
+ }
+
+ current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+
+ if ((current_time - lc->start_time) > 1000) {
+ lc->start_time = current_time;
+ DDPRINTF("outstanding %s total: %d context: %d max %d\n",
+ lc->id_str, rdma->nb_sent, lc->nb_sent, lc->max_nb_sent);
+ }
+
if (rdma->nb_sent > 0) {
rdma->nb_sent--;
}
+ if (lc->nb_sent > 0) {
+ lc->nb_sent--;
+ }
+
+ DDDPRINTF("completions %s (%" PRId64 ") left %d (per qp %d), "
+ "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
+ print_wrid(wr_id), wr_id, rdma->nb_sent, lc->nb_sent,
+ block_idx, chunk, block->local_host_addr,
+ (void *)block->remote_host_addr);
+
if (!rdma->pin_all) {
/*
* FYI: If one wanted to signal a specific chunk to be unregistered
@@ -1428,12 +1783,15 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
* unregistered later.
*/
#ifdef RDMA_UNREGISTRATION_EXAMPLE
- qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
+ if (block->pmr[chunk]) {
+ qemu_rdma_signal_unregister(rdma, block_idx, chunk, wc.wr_id);
+ }
#endif
}
} else {
- DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
- print_wrid(wr_id), wr_id, rdma->nb_sent);
+ DDDPRINTF("other completion %s (%"
+ PRId64 ") received left %d (per qp %d)\n",
+ print_wrid(wr_id), wr_id, rdma->nb_sent, lc->nb_sent);
}
*wr_id_out = wc.wr_id;
@@ -1457,7 +1815,9 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
* completions only need to be recorded, but do not actually
* need further processing.
*/
-static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
+static int qemu_rdma_block_for_wrid(RDMAContext *rdma,
+ RDMALocalContext *lc,
+ int wrid_requested,
uint32_t *byte_len)
{
int num_cq_events = 0, ret = 0;
@@ -1465,12 +1825,15 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
void *cq_ctx;
uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
- if (ibv_req_notify_cq(rdma->cq, 0)) {
- return -1;
+ ret = ibv_req_notify_cq(lc->cq, 0);
+ if (ret) {
+ perror("ibv_req_notify_cq");
+ return -ret;
}
+
/* poll cq first */
while (wr_id != wrid_requested) {
- ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
+ ret = qemu_rdma_poll(rdma, lc, &wr_id_in, byte_len);
if (ret < 0) {
return ret;
}
@@ -1496,23 +1859,27 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
* Coroutine doesn't start until process_incoming_migration()
* so don't yield unless we know we're running inside of a coroutine.
*/
- if (rdma->migration_started_on_destination) {
- yield_until_fd_readable(rdma->comp_channel->fd);
+ if (qemu_in_coroutine()) {
+ yield_until_fd_readable(lc->comp_chan->fd);
}
- if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+ ret = ibv_get_cq_event(lc->comp_chan, &cq, &cq_ctx);
+ if (ret < 0) {
perror("ibv_get_cq_event");
goto err_block_for_wrid;
}
num_cq_events++;
- if (ibv_req_notify_cq(cq, 0)) {
+ ret = ibv_req_notify_cq(cq, 0);
+ if (ret) {
+ ret = -ret;
+ perror("ibv_req_notify_cq");
goto err_block_for_wrid;
}
while (wr_id != wrid_requested) {
- ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
+ ret = qemu_rdma_poll(rdma, lc, &wr_id_in, byte_len);
if (ret < 0) {
goto err_block_for_wrid;
}
@@ -1589,18 +1956,19 @@ static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
}
- if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
+ if (ibv_post_send(rdma->lc_remote.qp, &send_wr, &bad_wr)) {
return -1;
}
if (ret < 0) {
- fprintf(stderr, "Failed to use post IB SEND for control!\n");
+ ERROR(NULL, "using post IB SEND for control!");
return ret;
}
- ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
+ ret = qemu_rdma_block_for_wrid(rdma, &rdma->lc_remote,
+ RDMA_WRID_SEND_CONTROL, NULL);
if (ret < 0) {
- fprintf(stderr, "rdma migration: send polling control error!\n");
+ ERROR(NULL, "send polling control!");
}
return ret;
@@ -1626,7 +1994,7 @@ static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
};
- if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
+ if (ibv_post_recv(rdma->lc_remote.qp, &recv_wr, &bad_wr)) {
return -1;
}
@@ -1640,11 +2008,12 @@ static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
RDMAControlHeader *head, int expecting, int idx)
{
uint32_t byte_len;
- int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
+ int ret = qemu_rdma_block_for_wrid(rdma, &rdma->lc_remote,
+ RDMA_WRID_RECV_CONTROL + idx,
&byte_len);
if (ret < 0) {
- fprintf(stderr, "rdma migration: recv polling control error!\n");
+ ERROR(NULL, "recv polling control!");
return ret;
}
@@ -1731,8 +2100,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
if (resp) {
ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
if (ret) {
- fprintf(stderr, "rdma migration: error posting"
- " extra control recv for anticipated result!");
+ ERROR(NULL, "posting extra control recv for anticipated result!");
return ret;
}
}
@@ -1742,7 +2110,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
*/
ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
if (ret) {
- fprintf(stderr, "rdma migration: error posting first control recv!");
+ ERROR(NULL, "posting first control recv!");
return ret;
}
@@ -1752,7 +2120,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
ret = qemu_rdma_post_send_control(rdma, data, head);
if (ret < 0) {
- fprintf(stderr, "Failed to send control buffer!\n");
+ ERROR(NULL, "sending control buffer!");
return ret;
}
@@ -1829,64 +2197,80 @@ static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
*/
ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
if (ret) {
- fprintf(stderr, "rdma migration: error posting second control recv!");
+ ERROR(NULL, "posting second control recv!");
return ret;
}
return 0;
}
-/*
- * Write an actual chunk of memory using RDMA.
- *
- * If we're using dynamic registration on the dest-side, we have to
- * send a registration command first.
- */
-static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
- int current_index, uint64_t current_addr,
- uint64_t length)
+static inline void install_boundaries(RDMAContext *rdma, RDMACurrentChunk *cc)
{
- struct ibv_sge sge;
- struct ibv_send_wr send_wr = { 0 };
- struct ibv_send_wr *bad_wr;
- int reg_result_idx, ret, count = 0;
- uint64_t chunk, chunks;
- uint8_t *chunk_start, *chunk_end;
- RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
- RDMARegister reg;
- RDMARegisterResult *reg_result;
- RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
- RDMAControlHeader head = { .len = sizeof(RDMARegister),
+ uint64_t len = cc->block->is_ram_block ?
+ cc->current_length : cc->block->length;
+
+ cc->chunks = len / (1UL << RDMA_REG_CHUNK_SHIFT);
+
+ if (cc->chunks && ((len % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
+ cc->chunks--;
+ }
+
+ cc->addr = (uint8_t *) (uint64_t)(cc->block->local_host_addr +
+ (cc->current_addr - cc->block->offset));
+
+ cc->chunk_idx = ram_chunk_index(cc->block->local_host_addr, cc->addr);
+ cc->chunk_start = ram_chunk_start(cc->block, cc->chunk_idx);
+ cc->chunk_end = ram_chunk_end(cc->block, cc->chunk_idx + cc->chunks);
+
+ DDPRINTF("Block %d chunk %" PRIu64 " has %" PRIu64
+ " chunks, (%" PRIu64 " MB)\n", cc->block->index, cc->chunk_idx,
+ cc->chunks + 1, (cc->chunks + 1) *
+ (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
+
+}
+
+/*
+ * Push out any unwritten RDMA operations.
+ */
+static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
+ RDMACurrentChunk *src,
+ RDMACurrentChunk *dest)
+{
+ struct ibv_sge sge;
+ struct ibv_send_wr send_wr = { 0 };
+ struct ibv_send_wr *bad_wr;
+ int reg_result_idx, ret, count = 0;
+ bool copy;
+ RDMALocalContext *lc;
+ RDMARegister reg;
+ RDMARegisterResult *reg_result;
+ RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
+ RDMAControlHeader head = { .len = sizeof(RDMARegister),
.type = RDMA_CONTROL_REGISTER_REQUEST,
.repeat = 1,
};
-retry:
- sge.addr = (uint64_t)(block->local_host_addr +
- (current_addr - block->offset));
- sge.length = length;
-
- chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
- chunk_start = ram_chunk_start(block, chunk);
+ if (!src->current_length) {
+ return 0;
+ }
- if (block->is_ram_block) {
- chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
+ if (dest == src) {
+ dest = NULL;
+ }
- if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
- chunks--;
- }
- } else {
- chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
+ copy = dest ? true : false;
- if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
- chunks--;
- }
- }
+ lc = copy ?
+ (rdma->source ? &rdma->lc_src : &rdma->lc_dest) : &rdma->lc_remote;
- DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
- chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
+retry:
+ src->block = &(rdma->local_ram_blocks.block[src->current_block_idx]);
+ install_boundaries(rdma, src);
- chunk_end = ram_chunk_end(block, chunk + chunks);
+ if (dest) {
+ dest->block = &(rdma->local_ram_blocks.block[dest->current_block_idx]);
+ install_boundaries(rdma, dest);
+ }
if (!rdma->pin_all) {
#ifdef RDMA_UNREGISTRATION_EXAMPLE
@@ -1894,49 +2278,54 @@ retry:
#endif
}
- while (test_bit(chunk, block->transit_bitmap)) {
+ while (test_bit(src->chunk_idx, src->block->transit_bitmap)) {
(void)count;
DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
- " current %" PRIu64 " len %" PRIu64 " %d %d\n",
- count++, current_index, chunk,
- sge.addr, length, rdma->nb_sent, block->nb_chunks);
+ " current %" PRIu64 " len %" PRIu64 " left %d (per qp %d) %d\n",
+ count++, src->current_block_idx, src->chunk_idx,
+ (uint64_t) src->addr, src->current_length,
+ rdma->nb_sent, lc->nb_sent, src->block->nb_chunks);
- ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+ ret = qemu_rdma_block_for_wrid(rdma, lc,
+ RDMA_WRID_RDMA_WRITE_REMOTE, NULL);
if (ret < 0) {
fprintf(stderr, "Failed to Wait for previous write to complete "
"block %d chunk %" PRIu64
- " current %" PRIu64 " len %" PRIu64 " %d\n",
- current_index, chunk, sge.addr, length, rdma->nb_sent);
+ " current %" PRIu64 " len %" PRIu64 " %d (per qp %d)\n",
+ src->current_block_idx, src->chunk_idx, (uint64_t) src->addr,
+ src->current_length, rdma->nb_sent, lc->nb_sent);
return ret;
}
}
- if (!rdma->pin_all || !block->is_ram_block) {
- if (!block->remote_keys[chunk]) {
+ if (!rdma->pin_all || !src->block->is_ram_block) {
+ if (!src->block->remote_keys[src->chunk_idx]) {
/*
* This chunk has not yet been registered, so first check to see
* if the entire chunk is zero. If so, tell the other size to
* memset() + madvise() the entire chunk without RDMA.
*/
- if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
- && buffer_find_nonzero_offset((void *)sge.addr,
- length) == length) {
+ if (src->block->is_ram_block &&
+ can_use_buffer_find_nonzero_offset(src->addr, src->current_length)
+ && buffer_find_nonzero_offset(src->addr,
+ src->current_length) == src->current_length) {
RDMACompress comp = {
- .offset = current_addr,
+ .offset = src->current_addr,
.value = 0,
- .block_idx = current_index,
- .length = length,
+ .block_idx = src->current_block_idx,
+ .length = src->current_length,
};
head.len = sizeof(comp);
head.type = RDMA_CONTROL_COMPRESS;
- DDPRINTF("Entire chunk is zero, sending compress: %"
- PRIu64 " for %d "
- "bytes, index: %d, offset: %" PRId64 "...\n",
- chunk, sge.length, current_index, current_addr);
+ DDPRINTF("Entire chunk is zero, sending compress: %" PRIu64
+ " for %" PRIu64 " bytes, index: %d"
+ ", offset: %" PRId64 "...\n",
+ src->chunk_idx, src->current_length,
+ src->current_block_idx, src->current_addr);
compress_to_network(&comp);
ret = qemu_rdma_exchange_send(rdma, &head,
@@ -1946,109 +2335,125 @@ retry:
return -EIO;
}
- acct_update_position(f, sge.length, true);
+ acct_update_position(f, src->current_length, true);
return 1;
}
/*
- * Otherwise, tell other side to register.
+ * Otherwise, tell other side to register. (Only for remote RDMA)
*/
- reg.current_index = current_index;
- if (block->is_ram_block) {
- reg.key.current_addr = current_addr;
- } else {
- reg.key.chunk = chunk;
- }
- reg.chunks = chunks;
+ if (!dest) {
+ reg.current_block_idx = src->current_block_idx;
+ if (src->block->is_ram_block) {
+ reg.key.current_addr = src->current_addr;
+ } else {
+ reg.key.chunk = src->chunk_idx;
+ }
+ reg.chunks = src->chunks;
- DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
- "bytes, index: %d, offset: %" PRId64 "...\n",
- chunk, sge.length, current_index, current_addr);
+ DDPRINTF("Sending registration request chunk %" PRIu64
+ " for %" PRIu64 " bytes, index: %d, offset: %"
+ PRId64 "...\n",
+ src->chunk_idx, src->current_length,
+ src->current_block_idx, src->current_addr);
- register_to_network(®);
- ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®,
- &resp, ®_result_idx, NULL);
- if (ret < 0) {
- return ret;
+ register_to_network(®);
+ ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®,
+ &resp, ®_result_idx, NULL);
+ if (ret < 0) {
+ return ret;
+ }
}
/* try to overlap this single registration with the one we sent. */
- if (qemu_rdma_register_and_get_keys(rdma, block,
- (uint8_t *) sge.addr,
- &sge.lkey, NULL, chunk,
- chunk_start, chunk_end)) {
+ if (qemu_rdma_register_and_get_keys(rdma, src, lc, copy,
+ &sge.lkey, NULL)) {
fprintf(stderr, "cannot get lkey!\n");
return -EINVAL;
}
- reg_result = (RDMARegisterResult *)
- rdma->wr_data[reg_result_idx].control_curr;
+ if (!dest) {
+ reg_result = (RDMARegisterResult *)
+ rdma->wr_data[reg_result_idx].control_curr;
- network_to_result(reg_result);
+ network_to_result(reg_result);
- DDPRINTF("Received registration result:"
- " my key: %x their key %x, chunk %" PRIu64 "\n",
- block->remote_keys[chunk], reg_result->rkey, chunk);
+ DDPRINTF("Received registration result:"
+ " my key: %x their key %x, chunk %" PRIu64 "\n",
+ src->block->remote_keys[src->chunk_idx],
+ reg_result->rkey, src->chunk_idx);
- block->remote_keys[chunk] = reg_result->rkey;
- block->remote_host_addr = reg_result->host_addr;
+ src->block->remote_keys[src->chunk_idx] = reg_result->rkey;
+ src->block->remote_host_addr = reg_result->host_addr;
+ }
} else {
/* already registered before */
- if (qemu_rdma_register_and_get_keys(rdma, block,
- (uint8_t *)sge.addr,
- &sge.lkey, NULL, chunk,
- chunk_start, chunk_end)) {
+ if (qemu_rdma_register_and_get_keys(rdma, src, lc, copy,
+ &sge.lkey, NULL)) {
fprintf(stderr, "cannot get lkey!\n");
return -EINVAL;
}
}
- send_wr.wr.rdma.rkey = block->remote_keys[chunk];
+ send_wr.wr.rdma.rkey = src->block->remote_keys[src->chunk_idx];
} else {
- send_wr.wr.rdma.rkey = block->remote_rkey;
+ send_wr.wr.rdma.rkey = src->block->remote_rkey;
- if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
- &sge.lkey, NULL, chunk,
- chunk_start, chunk_end)) {
+ if (qemu_rdma_register_and_get_keys(rdma, src, lc, copy,
+ &sge.lkey, NULL)) {
fprintf(stderr, "cannot get lkey!\n");
return -EINVAL;
}
}
+ if (dest) {
+ if (qemu_rdma_register_and_get_keys(rdma, dest,
+ &rdma->lc_dest, copy,
+ NULL, &send_wr.wr.rdma.rkey)) {
+ fprintf(stderr, "cannot get rkey!\n");
+ return -EINVAL;
+ }
+ }
+
/*
* Encode the ram block index and chunk within this wrid.
* We will use this information at the time of completion
* to figure out which bitmap to check against and then which
* chunk in the bitmap to look for.
*/
- send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
- current_index, chunk);
+ send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE_REMOTE,
+ src->current_block_idx, src->chunk_idx);
+ sge.length = src->current_length;
+ sge.addr = (uint64_t) src->addr;
send_wr.opcode = IBV_WR_RDMA_WRITE;
send_wr.send_flags = IBV_SEND_SIGNALED;
send_wr.sg_list = &sge;
send_wr.num_sge = 1;
- send_wr.wr.rdma.remote_addr = block->remote_host_addr +
- (current_addr - block->offset);
+ send_wr.wr.rdma.remote_addr = (dest ? (uint64_t) dest->addr :
+ (src->block->remote_host_addr +
+ (src->current_addr - src->block->offset)));
- DDDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
- " remote: %lx, bytes %" PRIu32 "\n",
- chunk, sge.addr, send_wr.wr.rdma.remote_addr,
- sge.length);
+ DDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
+ " remote: %lx, bytes %" PRIu32 " lkey %" PRIu32
+ " rkey %" PRIu32 "\n",
+ src->chunk_idx, sge.addr,
+ send_wr.wr.rdma.remote_addr, sge.length,
+ sge.lkey, send_wr.wr.rdma.rkey);
/*
* ibv_post_send() does not return negative error numbers,
* per the specification they are positive - no idea why.
*/
- ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
+ ret = ibv_post_send(lc->qp, &send_wr, &bad_wr);
if (ret == ENOMEM) {
DDPRINTF("send queue is full. wait a little....\n");
- ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+ ret = qemu_rdma_block_for_wrid(rdma, lc,
+ RDMA_WRID_RDMA_WRITE_REMOTE, NULL);
if (ret < 0) {
- fprintf(stderr, "rdma migration: failed to make "
- "room in full send queue! %d\n", ret);
+ ERROR(NULL, "could not make room in full send queue! %d", ret);
return ret;
}
@@ -2059,80 +2464,66 @@ retry:
return -ret;
}
- set_bit(chunk, block->transit_bitmap);
- acct_update_position(f, sge.length, false);
- rdma->total_writes++;
-
- return 0;
-}
-
-/*
- * Push out any unwritten RDMA operations.
- *
- * We support sending out multiple chunks at the same time.
- * Not all of them need to get signaled in the completion queue.
- */
-static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
-{
- int ret;
+ set_bit(src->chunk_idx, src->block->transit_bitmap);
- if (!rdma->current_length) {
- return 0;
+ if (!dest) {
+ acct_update_position(f, sge.length, false);
}
- ret = qemu_rdma_write_one(f, rdma,
- rdma->current_index, rdma->current_addr, rdma->current_length);
+ rdma->total_writes++;
+ rdma->nb_sent++;
+ lc->nb_sent++;
- if (ret < 0) {
- return ret;
- }
+ DDDPRINTF("sent total: %d sent lc: %d\n", rdma->nb_sent, lc->nb_sent);
- if (ret == 0) {
- rdma->nb_sent++;
- DDDPRINTF("sent total: %d\n", rdma->nb_sent);
- }
+ src->current_length = 0;
+ src->current_addr = 0;
- rdma->current_length = 0;
- rdma->current_addr = 0;
+ if (dest) {
+ dest->current_length = 0;
+ dest->current_addr = 0;
+ }
return 0;
}
static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
- uint64_t offset, uint64_t len)
+ RDMACurrentChunk *cc,
+ uint64_t current_addr,
+ uint64_t len)
{
RDMALocalBlock *block;
uint8_t *host_addr;
uint8_t *chunk_end;
- if (rdma->current_index < 0) {
+ if (cc->current_block_idx < 0) {
return 0;
}
- if (rdma->current_chunk < 0) {
+ if (cc->current_chunk < 0) {
return 0;
}
- block = &(rdma->local_ram_blocks.block[rdma->current_index]);
- host_addr = block->local_host_addr + (offset - block->offset);
- chunk_end = ram_chunk_end(block, rdma->current_chunk);
+ block = &(rdma->local_ram_blocks.block[cc->current_block_idx]);
+ host_addr = block->local_host_addr + (current_addr - block->offset);
+ chunk_end = ram_chunk_end(block, cc->current_chunk);
- if (rdma->current_length == 0) {
+ if (cc->current_length == 0) {
return 0;
}
/*
* Only merge into chunk sequentially.
*/
- if (offset != (rdma->current_addr + rdma->current_length)) {
+ if (current_addr != (cc->current_addr + cc->current_length)) {
return 0;
}
- if (offset < block->offset) {
+ if (current_addr < block->offset) {
return 0;
}
- if ((offset + len) > (block->offset + block->length)) {
+ if ((current_addr + len) > (block->offset + block->length)) {
return 0;
}
@@ -2143,72 +2534,121 @@ static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
return 1;
}
-/*
- * We're not actually writing here, but doing three things:
- *
- * 1. Identify the chunk the buffer belongs to.
- * 2. If the chunk is full or the buffer doesn't belong to the current
- * chunk, then start a new chunk and flush() the old chunk.
- * 3. To keep the hardware busy, we also group chunks into batches
- * and only require that a batch gets acknowledged in the completion
- * qeueue instead of each individual chunk.
+static int write_start(RDMAContext *rdma,
+ RDMACurrentChunk *cc,
+ uint64_t len,
+ uint64_t current_addr)
+{
+ int ret;
+ uint64_t block_idx, chunk;
+
+ cc->current_addr = current_addr;
+ block_idx = cc->current_block_idx;
+ chunk = cc->current_chunk;
+
+ ret = qemu_rdma_search_ram_block(rdma, cc->block_offset,
+ cc->offset, len, &block_idx, &chunk);
+ if (ret) {
+ ERROR(NULL, "ram block search failed");
+ return ret;
+ }
+
+ cc->current_block_idx = block_idx;
+ cc->current_chunk = chunk;
+
+ return 0;
+}
+
+/*
+ * If we cannot merge it, we flush the current buffer first.
*/
-static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
- uint64_t block_offset, uint64_t offset,
- uint64_t len)
+static int qemu_rdma_flush_unmergable(RDMAContext *rdma,
+ RDMACurrentChunk *src,
+ RDMACurrentChunk *dest,
+ QEMUFile *f, uint64_t len)
{
- uint64_t current_addr = block_offset + offset;
- uint64_t index = rdma->current_index;
- uint64_t chunk = rdma->current_chunk;
+ uint64_t current_addr_src;
+ uint64_t current_addr_dest;
int ret;
- /* If we cannot merge it, we flush the current buffer first. */
- if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
- ret = qemu_rdma_write_flush(f, rdma);
- if (ret) {
- return ret;
+ current_addr_src = src->block_offset + src->offset;
+
+ if (dest) {
+ current_addr_dest = dest->block_offset + dest->offset;
+ }
+
+ if (qemu_rdma_buffer_mergable(rdma, src, current_addr_src, len)) {
+ if (dest) {
+ if (qemu_rdma_buffer_mergable(rdma, dest, current_addr_dest, len)) {
+ goto merge;
+ }
+ } else {
+ goto merge;
}
- rdma->current_length = 0;
- rdma->current_addr = current_addr;
+ }
+
+ ret = qemu_rdma_write(f, rdma, src, dest);
+
+ if (ret) {
+ return ret;
+ }
+
+ ret = write_start(rdma, src, len, current_addr_src);
+
+ if (ret) {
+ return ret;
+ }
+
+ if (dest) {
+ ret = write_start(rdma, dest, len, current_addr_dest);
- ret = qemu_rdma_search_ram_block(rdma, block_offset,
- offset, len, &index, &chunk);
if (ret) {
- fprintf(stderr, "ram block search failed\n");
return ret;
}
- rdma->current_index = index;
- rdma->current_chunk = chunk;
}
- /* merge it */
- rdma->current_length += len;
-
- /* flush it if buffer is too large */
- if (rdma->current_length >= RDMA_MERGE_MAX) {
- return qemu_rdma_write_flush(f, rdma);
+merge:
+ src->current_length += len;
+ if (dest) {
+ dest->current_length += len;
}
return 0;
}
-static void qemu_rdma_cleanup(RDMAContext *rdma)
+static void qemu_rdma_cleanup(RDMAContext *rdma, bool force)
{
struct rdma_cm_event *cm_event;
int ret, idx;
+ if (connection_timer) {
+ timer_del(connection_timer);
+ timer_free(connection_timer);
+ connection_timer = NULL;
+ }
+
+ if (keepalive_timer) {
+ timer_del(keepalive_timer);
+ timer_free(keepalive_timer);
+ keepalive_timer = NULL;
+ }
+
if (rdma->cm_id && rdma->connected) {
if (rdma->error_state) {
- RDMAControlHeader head = { .len = 0,
- .type = RDMA_CONTROL_ERROR,
- .repeat = 1,
- };
- fprintf(stderr, "Early error. Sending error.\n");
- qemu_rdma_post_send_control(rdma, NULL, &head);
+ if (rdma->error_state != -ENETUNREACH) {
+ RDMAControlHeader head = { .len = 0,
+ .type = RDMA_CONTROL_ERROR,
+ .repeat = 1,
+ };
+ fprintf(stderr, "Early error. Sending error.\n");
+ qemu_rdma_post_send_control(rdma, NULL, &head);
+ } else {
+ fprintf(stderr, "Early error.\n");
+ }
}
ret = rdma_disconnect(rdma->cm_id);
- if (!ret) {
+ if (!ret && !force && (rdma->error_state != -ENETUNREACH)) {
DDPRINTF("waiting for disconnect\n");
ret = rdma_get_cm_event(rdma->channel, &cm_event);
if (!ret) {
@@ -2216,6 +2656,7 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
}
}
DDPRINTF("Disconnected.\n");
+ rdma->lc_remote.verbs = NULL;
rdma->connected = false;
}
@@ -2237,22 +2678,10 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
}
}
- if (rdma->qp) {
- rdma_destroy_qp(rdma->cm_id);
- rdma->qp = NULL;
- }
- if (rdma->cq) {
- ibv_destroy_cq(rdma->cq);
- rdma->cq = NULL;
- }
- if (rdma->comp_channel) {
- ibv_destroy_comp_channel(rdma->comp_channel);
- rdma->comp_channel = NULL;
- }
- if (rdma->pd) {
- ibv_dealloc_pd(rdma->pd);
- rdma->pd = NULL;
- }
+ close_ibv(rdma, &rdma->lc_remote);
+ close_ibv(rdma, &rdma->lc_src);
+ close_ibv(rdma, &rdma->lc_dest);
+
if (rdma->listen_id) {
rdma_destroy_id(rdma->listen_id);
rdma->listen_id = NULL;
@@ -2265,12 +2694,24 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
rdma_destroy_event_channel(rdma->channel);
rdma->channel = NULL;
}
+
g_free(rdma->host);
rdma->host = NULL;
+
+ if (rdma->keepalive_mr) {
+ ibv_dereg_mr(rdma->keepalive_mr);
+ rdma->keepalive_mr = NULL;
+ }
+ if (rdma->next_keepalive_mr) {
+ ibv_dereg_mr(rdma->next_keepalive_mr);
+ rdma->next_keepalive_mr = NULL;
+ }
}
-static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
+static int qemu_rdma_source_init(RDMAContext *rdma,
+ Error **errp,
+ MigrationState *s)
{
int ret, idx;
Error *local_err = NULL, **temp = &local_err;
@@ -2279,38 +2720,45 @@ static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
* Will be validated against destination's actual capabilities
* after the connect() completes.
*/
- rdma->pin_all = pin_all;
+ rdma->pin_all = s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL];
+ rdma->do_keepalive = s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_KEEPALIVE];
ret = qemu_rdma_resolve_host(rdma, temp);
if (ret) {
goto err_rdma_source_init;
}
- ret = qemu_rdma_alloc_pd_cq(rdma);
+ ret = qemu_rdma_alloc_pd_cq(rdma, &rdma->lc_remote);
if (ret) {
- ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
+ ERROR(temp, "allocating pd and cq! Your mlock()"
" limits may be too low. Please check $ ulimit -a # and "
"search for 'ulimit -l' in the output");
goto err_rdma_source_init;
}
+ ret = qemu_rdma_alloc_keepalive(rdma);
+
+ if (ret) {
+ ERROR(temp, "allocating keepalive structures");
+ goto err_rdma_source_init;
+ }
+
ret = qemu_rdma_alloc_qp(rdma);
if (ret) {
- ERROR(temp, "rdma migration: error allocating qp!");
+ ERROR(temp, "allocating qp!");
goto err_rdma_source_init;
}
ret = qemu_rdma_init_ram_blocks(rdma);
if (ret) {
- ERROR(temp, "rdma migration: error initializing ram blocks!");
+ ERROR(temp, "initializing ram blocks!");
goto err_rdma_source_init;
}
for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
ret = qemu_rdma_reg_control(rdma, idx);
if (ret) {
- ERROR(temp, "rdma migration: error registering %d control!",
- idx);
+ ERROR(temp, "registering %d control!", idx);
goto err_rdma_source_init;
}
}
@@ -2319,7 +2767,7 @@ static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
err_rdma_source_init:
error_propagate(errp, local_err);
- qemu_rdma_cleanup(rdma);
+ qemu_rdma_cleanup(rdma, false);
return -1;
}
@@ -2328,6 +2776,8 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
RDMACapabilities cap = {
.version = RDMA_CONTROL_VERSION_CURRENT,
.flags = 0,
+ .keepalive_rkey = rdma->keepalive_mr->rkey,
+ .keepalive_addr = (uint64_t) &rdma->keepalive,
};
struct rdma_conn_param conn_param = { .initiator_depth = 2,
.retry_count = 5,
@@ -2346,6 +2796,13 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
cap.flags |= RDMA_CAPABILITY_PIN_ALL;
}
+ if (rdma->do_keepalive) {
+ DPRINTF("Keepalives requested.\n");
+ cap.flags |= RDMA_CAPABILITY_KEEPALIVE;
+ }
+
+ DDPRINTF("Sending keepalive params: key %x addr: %" PRIx64 "\n",
+ cap.keepalive_rkey, cap.keepalive_addr);
caps_to_network(&cap);
ret = rdma_connect(rdma->cm_id, &conn_param);
@@ -2380,6 +2837,12 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
network_to_caps(&cap);
+ rdma->keepalive_rkey = cap.keepalive_rkey;
+ rdma->keepalive_addr = cap.keepalive_addr;
+
+ DDPRINTF("Received keepalive params: key %x addr: %" PRIx64 "\n",
+ cap.keepalive_rkey, cap.keepalive_addr);
+
/*
* Verify that the *requested* capabilities are supported by the destination
* and disable them otherwise.
@@ -2390,7 +2853,14 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
rdma->pin_all = false;
}
+ if (rdma->do_keepalive && !(cap.flags & RDMA_CAPABILITY_KEEPALIVE)) {
+ ERROR(errp, "Server cannot support keepalives. "
+ "Will not check for them.");
+ rdma->do_keepalive = false;
+ }
+
DPRINTF("Pin all memory: %s\n", rdma->pin_all ? "enabled" : "disabled");
+ DPRINTF("Keepalives: %s\n", rdma->do_keepalive ? "enabled" : "disabled");
rdma_ack_cm_event(cm_event);
@@ -2405,7 +2875,7 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
return 0;
err_rdma_source_connect:
- qemu_rdma_cleanup(rdma);
+ qemu_rdma_cleanup(rdma, false);
return -1;
}
@@ -2424,14 +2894,14 @@ static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
if (rdma->host == NULL) {
ERROR(errp, "RDMA host is not set!");
- rdma->error_state = -EINVAL;
+ SET_ERROR(rdma, -EINVAL);
return -1;
}
/* create CM channel */
rdma->channel = rdma_create_event_channel();
if (!rdma->channel) {
ERROR(errp, "could not create rdma event channel");
- rdma->error_state = -EINVAL;
+ SET_ERROR(rdma, -EINVAL);
return -1;
}
@@ -2489,11 +2959,117 @@ err_dest_init_bind_addr:
err_dest_init_create_listen_id:
rdma_destroy_event_channel(rdma->channel);
rdma->channel = NULL;
- rdma->error_state = ret;
+ SET_ERROR(rdma, ret);
return ret;
}
+static void send_keepalive(void *opaque)
+{
+ RDMAContext *rdma = opaque;
+ struct ibv_sge sge;
+ struct ibv_send_wr send_wr = { 0 };
+ struct ibv_send_wr *bad_wr;
+ int ret;
+
+ if (!rdma->migration_started) {
+ goto reset;
+ }
+
+ rdma->next_keepalive++;
+retry:
+
+ sge.addr = (uint64_t) &rdma->next_keepalive;
+ sge.length = sizeof(rdma->next_keepalive);
+ sge.lkey = rdma->next_keepalive_mr->lkey;
+ send_wr.wr_id = RDMA_WRID_RDMA_KEEPALIVE;
+ send_wr.opcode = IBV_WR_RDMA_WRITE;
+ send_wr.send_flags = 0;
+ send_wr.sg_list = &sge;
+ send_wr.num_sge = 1;
+ send_wr.wr.rdma.remote_addr = rdma->keepalive_addr;
+ send_wr.wr.rdma.rkey = rdma->keepalive_rkey;
+
+ DDPRINTF("Posting keepalive: addr: %lx"
+ " remote: %lx, bytes %" PRIu32 "\n",
+ sge.addr, send_wr.wr.rdma.remote_addr, sge.length);
+
+ ret = ibv_post_send(rdma->lc_remote.qp, &send_wr, &bad_wr);
+
+ if (ret == ENOMEM) {
+ DPRINTF("send queue is full. wait a little....\n");
+ g_usleep(RDMA_KEEPALIVE_INTERVAL_MS * 1000);
+ goto retry;
+ } else if (ret > 0) {
+ perror("rdma migration: post keepalive");
+ SET_ERROR(rdma, -ret);
+ return;
+ }
+
+reset:
+ timer_mod(keepalive_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) +
+ RDMA_KEEPALIVE_INTERVAL_MS);
+}
+
+static void check_qp_state(void *opaque)
+{
+ RDMAContext *rdma = opaque;
+ int first_missed = 0;
+
+ if (!rdma->migration_started) {
+ goto reset;
+ }
+
+ if (rdma->last_keepalive == rdma->keepalive) {
+ rdma->nb_missed_keepalive++;
+ if (rdma->nb_missed_keepalive == 1) {
+ first_missed = RDMA_KEEPALIVE_FIRST_MISSED_OFFSET;
+ DDPRINTF("Setting first missed additional delay\n");
+ } else {
+ DPRINTF("WARN: missed keepalive: %" PRIu64 "\n",
+ rdma->nb_missed_keepalive);
+ }
+ } else {
+ rdma->keepalive_startup = true;
+ rdma->nb_missed_keepalive = 0;
+ }
+
+ rdma->last_keepalive = rdma->keepalive;
+
+ if (rdma->keepalive_startup) {
+ if (rdma->nb_missed_keepalive > RDMA_MAX_LOST_KEEPALIVE) {
+ struct ibv_qp_attr attr = {.qp_state = IBV_QPS_ERR };
+ SET_ERROR(rdma, -ENETUNREACH);
+ ERROR(NULL, "peer keepalive failed.");
+
+ if (ibv_modify_qp(rdma->lc_remote.qp, &attr, IBV_QP_STATE)) {
+ ERROR(NULL, "modify QP to RTR");
+ return;
+ }
+ return;
+ }
+ } else if (rdma->nb_missed_keepalive < RDMA_MAX_STARTUP_MISSED_KEEPALIVE) {
+ DDPRINTF("Keepalive startup waiting: %" PRIu64 "\n",
+ rdma->nb_missed_keepalive);
+ } else {
+ DDPRINTF("Keepalive startup too long.\n");
+ rdma->keepalive_startup = true;
+ }
+
+reset:
+ timer_mod(connection_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) +
+ RDMA_KEEPALIVE_INTERVAL_MS + first_missed);
+}
+
+static void qemu_rdma_keepalive_start(void)
+{
+ DPRINTF("Starting up keepalives....\n");
+ timer_mod(connection_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) +
+ RDMA_CONNECTION_INTERVAL_MS);
+ timer_mod(keepalive_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) +
+ RDMA_KEEPALIVE_INTERVAL_MS);
+}
+
static void *qemu_rdma_data_init(const char *host_port, Error **errp)
{
RDMAContext *rdma = NULL;
@@ -2502,8 +3078,12 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
if (host_port) {
rdma = g_malloc0(sizeof(RDMAContext));
memset(rdma, 0, sizeof(RDMAContext));
- rdma->current_index = -1;
- rdma->current_chunk = -1;
+ rdma->chunk_remote.current_block_idx = -1;
+ rdma->chunk_remote.current_chunk = -1;
+ rdma->chunk_local_src.current_block_idx = -1;
+ rdma->chunk_local_src.current_chunk = -1;
+ rdma->chunk_local_dest.current_block_idx = -1;
+ rdma->chunk_local_dest.current_chunk = -1;
addr = inet_parse(host_port, NULL);
if (addr != NULL) {
@@ -2515,6 +3095,14 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
return NULL;
}
}
+
+ rdma->keepalive_startup = false;
+ connection_timer = timer_new_ms(QEMU_CLOCK_REALTIME, check_qp_state, rdma);
+ keepalive_timer = timer_new_ms(QEMU_CLOCK_REALTIME, send_keepalive, rdma);
+ rdma->lc_dest.id_str = "local destination";
+ rdma->lc_src.id_str = "local src";
+ rdma->lc_remote.id_str = "remote";
+
return rdma;
}
@@ -2540,9 +3128,9 @@ static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
* Push out any writes that
* we're queued up for pc.ram.
*/
- ret = qemu_rdma_write_flush(f, rdma);
+ ret = qemu_rdma_write(f, rdma, &rdma->chunk_remote, NULL);
if (ret < 0) {
- rdma->error_state = ret;
+ SET_ERROR(rdma, ret);
return ret;
}
@@ -2558,7 +3146,7 @@ static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
if (ret < 0) {
- rdma->error_state = ret;
+ SET_ERROR(rdma, ret);
return ret;
}
@@ -2618,7 +3206,7 @@ static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
if (ret < 0) {
- rdma->error_state = ret;
+ SET_ERROR(rdma, ret);
return ret;
}
@@ -2631,18 +3219,23 @@ static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
/*
* Block until all the outstanding chunks have been delivered by the hardware.
*/
-static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
+static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma,
+ RDMACurrentChunk *src,
+ RDMACurrentChunk *dest)
{
int ret;
+ RDMALocalContext *lc = (dest && dest != src) ?
+ (rdma->source ? &rdma->lc_src : &rdma->lc_dest) : &rdma->lc_remote;
- if (qemu_rdma_write_flush(f, rdma) < 0) {
+ if (qemu_rdma_write(f, rdma, src, dest) < 0) {
return -EIO;
}
- while (rdma->nb_sent) {
- ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+ while (lc->nb_sent) {
+ ret = qemu_rdma_block_for_wrid(rdma, lc,
+ RDMA_WRID_RDMA_WRITE_REMOTE, NULL);
if (ret < 0) {
- fprintf(stderr, "rdma migration: complete polling error!\n");
+ ERROR(NULL, "complete polling!");
return -EIO;
}
}
@@ -2657,13 +3250,190 @@ static int qemu_rdma_close(void *opaque)
DPRINTF("Shutting down connection.\n");
QEMUFileRDMA *r = opaque;
if (r->rdma) {
- qemu_rdma_cleanup(r->rdma);
+ qemu_rdma_cleanup(r->rdma, false);
g_free(r->rdma);
}
g_free(r);
return 0;
}
+static int qemu_rdma_instruct_unregister(RDMAContext *rdma, QEMUFile *f,
+ ram_addr_t block_offset,
+ ram_addr_t offset, long size)
+{
+ int ret;
+ uint64_t block, chunk;
+
+ if (size < 0) {
+ ret = qemu_rdma_drain_cq(f, rdma, &rdma->chunk_remote, NULL);
+ if (ret < 0) {
+ fprintf(stderr, "rdma: failed to synchronously drain"
+ " completion queue before unregistration.\n");
+ return ret;
+ }
+ }
+
+ ret = qemu_rdma_search_ram_block(rdma, block_offset,
+ offset, size, &block, &chunk);
+
+ if (ret) {
+ fprintf(stderr, "ram block search failed\n");
+ return ret;
+ }
+
+ qemu_rdma_signal_unregister(rdma, block, chunk, 0);
+
+ /*
+ * Synchronous, guaranteed unregistration (should not occur during
+ * fast-path). Otherwise, unregisters will process on the next call to
+ * qemu_rdma_drain_cq()
+ */
+ if (size < 0) {
+ qemu_rdma_unregister_waiting(rdma);
+ }
+
+ return 0;
+}
+
+
+static int qemu_rdma_poll_until_empty(RDMAContext *rdma, RDMALocalContext *lc)
+{
+ uint64_t wr_id, wr_id_in;
+ int ret;
+
+ /*
+ * Drain the Completion Queue if possible, but do not block,
+ * just poll.
+ *
+ * If nothing to poll, the end of the iteration will do this
+ * again to make sure we don't overflow the request queue.
+ */
+ while (1) {
+ ret = qemu_rdma_poll(rdma, lc, &wr_id_in, NULL);
+ if (ret < 0) {
+ ERROR(NULL, "empty polling error! %d", ret);
+ return ret;
+ }
+
+ wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+
+ if (wr_id == RDMA_WRID_NONE) {
+ break;
+ }
+ }
+
+ return 0;
+}
+
+/*
+ * Parameters:
+ * @offset_{source|dest} == 0 :
+ * This means that 'block_offset' is a full virtual address that does not
+ * belong to a RAMBlock of the virtual machine and instead
+ * represents a private malloc'd memory area that the caller wishes to
+ * transfer. Source and dest can be different (either real RAMBlocks or
+ * private).
+ *
+ * @offset != 0 :
+ * Offset is an offset to be added to block_offset and used
+ * to also lookup the corresponding RAMBlock. Source and dest can be different
+ * (either real RAMBlocks or private).
+ *
+ * @size > 0 :
+ * Amount of memory to copy locally using RDMA.
+ *
+ * @size == 0 :
+ * A 'hint' or 'advice' that means that we wish to speculatively
+ * and asynchronously unregister either the source or destination memory.
+ * In this case, there is no guarantee that the unregister will actually happen,
+ * for example, if the memory is being actively copied. Additionally, the memory
+ * may be re-registered at any future time if a copy within the same
+ * range was requested again, even if you attempted to unregister it here.
+ *
+ * @size < 0 : TODO, not yet supported
+ * Unregister the memory NOW. This means that the caller does not
+ * expect there to be any future RDMA copies and we just want to clean
+ * things up. This is used in case the upper layer owns the memory and
+ * cannot wait for qemu_fclose() to occur.
+ */
+static int qemu_rdma_copy_page(QEMUFile *f, void *opaque,
+ ram_addr_t block_offset_dest,
+ ram_addr_t offset_dest,
+ ram_addr_t block_offset_source,
+ ram_addr_t offset_source,
+ long size)
+{
+ QEMUFileRDMA *rfile = opaque;
+ RDMAContext *rdma = rfile->rdma;
+ int ret;
+ RDMACurrentChunk *src = &rdma->chunk_local_src;
+ RDMACurrentChunk *dest = &rdma->chunk_local_dest;
+
+ CHECK_ERROR_STATE();
+
+ qemu_fflush(f);
+
+ if (size > 0) {
+ /*
+ * Add this page to the current 'chunk'. If the chunk
+ * is full, or the page doesn't belong to the current chunk,
+ * an actual RDMA write will occur and a new chunk will be formed.
+ */
+ src->block_offset = block_offset_source;
+ src->offset = offset_source;
+ dest->block_offset = block_offset_dest;
+ dest->offset = offset_dest;
+
+ DDPRINTF("Copy page: %p src offset %" PRIu64
+ " dest %p offset %" PRIu64 "\n",
+ (void *) block_offset_source, offset_source,
+ (void *) block_offset_dest, offset_dest);
+
+ ret = qemu_rdma_flush_unmergable(rdma, src, dest, f, size);
+
+ if (ret) {
+ ERROR(NULL, "local copy flush");
+ goto err;
+ }
+
+ if ((src->current_length >= RDMA_MERGE_MAX) ||
+ (dest->current_length >= RDMA_MERGE_MAX)) {
+ ret = qemu_rdma_write(f, rdma, src, dest);
+
+ if (ret < 0) {
+ goto err;
+ }
+ } else {
+ ret = 0;
+ }
+ } else {
+ ret = qemu_rdma_instruct_unregister(rdma, f, block_offset_source,
+ offset_source, size);
+ if (ret) {
+ goto err;
+ }
+
+ ret = qemu_rdma_instruct_unregister(rdma, f, block_offset_dest,
+ offset_dest, size);
+
+ if (ret) {
+ goto err;
+ }
+ }
+
+ ret = qemu_rdma_poll_until_empty(rdma,
+ rdma->source ? &rdma->lc_src : &rdma->lc_dest);
+
+ if (ret) {
+ goto err;
+ }
+
+ return RAM_COPY_CONTROL_DELAYED;
+err:
+ SET_ERROR(rdma, ret);
+ return ret;
+}
+
/*
* Parameters:
* @offset == 0 :
@@ -2672,6 +3442,20 @@ static int qemu_rdma_close(void *opaque)
* represents a private malloc'd memory area that the caller wishes to
* transfer.
*
+ * This allows callers to initiate RDMA transfers of arbitrary memory
+ * areas and not just only by migration itself.
+ *
+ * If this is true, then the virtual address specified by 'block_offset'
+ * below must have been pre-registered with us in advance by calling the
+ * new QEMUFileOps->add()/remove() functions on both sides of the
+ * connection.
+ *
+ * Also note: add()/remove() must be called in the *same sequence* and
+ * against the *same size* private virtual memory on both sides of the
+ * connection for this to work, regardless of whether or not transfer of
+ * this private memory was initiated by the migration code or a private
+ * caller.
+ *
* @offset != 0 :
* Offset is an offset to be added to block_offset and used
* to also lookup the corresponding RAMBlock.
@@ -2680,7 +3464,7 @@ static int qemu_rdma_close(void *opaque)
* Initiate an transfer this size.
*
* @size == 0 :
- * A 'hint' or 'advice' that means that we wish to speculatively
+ * A 'hint' that means that we wish to speculatively
* and asynchronously unregister this memory. In this case, there is no
* guarantee that the unregister will actually happen, for example,
* if the memory is being actively transmitted. Additionally, the memory
@@ -2698,12 +3482,15 @@ static int qemu_rdma_close(void *opaque)
* sent. Usually, this will not be more than a few bytes of
* the protocol because most transfers are sent asynchronously.
*/
-static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
- ram_addr_t block_offset, ram_addr_t offset,
- size_t size, int *bytes_sent)
+static int qemu_rdma_save_page(QEMUFile *f, void *opaque,
+ ram_addr_t block_offset,
+ uint8_t *host_addr,
+ ram_addr_t offset,
+ long size, int *bytes_sent)
{
QEMUFileRDMA *rfile = opaque;
RDMAContext *rdma = rfile->rdma;
+ RDMACurrentChunk *cc = &rdma->chunk_remote;
int ret;
CHECK_ERROR_STATE();
@@ -2716,12 +3503,27 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
* is full, or the page doen't belong to the current chunk,
* an actual RDMA write will occur and a new chunk will be formed.
*/
- ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
- if (ret < 0) {
- fprintf(stderr, "rdma migration: write error! %d\n", ret);
+ cc->block_offset = block_offset;
+ cc->offset = offset;
+
+ ret = qemu_rdma_flush_unmergable(rdma, cc, NULL, f, size);
+
+ if (ret) {
+ ERROR(NULL, "remote flush unmergable");
goto err;
}
+ if (cc->current_length >= RDMA_MERGE_MAX) {
+ ret = qemu_rdma_write(f, rdma, cc, NULL);
+
+ if (ret < 0) {
+ ERROR(NULL, "remote write! %d", ret);
+ goto err;
+ }
+ } else {
+ ret = 0;
+ }
+
/*
* We always return 1 bytes because the RDMA
* protocol is completely asynchronous. We do not yet know
@@ -2734,64 +3536,22 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
*bytes_sent = 1;
}
} else {
- uint64_t index, chunk;
-
- /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
- if (size < 0) {
- ret = qemu_rdma_drain_cq(f, rdma);
- if (ret < 0) {
- fprintf(stderr, "rdma: failed to synchronously drain"
- " completion queue before unregistration.\n");
- goto err;
- }
- }
- */
-
- ret = qemu_rdma_search_ram_block(rdma, block_offset,
- offset, size, &index, &chunk);
+ ret = qemu_rdma_instruct_unregister(rdma, f, block_offset, offset, size);
if (ret) {
- fprintf(stderr, "ram block search failed\n");
goto err;
}
-
- qemu_rdma_signal_unregister(rdma, index, chunk, 0);
-
- /*
- * TODO: Synchronous, guaranteed unregistration (should not occur during
- * fast-path). Otherwise, unregisters will process on the next call to
- * qemu_rdma_drain_cq()
- if (size < 0) {
- qemu_rdma_unregister_waiting(rdma);
- }
- */
}
- /*
- * Drain the Completion Queue if possible, but do not block,
- * just poll.
- *
- * If nothing to poll, the end of the iteration will do this
- * again to make sure we don't overflow the request queue.
- */
- while (1) {
- uint64_t wr_id, wr_id_in;
- int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
- if (ret < 0) {
- fprintf(stderr, "rdma migration: polling error! %d\n", ret);
- goto err;
- }
-
- wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
+ ret = qemu_rdma_poll_until_empty(rdma, &rdma->lc_remote);
- if (wr_id == RDMA_WRID_NONE) {
- break;
- }
+ if (ret) {
+ goto err;
}
return RAM_SAVE_CONTROL_DELAYED;
err:
- rdma->error_state = ret;
+ SET_ERROR(rdma, ret);
return ret;
}
@@ -2829,6 +3589,13 @@ static int qemu_rdma_accept(RDMAContext *rdma)
goto err_rdma_dest_wait;
}
+ rdma->keepalive_rkey = cap.keepalive_rkey;
+ rdma->keepalive_addr = cap.keepalive_addr;
+
+ DDPRINTF("Received keepalive params: key %x addr: %" PRIx64
+ " local %" PRIx64 "\n",
+ cap.keepalive_rkey, cap.keepalive_addr, (uint64_t) &rdma->keepalive);
+
/*
* Respond with only the capabilities this version of QEMU knows about.
*/
@@ -2838,9 +3605,8 @@ static int qemu_rdma_accept(RDMAContext *rdma)
* Enable the ones that we do know about.
* Add other checks here as new ones are introduced.
*/
- if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
- rdma->pin_all = true;
- }
+ rdma->pin_all = cap.flags & RDMA_CAPABILITY_PIN_ALL;
+ rdma->do_keepalive = cap.flags & RDMA_CAPABILITY_KEEPALIVE;
rdma->cm_id = cm_event->id;
verbs = cm_event->id->verbs;
@@ -2848,43 +3614,56 @@ static int qemu_rdma_accept(RDMAContext *rdma)
rdma_ack_cm_event(cm_event);
DPRINTF("Memory pin all: %s\n", rdma->pin_all ? "enabled" : "disabled");
-
- caps_to_network(&cap);
+ DPRINTF("Keepalives: %s\n", rdma->do_keepalive ? "enabled" : "disabled");
DPRINTF("verbs context after listen: %p\n", verbs);
- if (!rdma->verbs) {
- rdma->verbs = verbs;
- } else if (rdma->verbs != verbs) {
- fprintf(stderr, "ibv context not matching %p, %p!\n",
- rdma->verbs, verbs);
- goto err_rdma_dest_wait;
+ if (!rdma->lc_remote.verbs) {
+ rdma->lc_remote.verbs = verbs;
+ } else if (rdma->lc_remote.verbs != verbs) {
+ ERROR(NULL, "ibv context %p != %p!", rdma->lc_remote.verbs, verbs);
+ goto err_rdma_dest_wait;
}
qemu_rdma_dump_id("dest_init", verbs);
- ret = qemu_rdma_alloc_pd_cq(rdma);
+ ret = qemu_rdma_alloc_pd_cq(rdma, &rdma->lc_remote);
if (ret) {
- fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
+ ERROR(NULL, "error allocating pd and cq!");
goto err_rdma_dest_wait;
}
+ ret = qemu_rdma_alloc_keepalive(rdma);
+
+ if (ret) {
+ ERROR(NULL, "allocating keepalive structures");
+ goto err_rdma_dest_wait;
+ }
+
+ cap.keepalive_rkey = rdma->keepalive_mr->rkey;
+ cap.keepalive_addr = (uint64_t) &rdma->keepalive;
+
+ DDPRINTF("Sending keepalive params: key %x addr: %" PRIx64
+ " remote: %" PRIx64 "\n",
+ cap.keepalive_rkey, cap.keepalive_addr, rdma->keepalive_addr);
+ caps_to_network(&cap);
+
ret = qemu_rdma_alloc_qp(rdma);
if (ret) {
- fprintf(stderr, "rdma migration: error allocating qp!\n");
+ ERROR(NULL, "allocating qp!");
goto err_rdma_dest_wait;
}
ret = qemu_rdma_init_ram_blocks(rdma);
if (ret) {
- fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
+ ERROR(NULL, "initializing ram blocks!");
goto err_rdma_dest_wait;
}
for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
ret = qemu_rdma_reg_control(rdma, idx);
if (ret) {
- fprintf(stderr, "rdma: error registering %d control!\n", idx);
+ ERROR(NULL, "registering %d control!", idx);
goto err_rdma_dest_wait;
}
}
@@ -2893,18 +3672,18 @@ static int qemu_rdma_accept(RDMAContext *rdma)
ret = rdma_accept(rdma->cm_id, &conn_param);
if (ret) {
- fprintf(stderr, "rdma_accept returns %d!\n", ret);
+ ERROR(NULL, "rdma_accept returns %d!", ret);
goto err_rdma_dest_wait;
}
ret = rdma_get_cm_event(rdma->channel, &cm_event);
if (ret) {
- fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
+ ERROR(NULL, "rdma_accept get_cm_event failed %d!", ret);
goto err_rdma_dest_wait;
}
if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
- fprintf(stderr, "rdma_accept not event established!\n");
+ ERROR(NULL, "rdma_accept not event established!");
rdma_ack_cm_event(cm_event);
goto err_rdma_dest_wait;
}
@@ -2914,7 +3693,7 @@ static int qemu_rdma_accept(RDMAContext *rdma)
ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
if (ret) {
- fprintf(stderr, "rdma migration: error posting second control recv!\n");
+ ERROR(NULL, "posting second control recv!");
goto err_rdma_dest_wait;
}
@@ -2923,18 +3702,16 @@ static int qemu_rdma_accept(RDMAContext *rdma)
return 0;
err_rdma_dest_wait:
- rdma->error_state = ret;
- qemu_rdma_cleanup(rdma);
+ SET_ERROR(rdma, ret);
+ qemu_rdma_cleanup(rdma, false);
return ret;
}
/*
* During each iteration of the migration, we listen for instructions
- * by the source VM to perform dynamic page registrations before they
+ * by the source VM to perform pinning operations before they
* can perform RDMA operations.
*
- * We respond with the 'rkey'.
- *
* Keep doing this until the source tells us to stop.
*/
static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
@@ -2957,8 +3734,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
RDMARegister *reg, *registers;
RDMACompress *comp;
RDMARegisterResult *reg_result;
- static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
RDMALocalBlock *block;
+ static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
void *host_addr;
int ret = 0;
int idx = 0;
@@ -3009,8 +3786,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
if (rdma->pin_all) {
ret = qemu_rdma_reg_whole_ram_blocks(rdma);
if (ret) {
- fprintf(stderr, "rdma migration: error dest "
- "registering ram blocks!\n");
+ ERROR(NULL, "dest registering ram blocks!");
goto out;
}
}
@@ -3043,7 +3819,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
(uint8_t *) rdma->block, &blocks);
if (ret < 0) {
- fprintf(stderr, "rdma migration: error sending remote info!\n");
+ ERROR(NULL, "sending remote info!");
goto out;
}
@@ -3055,8 +3831,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
for (count = 0; count < head.repeat; count++) {
- uint64_t chunk;
- uint8_t *chunk_start, *chunk_end;
+ RDMACurrentChunk cc;
reg = &registers[count];
network_to_register(reg);
@@ -3065,30 +3840,28 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
DDPRINTF("Registration request (%d): index %d, current_addr %"
PRIu64 " chunks: %" PRIu64 "\n", count,
- reg->current_index, reg->key.current_addr, reg->chunks);
-
- block = &(rdma->local_ram_blocks.block[reg->current_index]);
- if (block->is_ram_block) {
- host_addr = (block->local_host_addr +
- (reg->key.current_addr - block->offset));
- chunk = ram_chunk_index(block->local_host_addr,
- (uint8_t *) host_addr);
+ reg->current_block_idx, reg->key.current_addr, reg->chunks);
+
+ cc.block = &(rdma->local_ram_blocks.block[reg->current_block_idx]);
+ if (cc.block->is_ram_block) {
+ cc.addr = (cc.block->local_host_addr +
+ (reg->key.current_addr - cc.block->offset));
+ cc.chunk_idx = ram_chunk_index(cc.block->local_host_addr, cc.addr);
} else {
- chunk = reg->key.chunk;
- host_addr = block->local_host_addr +
+ cc.chunk_idx = reg->key.chunk;
+ cc.addr = cc.block->local_host_addr +
(reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
}
- chunk_start = ram_chunk_start(block, chunk);
- chunk_end = ram_chunk_end(block, chunk + reg->chunks);
- if (qemu_rdma_register_and_get_keys(rdma, block,
- (uint8_t *)host_addr, NULL, &reg_result->rkey,
- chunk, chunk_start, chunk_end)) {
+ cc.chunk_start = ram_chunk_start(cc.block, cc.chunk_idx);
+ cc.chunk_end = ram_chunk_end(cc.block, cc.chunk_idx + reg->chunks);
+ if (qemu_rdma_register_and_get_keys(rdma, &cc, &rdma->lc_remote,
+ false, NULL, &reg_result->rkey)) {
fprintf(stderr, "cannot get rkey!\n");
ret = -EINVAL;
goto out;
}
- reg_result->host_addr = (uint64_t) block->local_host_addr;
+ reg_result->host_addr = (uint64_t) cc.block->local_host_addr;
DDPRINTF("Registered rkey for this request: %x\n",
reg_result->rkey);
@@ -3115,9 +3888,9 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
DDPRINTF("Unregistration request (%d): "
" index %d, chunk %" PRIu64 "\n",
- count, reg->current_index, reg->key.chunk);
+ count, reg->current_block_idx, reg->key.chunk);
- block = &(rdma->local_ram_blocks.block[reg->current_index]);
+ block = &(rdma->local_ram_blocks.block[reg->current_block_idx]);
ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
block->pmr[reg->key.chunk] = NULL;
@@ -3154,7 +3927,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
} while (1);
out:
if (ret < 0) {
- rdma->error_state = ret;
+ SET_ERROR(rdma, ret);
}
return ret;
}
@@ -3168,7 +3941,23 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
CHECK_ERROR_STATE();
DDDPRINTF("start section: %" PRIu64 "\n", flags);
- qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
+
+ if (flags == RAM_CONTROL_FLUSH) {
+ int ret;
+
+ if (rdma->source) {
+ ret = qemu_rdma_drain_cq(f, rdma, &rdma->chunk_local_src,
+ &rdma->chunk_local_dest);
+
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ } else {
+ qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
+ }
+
qemu_fflush(f);
return 0;
@@ -3190,7 +3979,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
CHECK_ERROR_STATE();
qemu_fflush(f);
- ret = qemu_rdma_drain_cq(f, rdma);
+ ret = qemu_rdma_drain_cq(f, rdma, &rdma->chunk_remote, NULL);
if (ret < 0) {
goto err;
@@ -3225,13 +4014,13 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
/*
* The protocol uses two different sets of rkeys (mutually exclusive):
* 1. One key to represent the virtual address of the entire ram block.
- * (dynamic chunk registration disabled - pin everything with one rkey.)
+ * (pinning enabled - pin everything with one rkey.)
* 2. One to represent individual chunks within a ram block.
- * (dynamic chunk registration enabled - pin individual chunks.)
+ * (pinning disabled - pin individual chunks.)
*
* Once the capability is successfully negotiated, the destination transmits
* the keys to use (or sends them later) including the virtual addresses
- * and then propagates the remote ram block descriptions to his local copy.
+ * and then propagates the remote ram block descriptions to their local copy.
*/
if (local->nb_blocks != nb_remote_blocks) {
@@ -3285,7 +4074,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
return 0;
err:
- rdma->error_state = ret;
+ SET_ERROR(rdma, ret);
return ret;
}
@@ -3294,7 +4083,23 @@ static int qemu_rdma_get_fd(void *opaque)
QEMUFileRDMA *rfile = opaque;
RDMAContext *rdma = rfile->rdma;
- return rdma->comp_channel->fd;
+ return rdma->lc_remote.comp_chan->fd;
+}
+
+static int qemu_rdma_delete_block(QEMUFile *f, void *opaque,
+ ram_addr_t block_offset)
+{
+ QEMUFileRDMA *rfile = opaque;
+ return __qemu_rdma_delete_block(rfile->rdma, block_offset);
+}
+
+
+static int qemu_rdma_add_block(QEMUFile *f, void *opaque, void *host_addr,
+ ram_addr_t block_offset, uint64_t length)
+{
+ QEMUFileRDMA *rfile = opaque;
+ return __qemu_rdma_add_block(rfile->rdma, host_addr,
+ block_offset, length);
}
const QEMUFileOps rdma_read_ops = {
@@ -3302,6 +4107,9 @@ const QEMUFileOps rdma_read_ops = {
.get_fd = qemu_rdma_get_fd,
.close = qemu_rdma_close,
.hook_ram_load = qemu_rdma_registration_handle,
+ .copy_page = qemu_rdma_copy_page,
+ .add = qemu_rdma_add_block,
+ .remove = qemu_rdma_delete_block,
};
const QEMUFileOps rdma_write_ops = {
@@ -3310,6 +4118,9 @@ const QEMUFileOps rdma_write_ops = {
.before_ram_iterate = qemu_rdma_registration_start,
.after_ram_iterate = qemu_rdma_registration_stop,
.save_page = qemu_rdma_save_page,
+ .copy_page = qemu_rdma_copy_page,
+ .add = qemu_rdma_add_block,
+ .remove = qemu_rdma_delete_block,
};
static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
@@ -3331,6 +4142,98 @@ static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
return r->file;
}
+static int connect_local(RDMAContext *rdma,
+ RDMALocalContext *src,
+ RDMALocalContext *dest)
+{
+ int ret;
+ struct ibv_qp_attr next = {
+ .qp_state = IBV_QPS_RTR,
+ .path_mtu = IBV_MTU_1024,
+ .dest_qp_num = src->qp->qp_num,
+ .rq_psn = src->psn,
+ .max_dest_rd_atomic = 1,
+ .min_rnr_timer = 12,
+ .ah_attr = {
+ .is_global = 0,
+ .dlid = src->port.lid,
+ .sl = 0,
+ .src_path_bits = 0,
+ .port_num = src->port_num,
+ }
+ };
+
+ if(src->gid.global.interface_id) {
+ next.ah_attr.is_global = 1;
+ next.ah_attr.grh.hop_limit = 1;
+ next.ah_attr.grh.dgid = src->gid;
+ next.ah_attr.grh.sgid_index = 0;
+ }
+
+ ret = ibv_modify_qp(dest->qp, &next,
+ IBV_QP_STATE |
+ IBV_QP_AV |
+ IBV_QP_PATH_MTU |
+ IBV_QP_DEST_QPN |
+ IBV_QP_RQ_PSN |
+ IBV_QP_MAX_DEST_RD_ATOMIC |
+ IBV_QP_MIN_RNR_TIMER);
+
+ if (ret) {
+ SET_ERROR(rdma, -ret);
+ ERROR(NULL, "modify src verbs to ready");
+ return rdma->error_state;
+ }
+
+ next.qp_state = IBV_QPS_RTS;
+ next.timeout = 14;
+ next.retry_cnt = 7;
+ next.rnr_retry = 7;
+ next.sq_psn = dest->psn;
+ next.max_rd_atomic = 1;
+
+ ret = ibv_modify_qp(dest->qp, &next,
+ IBV_QP_STATE |
+ IBV_QP_TIMEOUT |
+ IBV_QP_RETRY_CNT |
+ IBV_QP_RNR_RETRY |
+ IBV_QP_SQ_PSN |
+ IBV_QP_MAX_QP_RD_ATOMIC);
+
+ if (ret) {
+ SET_ERROR(rdma, -ret);
+ ERROR(NULL, "modify dest verbs to ready");
+ return rdma->error_state;
+ }
+
+ return 0;
+}
+
+static int init_local(RDMAContext *rdma)
+{
+ DDPRINTF("Opening copy local source queue pair...\n");
+ if (open_local(rdma, &rdma->lc_src)) {
+ return 1;
+ }
+
+ DDPRINTF("Opening copy local destination queue pair...\n");
+ if (open_local(rdma, &rdma->lc_dest)) {
+ return 1;
+ }
+
+ DDPRINTF("Connecting local src queue pairs...\n");
+ if (connect_local(rdma, &rdma->lc_src, &rdma->lc_dest)) {
+ return 1;
+ }
+
+ DDPRINTF("Connecting local dest queue pairs...\n");
+ if (connect_local(rdma, &rdma->lc_dest, &rdma->lc_src)) {
+ return 1;
+ }
+
+ return 0;
+}
+
static void rdma_accept_incoming_migration(void *opaque)
{
RDMAContext *rdma = opaque;
@@ -3342,21 +4245,32 @@ static void rdma_accept_incoming_migration(void *opaque)
ret = qemu_rdma_accept(rdma);
if (ret) {
- ERROR(errp, "RDMA Migration initialization failed!");
+ ERROR(errp, "initialization failed!");
return;
}
DPRINTF("Accepted migration\n");
+ if (init_local(rdma)) {
+ ERROR(errp, "could not initialize local rdma queue pairs!");
+ goto err;
+ }
+
f = qemu_fopen_rdma(rdma, "rb");
if (f == NULL) {
ERROR(errp, "could not qemu_fopen_rdma!");
- qemu_rdma_cleanup(rdma);
- return;
+ goto err;
}
- rdma->migration_started_on_destination = 1;
+ if (rdma->do_keepalive) {
+ qemu_rdma_keepalive_start();
+ }
+
+ rdma->migration_started = 1;
process_incoming_migration(f);
+ return;
+err:
+ qemu_rdma_cleanup(rdma, false);
}
void rdma_start_incoming_migration(const char *host_port, Error **errp)
@@ -3372,6 +4286,9 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
goto err;
}
+ rdma->source = false;
+ rdma->dest = true;
+
ret = qemu_rdma_dest_init(rdma, &local_err);
if (ret) {
@@ -3411,8 +4328,10 @@ void rdma_start_outgoing_migration(void *opaque,
goto err;
}
- ret = qemu_rdma_source_init(rdma, &local_err,
- s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL]);
+ rdma->source = true;
+ rdma->dest = false;
+
+ ret = qemu_rdma_source_init(rdma, &local_err, s);
if (ret) {
goto err;
@@ -3425,9 +4344,20 @@ void rdma_start_outgoing_migration(void *opaque,
goto err;
}
+ if (init_local(rdma)) {
+ ERROR(temp, "could not initialize local rdma queue pairs!");
+ goto err;
+ }
+
DPRINTF("qemu_rdma_source_connect success\n");
s->file = qemu_fopen_rdma(rdma, "wb");
+ rdma->migration_started = 1;
+
+ if (rdma->do_keepalive) {
+ qemu_rdma_keepalive_start();
+ }
+
migrate_fd_connect(s);
return;
err:
@@ -647,6 +647,31 @@ void ram_control_after_iterate(QEMUFile *f, uint64_t flags)
}
}
+void ram_control_add(QEMUFile *f, void *host_addr,
+ ram_addr_t block_offset, uint64_t length)
+{
+ int ret = 0;
+
+ if (f->ops->add) {
+ ret = f->ops->add(f, f->opaque, host_addr, block_offset, length);
+ if (ret < 0) {
+ qemu_file_set_error(f, ret);
+ }
+ }
+}
+
+void ram_control_remove(QEMUFile *f, ram_addr_t block_offset)
+{
+ int ret = 0;
+
+ if (f->ops->remove) {
+ ret = f->ops->remove(f, f->opaque, block_offset);
+ if (ret < 0) {
+ qemu_file_set_error(f, ret);
+ }
+ }
+}
+
void ram_control_load_hook(QEMUFile *f, uint64_t flags)
{
int ret = -EINVAL;
@@ -703,6 +728,33 @@ int ram_control_load_page(QEMUFile *f, void *host_addr, long size)
return RAM_LOAD_CONTROL_NOT_SUPP;
}
+int ram_control_copy_page(QEMUFile *f,
+ ram_addr_t block_offset_dest,
+ ram_addr_t offset_dest,
+ ram_addr_t block_offset_source,
+ ram_addr_t offset_source,
+ long size)
+{
+ if (f->ops->copy_page) {
+ int ret = f->ops->copy_page(f, f->opaque,
+ block_offset_dest,
+ offset_dest,
+ block_offset_source,
+ offset_source,
+ size);
+
+ if (ret != RAM_COPY_CONTROL_DELAYED) {
+ if (ret < 0) {
+ qemu_file_set_error(f, ret);
+ }
+ }
+
+ return ret;
+ }
+
+ return RAM_COPY_CONTROL_NOT_SUPP;
+}
+
static void qemu_fill_buffer(QEMUFile *f)
{
int len;