diff mbox

[RFC,v2,01/10] virtagent: add common rpc transport defs

Message ID 1288784139-1110-2-git-send-email-mdroth@linux.vnet.ibm.com
State New
Headers show

Commit Message

Michael Roth Nov. 3, 2010, 11:35 a.m. UTC
Common code for sending/recieving RPCs via http over virtproxy channel.
All communication is done via asynchronous read/write handlers and using
non-blocking reads/writes

Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 virtagent-common.c |  431 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 virtagent-common.h |   73 +++++++++
 2 files changed, 504 insertions(+), 0 deletions(-)
 create mode 100644 virtagent-common.c
 create mode 100644 virtagent-common.h
diff mbox

Patch

diff --git a/virtagent-common.c b/virtagent-common.c
new file mode 100644
index 0000000..cc58938
--- /dev/null
+++ b/virtagent-common.c
@@ -0,0 +1,431 @@ 
+/*
+ * virt-agent - common host/guest RPC functions
+ *
+ * Copyright IBM Corp. 2010
+ *
+ * Authors:
+ *  Adam Litke        <aglitke@linux.vnet.ibm.com>
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "virtagent-common.h"
+
+#define VA_READ true
+#define VA_SEND false
+
+enum va_rpc_type {
+    VA_RPC_REQUEST,
+    VA_RPC_RESPONSE,
+};
+
+typedef struct VARPCState {
+    char hdr[VA_HDR_LEN_MAX];
+    int fd;
+    size_t hdr_len;
+    size_t hdr_pos;
+    enum {
+        VA_READ_START,
+        VA_READ_HDR,
+        VA_READ_BODY,
+        VA_SEND_START,
+        VA_SEND_HDR,
+        VA_SEND_BODY,
+    } state;
+    enum va_rpc_type rpc_type;
+    char *content;
+    size_t content_len;
+    size_t content_pos;
+    VARPCData *data;
+} VARPCState;
+
+static void va_rpc_read_handler(void *opaque);
+static void va_rpc_send_handler(void *opaque);
+
+static int end_of_header(char *buf, int end_pos)
+{
+    return !strncmp(buf+(end_pos-2), "\n\r\n", 3);
+}
+
+static void va_rpc_hdr_init(VARPCState *s) {
+    const char *preamble;
+
+    TRACE("called");
+    /* essentially ignored in the context of virtagent, but might as well */
+    if (s->rpc_type == VA_RPC_REQUEST) {
+        preamble = "POST /RPC2 HTTP/1.1";
+    } else if (s->rpc_type == VA_RPC_RESPONSE) {
+        preamble = "HTTP/1.1 200 OK";
+    } else {
+        s->hdr_len = 0;
+        return;
+    }
+
+    s->hdr_len = sprintf(s->hdr,
+                         "%s" EOL
+                         "Content-Type: text/xml" EOL
+                         "Content-Length: %u" EOL EOL,
+                         preamble,
+                         (uint32_t)s->content_len);
+}
+
+static void va_rpc_parse_hdr(VARPCState *s)
+{
+    int i, line_pos = 0;
+    char line_buf[4096];
+
+    for (i = 0; i < VA_HDR_LEN_MAX; ++i) {
+        if (s->hdr[i] != '\n') {
+            /* read line */
+            line_buf[line_pos++] = s->hdr[i];
+        } else {
+            /* process line */
+            if (strncmp(line_buf, "Content-Length: ", 16) == 0) {
+                s->content_len = atoi(&line_buf[16]);
+                return;
+            }
+            line_pos = 0;
+        }
+    }
+}
+
+static VARPCState *va_rpc_state_new(VARPCData *data, int fd,
+                                    enum va_rpc_type rpc_type, bool read)
+{
+    VARPCState *s = qemu_mallocz(sizeof(VARPCState));
+
+    s->rpc_type = rpc_type;
+    s->fd = fd;
+    s->data = data;
+    if (s->data == NULL) {
+        goto EXIT_BAD;
+    }
+
+    if (read) {
+        s->state = VA_READ_START;
+        s->content = NULL;
+    } else {
+        s->state = VA_SEND_START;
+        if (rpc_type == VA_RPC_REQUEST) {
+            s->content = XMLRPC_MEMBLOCK_CONTENTS(char, s->data->send_req_xml);
+            s->content_len = XMLRPC_MEMBLOCK_SIZE(char, s->data->send_req_xml);
+        } else if (rpc_type == VA_RPC_RESPONSE) {
+            s->content = XMLRPC_MEMBLOCK_CONTENTS(char, s->data->send_resp_xml);
+            s->content_len = XMLRPC_MEMBLOCK_SIZE(char, s->data->send_resp_xml);
+        } else {
+            LOG("unknown rcp type");
+            goto EXIT_BAD;
+        }
+        va_rpc_hdr_init(s);
+        if (s->hdr_len == 0) {
+            LOG("failed to initialize http header");
+            goto EXIT_BAD;
+        }
+    }
+
+    return s;
+EXIT_BAD:
+    qemu_free(s);
+    return NULL;
+}
+
+/* called by va_rpc_read_handler after reading requests */
+static int va_rpc_send_response(VARPCData *data, int fd)
+{
+    VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_RESPONSE, VA_SEND);
+
+    TRACE("called");
+    if (s == NULL) {
+        LOG("failed to set up RPC state");
+        return -1;
+    }
+    TRACE("setting up send handler for RPC request");
+    vp_set_fd_handler(fd, NULL, va_rpc_send_handler, s);
+
+    return 0;
+}
+
+static void va_rpc_read_handler_completion(VARPCState *s) {
+    int ret;
+
+    if (s->rpc_type == VA_RPC_REQUEST) {
+        /* server read request, call it's cb function then set up
+         * a send handler for the rpc response if there weren't any
+         * communication errors
+         */ 
+        s->data->cb(s->data);
+        if (s->data->status == VA_RPC_STATUS_OK) {
+            ret = va_rpc_send_response(s->data, s->fd);
+            if (ret != 0) {
+                LOG("error setting up send handler for rpc response");
+            }
+        } else {
+            LOG("error reading rpc request, skipping response");
+            vp_set_fd_handler(s->fd, NULL, NULL, NULL);
+            closesocket(s->fd);
+            qemu_free(s->data);
+        }
+    } else if (s->rpc_type == VA_RPC_RESPONSE) {
+        /* client read response, call it's cb function and complete
+         * the RPC
+         */
+        s->data->cb(s->data);
+        vp_set_fd_handler(s->fd, NULL, NULL, NULL);
+        closesocket(s->fd);
+        qemu_free(s->data);
+    } else {
+        LOG("unknown rpc_type");
+    }
+    if (s->content != NULL) {
+        qemu_free(s->content);
+    }
+    qemu_free(s);
+}
+
+static void va_rpc_read_handler(void *opaque)
+{
+    VARPCState *s = opaque;
+    int ret;
+
+    TRACE("called with opaque: %p", opaque);
+
+    switch (s->state) {
+    case VA_READ_START:
+        s->state = VA_READ_HDR;
+    case VA_READ_HDR:
+        while((ret = read(s->fd, s->hdr + s->hdr_pos, 1)) > 0
+              && s->hdr_pos < VA_HDR_LEN_MAX) {
+            s->hdr_pos += ret;
+            if (end_of_header(s->hdr, s->hdr_pos - 1)) {
+                break;
+            }
+        }
+        if (ret == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                return;
+            } else {
+                LOG("error reading connection: %s", strerror(errno));
+                goto out_bad;
+            }
+        } else if (ret == 0) {
+            LOG("connected closed unexpectedly");
+            goto out_bad;
+        } else if (s->hdr_pos >= VA_HDR_LEN_MAX) {
+            LOG("http header too long");
+            goto out_bad;
+        } else {
+            s->content_len = -1;
+            va_rpc_parse_hdr(s);
+            if (s->content_len == -1) {
+                LOG("malformed http header");
+                goto out_bad;
+            } else if (s->content_len > VA_CONTENT_LEN_MAX) {
+                LOG("http content length too long");
+                goto out_bad;
+            }
+            s->content = qemu_mallocz(s->content_len);
+            s->state = VA_READ_BODY;
+        }
+    case VA_READ_BODY:
+        while(s->content_pos < s->content_len) {
+            ret = read(s->fd, s->content + s->content_pos,
+                       s->content_len - s->content_pos);
+            if (ret == -1) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                    return;
+                } else {
+                    LOG("error reading connection: %s", strerror(errno));
+                    goto out_bad;
+                }
+            } else if (ret == 0) {
+                LOG("connection closed unexpectedly:"
+                    " read %u bytes, expected %u bytes",
+                    (unsigned int)s->content_pos, (unsigned int)s->content_len);
+                goto out_bad;
+            }
+            s->content_pos += ret;
+        }
+
+        if (s->rpc_type == VA_RPC_REQUEST) {
+            s->data->req_xml = s->content;
+            s->data->req_xml_len = s->content_len;
+        } else if (s->rpc_type == VA_RPC_RESPONSE) {
+            s->data->resp_xml = s->content;
+            s->data->resp_xml_len = s->content_len;
+        }
+        s->data->status = VA_RPC_STATUS_OK;
+        goto out;
+    default:
+        LOG("unknown state");
+        goto out_bad;
+    }
+
+out_bad:
+    s->data->status = VA_RPC_STATUS_ERR;
+out:
+    va_rpc_read_handler_completion(s);
+}
+
+/* called by va_rpc_send_handler after sending requests */
+static int va_rpc_read_response(VARPCData *data, int fd)
+{
+    VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_RESPONSE, VA_READ);
+
+    TRACE("called");
+    if (s == NULL) {
+        LOG("failed to set up RPC state");
+        return -1;
+    }
+    TRACE("setting up send handler for RPC request");
+    vp_set_fd_handler(fd, NULL, va_rpc_read_handler, s);
+
+    return 0;
+}
+
+static void va_rpc_send_handler_completion(VARPCState *s) {
+    int ret;
+
+    if (s->rpc_type == VA_RPC_REQUEST) {
+        /* client sent request. free request's memblock, and set up read
+         * handler for server response if there weren't any communication
+         * errors
+         */
+        XMLRPC_MEMBLOCK_FREE(char, s->data->send_req_xml);
+        if (s->data->status == VA_RPC_STATUS_OK) {
+            ret = va_rpc_read_response(s->data, s->fd);
+            if (ret != 0) {
+                LOG("error setting up read handler for rpc response");
+            }
+        } else {
+            s->data->cb(s->data);
+            LOG("error sending rpc request, skipping response");
+            vp_set_fd_handler(s->fd, NULL, NULL, NULL);
+            closesocket(s->fd);
+            qemu_free(s->data);
+        }
+    } else if (s->rpc_type == VA_RPC_RESPONSE) {
+        /* server sent response. call it's cb once more, then free
+         * response's memblock and complete the RPC
+         */
+        s->data->cb(s->data);
+        XMLRPC_MEMBLOCK_FREE(char, s->data->send_resp_xml);
+        vp_set_fd_handler(s->fd, NULL, NULL, NULL);
+        closesocket(s->fd);
+        qemu_free(s->data);
+    } else {
+        LOG("unknown rpc_type");
+    }
+    qemu_free(s);
+}
+
+static void va_rpc_send_handler(void *opaque)
+{
+    VARPCState *s = opaque;
+    int ret;
+
+    TRACE("called with opaque: %p", opaque);
+
+    switch (s->state) {
+    case VA_SEND_START:
+        s->state = VA_SEND_HDR;
+    case VA_SEND_HDR:
+        do {
+            ret = write(s->fd, s->hdr + s->hdr_pos, s->hdr_len - s->hdr_pos);
+            if (ret <= 0) {
+                break;
+            }
+            s->hdr_pos += ret;
+        } while (s->hdr_pos < s->hdr_len);
+        if (ret == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                return;
+            } else {
+                LOG("error reading connection: %s", strerror(errno));
+                goto out_bad;
+            }
+        } else if (ret == 0) {
+            LOG("connected closed unexpectedly");
+            goto out_bad;
+        } else {
+            s->state = VA_SEND_BODY;
+        }
+    case VA_SEND_BODY:
+        do {
+            ret = write(s->fd, s->content + s->content_pos,
+                        s->content_len - s->content_pos);
+            if (ret <= 0) {
+                break;
+            }
+            s->content_pos += ret;
+        } while (s->content_pos < s->content_len);
+        if (ret == -1) {
+            if (errno == EAGAIN || errno == EWOULDBLOCK) {
+                return;
+            } else {
+                LOG("error reading connection: %s", strerror(errno));
+                goto out_bad;
+            }
+        } else if (ret == 0) {
+            LOG("connected closed unexpectedly");
+            goto out_bad;
+        } else {
+            s->data->status = VA_RPC_STATUS_OK;
+            goto out;
+        }
+    default:
+        LOG("unknown state");
+        goto out_bad;
+    }
+
+out_bad:
+    s->data->status = VA_RPC_STATUS_ERR;
+out:
+    va_rpc_send_handler_completion(s);
+}
+
+/* called by rpc client
+ * one callback to data->cb after response is read.
+ * data and data->send_req_xml should be allocated by caller,
+ * callee will de-allocate these after calling data->cb(data)
+ *
+ * if non-zero returned however, caller should free data and hanging refs
+ */ 
+int va_rpc_send_request(VARPCData *data, int fd)
+{
+    VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_REQUEST, VA_SEND);
+
+    TRACE("called");
+    if (s == NULL) {
+        LOG("failed to set up RPC state");
+        return -1;
+    }
+    TRACE("setting up send handler for RPC request");
+    vp_set_fd_handler(fd, NULL, va_rpc_send_handler, s);
+
+    return 0;
+}
+
+/* called by rpc server
+ * one callback to current data->cb after read, one callback after send.
+ * data should be allocated by caller, data->send_resp_xml should be
+ * allocated by first data->cb(data) callback, "callee" will de-allocate
+ * data and data->send_resp_xml after sending rpc response
+ *
+ * if non-zero returned however, caller should free data and hanging refs
+ */
+int va_rpc_read_request(VARPCData *data, int fd)
+{
+    VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_REQUEST, VA_READ);
+
+    TRACE("called");
+    if (s == NULL) {
+        LOG("failed to set up RPC state");
+        return -1;
+    }
+    TRACE("setting up read handler for RPC request");
+    vp_set_fd_handler(fd, va_rpc_read_handler, NULL, s);
+    return 0;
+}
diff --git a/virtagent-common.h b/virtagent-common.h
new file mode 100644
index 0000000..6a58bcd
--- /dev/null
+++ b/virtagent-common.h
@@ -0,0 +1,73 @@ 
+/*
+ * virt-agent - host/guest RPC client functions
+ *
+ * Copyright IBM Corp. 2010
+ *
+ * Authors:
+ *  Adam Litke        <aglitke@linux.vnet.ibm.com>
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+#ifndef VIRTAGENT_COMMON_H
+#define VIRTAGENT_COMMON_H
+
+#include <xmlrpc-c/base.h>
+#include <xmlrpc-c/client.h>
+#include <xmlrpc-c/server.h>
+#include "qemu-common.h"
+#include "qemu_socket.h"
+#include "monitor.h"
+#include "virtproxy.h"
+
+#define DEBUG_VA
+
+#ifdef DEBUG_VA
+#define TRACE(msg, ...) do { \
+    fprintf(stderr, "%s:%s():L%d: " msg "\n", \
+            __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \
+} while(0)
+#else
+#define TRACE(msg, ...) \
+    do { } while (0)
+#endif
+
+#define LOG(msg, ...) do { \
+    fprintf(stderr, "%s:%s(): " msg "\n", \
+            __FILE__, __FUNCTION__, ## __VA_ARGS__); \
+} while(0)
+
+#define TADDR "127.0.0.1:8080"
+#define URL "http://localhost:8080/RPC2"
+#define NAME "QEMU virt-agent host client"
+#define VERSION "1.0"
+#define EOL "\r\n"
+
+#define VA_RPC_STATUS_OK 0
+#define VA_RPC_STATUS_ERR 1
+#define VA_HDR_LEN_MAX 4096 /* http header limit */
+#define VA_CONTENT_LEN_MAX 2*1024*1024 /* rpc/http send limit */
+
+typedef void (VARPCDataCallback)(void *rpc_data);
+typedef struct VARPCData {
+    VARPCDataCallback *cb;
+    int status;
+    void *opaque;
+    /* provided/allocated by caller for sending as memblocks */
+    xmlrpc_mem_block *send_req_xml;
+    xmlrpc_mem_block *send_resp_xml;
+    /* recieved, and passed to cb func, as char arrays */
+    char *req_xml;
+    int req_xml_len;
+    char *resp_xml;
+    int resp_xml_len;
+    /* for use by QMP functions */
+    MonitorCompletion *mon_cb;
+    void *mon_data;
+} VARPCData;
+
+int va_rpc_send_request(VARPCData *data, int fd);
+int va_rpc_read_request(VARPCData *data, int fd);
+#endif /* VIRTAGENT_COMMON_H */