Message ID | 1275406821-30024-3-git-send-email-tamura.yoshiaki@lab.ntt.co.jp |
---|---|
State | New |
Headers | show |
On 06/01/2010 10:40 AM, Yoshiaki Tamura wrote: > Create a thread to handle tcp incoming migration when CONFIG_IOTHREAD > is enabled. Spawned thread writes it's return status to th_fds[1] > before exit, and main thread joins and reads it. In > tcp_start_incoming_migration(), allocate FdMigrationState and return > MigrationState to let migration to print incoming migration info. > In the absence of any locking, I can't see how this is safe. Regards, Anthony Liguori > Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp> > --- > migration-tcp.c | 86 ++++++++++++++++++++++++++++++++++++++++++++++--------- > migration.h | 2 +- > 2 files changed, 73 insertions(+), 15 deletions(-) > > diff --git a/migration-tcp.c b/migration-tcp.c > index 95ce722..f20e5fe 100644 > --- a/migration-tcp.c > +++ b/migration-tcp.c > @@ -18,6 +18,7 @@ > #include "sysemu.h" > #include "buffered_file.h" > #include "block.h" > +#include "qemu-thread.h" > > //#define DEBUG_MIGRATION_TCP > > @@ -29,6 +30,11 @@ > do { } while (0) > #endif > > +#ifdef CONFIG_IOTHREAD > +static QemuThread migration_thread; > +static int th_fds[2]; > +#endif > + > static int socket_errno(FdMigrationState *s) > { > return socket_error(); > @@ -176,41 +182,93 @@ static void tcp_accept_incoming_migration(void *opaque) > out_fopen: > qemu_fclose(f); > out: > +#ifndef CONFIG_IOTHREAD > qemu_set_fd_handler2(s, NULL, NULL, NULL, NULL); > +#endif > close(s); > close(c); > +#ifdef CONFIG_IOTHREAD > + write(th_fds[1],&ret, sizeof(ret)); > + qemu_thread_exit(NULL); > +#endif > +} > + > +#ifdef CONFIG_IOTHREAD > +static void tcp_incoming_migration_complete(void *opaque) > +{ > + int ret, state = 0; > + FdMigrationState *s = opaque; > + > + qemu_thread_join(&migration_thread, NULL); > + > + ret = read(th_fds[0],&state, sizeof(state)); > + if (ret == -1) { > + fprintf(stderr, "failed to read from pipe\n"); > + goto err; > + } > + > + s->state = state< 0 ? MIG_STATE_ERROR : MIG_STATE_COMPLETED; > + > +err: > + qemu_set_fd_handler2(th_fds[0], NULL, NULL, NULL, NULL); > + close(th_fds[0]); > + close(th_fds[1]); > } > +#endif > > -int tcp_start_incoming_migration(const char *host_port) > +MigrationState *tcp_start_incoming_migration(const char *host_port) > { > struct sockaddr_in addr; > + FdMigrationState *s; > int val; > - int s; > > if (parse_host_port(&addr, host_port)< 0) { > fprintf(stderr, "invalid host/port combination: %s\n", host_port); > - return -EINVAL; > + return NULL; > } > > - s = qemu_socket(PF_INET, SOCK_STREAM, 0); > - if (s == -1) > - return -socket_error(); > + s = qemu_mallocz(sizeof(*s)); > + > + s->get_error = socket_errno; > + s->close = tcp_close; > + s->mig_state.cancel = migrate_fd_cancel; > + s->mig_state.get_status = migrate_fd_get_status; > + s->state = MIG_STATE_ACTIVE; > + > + s->fd = qemu_socket(PF_INET, SOCK_STREAM, 0); > + if (s->fd == -1) { > + qemu_free(s); > + return NULL; > + } > > val = 1; > - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, sizeof(val)); > + setsockopt(s->fd, SOL_SOCKET, SO_REUSEADDR, > + (const char *)&val, sizeof(val)); > > - if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) > + if (bind(s->fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) > goto err; > > - if (listen(s, 1) == -1) > + if (listen(s->fd, 1) == -1) > goto err; > > - qemu_set_fd_handler2(s, NULL, tcp_accept_incoming_migration, NULL, > - (void *)(unsigned long)s); > +#ifdef CONFIG_IOTHREAD > + if (qemu_pipe(th_fds) == -1) { > + fprintf(stderr, "failed to create pipe\n"); > + goto err; > + } > > - return 0; > + qemu_thread_create(&migration_thread, (void *)tcp_accept_incoming_migration, > + (void *)(unsigned long)s->fd); > + qemu_set_fd_handler2(th_fds[0], NULL, tcp_incoming_migration_complete, NULL, > + (void *)s); > +#else > + qemu_set_fd_handler2(s->fd, NULL, tcp_accept_incoming_migration, NULL, > + (void *)(unsigned long)s->fd); > +#endif > + > + return&s->mig_state; > > err: > - close(s); > - return -socket_error(); > + close(s->fd); > + return NULL; > } > diff --git a/migration.h b/migration.h > index 385423f..c11e6db 100644 > --- a/migration.h > +++ b/migration.h > @@ -76,7 +76,7 @@ MigrationState *exec_start_outgoing_migration(Monitor *mon, > int blk, > int inc); > > -int tcp_start_incoming_migration(const char *host_port); > +MigrationState *tcp_start_incoming_migration(const char *host_port); > > MigrationState *tcp_start_outgoing_migration(Monitor *mon, > const char *host_port, >
2010/6/2 Anthony Liguori <aliguori@linux.vnet.ibm.com>: > On 06/01/2010 10:40 AM, Yoshiaki Tamura wrote: >> >> Create a thread to handle tcp incoming migration when CONFIG_IOTHREAD >> is enabled. Spawned thread writes it's return status to th_fds[1] >> before exit, and main thread joins and reads it. In >> tcp_start_incoming_migration(), allocate FdMigrationState and return >> MigrationState to let migration to print incoming migration info. >> > > In the absence of any locking, I can't see how this is safe. Right. If we use threading here, we need to prevent commands from monitor that affects incoming thread. Thanks, Yoshi > > Regards, > > Anthony Liguori > >> Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp> >> --- >> migration-tcp.c | 86 >> ++++++++++++++++++++++++++++++++++++++++++++++--------- >> migration.h | 2 +- >> 2 files changed, 73 insertions(+), 15 deletions(-) >> >> diff --git a/migration-tcp.c b/migration-tcp.c >> index 95ce722..f20e5fe 100644 >> --- a/migration-tcp.c >> +++ b/migration-tcp.c >> @@ -18,6 +18,7 @@ >> #include "sysemu.h" >> #include "buffered_file.h" >> #include "block.h" >> +#include "qemu-thread.h" >> >> //#define DEBUG_MIGRATION_TCP >> >> @@ -29,6 +30,11 @@ >> do { } while (0) >> #endif >> >> +#ifdef CONFIG_IOTHREAD >> +static QemuThread migration_thread; >> +static int th_fds[2]; >> +#endif >> + >> static int socket_errno(FdMigrationState *s) >> { >> return socket_error(); >> @@ -176,41 +182,93 @@ static void tcp_accept_incoming_migration(void >> *opaque) >> out_fopen: >> qemu_fclose(f); >> out: >> +#ifndef CONFIG_IOTHREAD >> qemu_set_fd_handler2(s, NULL, NULL, NULL, NULL); >> +#endif >> close(s); >> close(c); >> +#ifdef CONFIG_IOTHREAD >> + write(th_fds[1],&ret, sizeof(ret)); >> + qemu_thread_exit(NULL); >> +#endif >> +} >> + >> +#ifdef CONFIG_IOTHREAD >> +static void tcp_incoming_migration_complete(void *opaque) >> +{ >> + int ret, state = 0; >> + FdMigrationState *s = opaque; >> + >> + qemu_thread_join(&migration_thread, NULL); >> + >> + ret = read(th_fds[0],&state, sizeof(state)); >> + if (ret == -1) { >> + fprintf(stderr, "failed to read from pipe\n"); >> + goto err; >> + } >> + >> + s->state = state< 0 ? MIG_STATE_ERROR : MIG_STATE_COMPLETED; >> + >> +err: >> + qemu_set_fd_handler2(th_fds[0], NULL, NULL, NULL, NULL); >> + close(th_fds[0]); >> + close(th_fds[1]); >> } >> +#endif >> >> -int tcp_start_incoming_migration(const char *host_port) >> +MigrationState *tcp_start_incoming_migration(const char *host_port) >> { >> struct sockaddr_in addr; >> + FdMigrationState *s; >> int val; >> - int s; >> >> if (parse_host_port(&addr, host_port)< 0) { >> fprintf(stderr, "invalid host/port combination: %s\n", >> host_port); >> - return -EINVAL; >> + return NULL; >> } >> >> - s = qemu_socket(PF_INET, SOCK_STREAM, 0); >> - if (s == -1) >> - return -socket_error(); >> + s = qemu_mallocz(sizeof(*s)); >> + >> + s->get_error = socket_errno; >> + s->close = tcp_close; >> + s->mig_state.cancel = migrate_fd_cancel; >> + s->mig_state.get_status = migrate_fd_get_status; >> + s->state = MIG_STATE_ACTIVE; >> + >> + s->fd = qemu_socket(PF_INET, SOCK_STREAM, 0); >> + if (s->fd == -1) { >> + qemu_free(s); >> + return NULL; >> + } >> >> val = 1; >> - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, >> sizeof(val)); >> + setsockopt(s->fd, SOL_SOCKET, SO_REUSEADDR, >> + (const char *)&val, sizeof(val)); >> >> - if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) >> + if (bind(s->fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) >> goto err; >> >> - if (listen(s, 1) == -1) >> + if (listen(s->fd, 1) == -1) >> goto err; >> >> - qemu_set_fd_handler2(s, NULL, tcp_accept_incoming_migration, NULL, >> - (void *)(unsigned long)s); >> +#ifdef CONFIG_IOTHREAD >> + if (qemu_pipe(th_fds) == -1) { >> + fprintf(stderr, "failed to create pipe\n"); >> + goto err; >> + } >> >> - return 0; >> + qemu_thread_create(&migration_thread, (void >> *)tcp_accept_incoming_migration, >> + (void *)(unsigned long)s->fd); >> + qemu_set_fd_handler2(th_fds[0], NULL, >> tcp_incoming_migration_complete, NULL, >> + (void *)s); >> +#else >> + qemu_set_fd_handler2(s->fd, NULL, tcp_accept_incoming_migration, >> NULL, >> + (void *)(unsigned long)s->fd); >> +#endif >> + >> + return&s->mig_state; >> >> err: >> - close(s); >> - return -socket_error(); >> + close(s->fd); >> + return NULL; >> } >> diff --git a/migration.h b/migration.h >> index 385423f..c11e6db 100644 >> --- a/migration.h >> +++ b/migration.h >> @@ -76,7 +76,7 @@ MigrationState *exec_start_outgoing_migration(Monitor >> *mon, >> int blk, >> int inc); >> >> -int tcp_start_incoming_migration(const char *host_port); >> +MigrationState *tcp_start_incoming_migration(const char *host_port); >> >> MigrationState *tcp_start_outgoing_migration(Monitor *mon, >> const char *host_port, >> > > >
On 06/01/2010 11:23 AM, Yoshiaki Tamura wrote: > 2010/6/2 Anthony Liguori<aliguori@linux.vnet.ibm.com>: > >> On 06/01/2010 10:40 AM, Yoshiaki Tamura wrote: >> >>> Create a thread to handle tcp incoming migration when CONFIG_IOTHREAD >>> is enabled. Spawned thread writes it's return status to th_fds[1] >>> before exit, and main thread joins and reads it. In >>> tcp_start_incoming_migration(), allocate FdMigrationState and return >>> MigrationState to let migration to print incoming migration info. >>> >>> >> In the absence of any locking, I can't see how this is safe. >> > Right. If we use threading here, we need to prevent commands from > monitor that affects incoming thread. > There's a huge number of calls that can get made during live migration that can also be triggered by the monitor. Even the basic things likes qemu_set_fd_handler2 can cause havoc. Just preventing certain monitor commands is not sufficient. Regards, Anthony Liguori > Thanks, > > Yoshi > > >> Regards, >> >> Anthony Liguori >> >> >>> Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp> >>> --- >>> migration-tcp.c | 86 >>> ++++++++++++++++++++++++++++++++++++++++++++++--------- >>> migration.h | 2 +- >>> 2 files changed, 73 insertions(+), 15 deletions(-) >>> >>> diff --git a/migration-tcp.c b/migration-tcp.c >>> index 95ce722..f20e5fe 100644 >>> --- a/migration-tcp.c >>> +++ b/migration-tcp.c >>> @@ -18,6 +18,7 @@ >>> #include "sysemu.h" >>> #include "buffered_file.h" >>> #include "block.h" >>> +#include "qemu-thread.h" >>> >>> //#define DEBUG_MIGRATION_TCP >>> >>> @@ -29,6 +30,11 @@ >>> do { } while (0) >>> #endif >>> >>> +#ifdef CONFIG_IOTHREAD >>> +static QemuThread migration_thread; >>> +static int th_fds[2]; >>> +#endif >>> + >>> static int socket_errno(FdMigrationState *s) >>> { >>> return socket_error(); >>> @@ -176,41 +182,93 @@ static void tcp_accept_incoming_migration(void >>> *opaque) >>> out_fopen: >>> qemu_fclose(f); >>> out: >>> +#ifndef CONFIG_IOTHREAD >>> qemu_set_fd_handler2(s, NULL, NULL, NULL, NULL); >>> +#endif >>> close(s); >>> close(c); >>> +#ifdef CONFIG_IOTHREAD >>> + write(th_fds[1],&ret, sizeof(ret)); >>> + qemu_thread_exit(NULL); >>> +#endif >>> +} >>> + >>> +#ifdef CONFIG_IOTHREAD >>> +static void tcp_incoming_migration_complete(void *opaque) >>> +{ >>> + int ret, state = 0; >>> + FdMigrationState *s = opaque; >>> + >>> + qemu_thread_join(&migration_thread, NULL); >>> + >>> + ret = read(th_fds[0],&state, sizeof(state)); >>> + if (ret == -1) { >>> + fprintf(stderr, "failed to read from pipe\n"); >>> + goto err; >>> + } >>> + >>> + s->state = state< 0 ? MIG_STATE_ERROR : MIG_STATE_COMPLETED; >>> + >>> +err: >>> + qemu_set_fd_handler2(th_fds[0], NULL, NULL, NULL, NULL); >>> + close(th_fds[0]); >>> + close(th_fds[1]); >>> } >>> +#endif >>> >>> -int tcp_start_incoming_migration(const char *host_port) >>> +MigrationState *tcp_start_incoming_migration(const char *host_port) >>> { >>> struct sockaddr_in addr; >>> + FdMigrationState *s; >>> int val; >>> - int s; >>> >>> if (parse_host_port(&addr, host_port)< 0) { >>> fprintf(stderr, "invalid host/port combination: %s\n", >>> host_port); >>> - return -EINVAL; >>> + return NULL; >>> } >>> >>> - s = qemu_socket(PF_INET, SOCK_STREAM, 0); >>> - if (s == -1) >>> - return -socket_error(); >>> + s = qemu_mallocz(sizeof(*s)); >>> + >>> + s->get_error = socket_errno; >>> + s->close = tcp_close; >>> + s->mig_state.cancel = migrate_fd_cancel; >>> + s->mig_state.get_status = migrate_fd_get_status; >>> + s->state = MIG_STATE_ACTIVE; >>> + >>> + s->fd = qemu_socket(PF_INET, SOCK_STREAM, 0); >>> + if (s->fd == -1) { >>> + qemu_free(s); >>> + return NULL; >>> + } >>> >>> val = 1; >>> - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, >>> sizeof(val)); >>> + setsockopt(s->fd, SOL_SOCKET, SO_REUSEADDR, >>> + (const char *)&val, sizeof(val)); >>> >>> - if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) >>> + if (bind(s->fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) >>> goto err; >>> >>> - if (listen(s, 1) == -1) >>> + if (listen(s->fd, 1) == -1) >>> goto err; >>> >>> - qemu_set_fd_handler2(s, NULL, tcp_accept_incoming_migration, NULL, >>> - (void *)(unsigned long)s); >>> +#ifdef CONFIG_IOTHREAD >>> + if (qemu_pipe(th_fds) == -1) { >>> + fprintf(stderr, "failed to create pipe\n"); >>> + goto err; >>> + } >>> >>> - return 0; >>> + qemu_thread_create(&migration_thread, (void >>> *)tcp_accept_incoming_migration, >>> + (void *)(unsigned long)s->fd); >>> + qemu_set_fd_handler2(th_fds[0], NULL, >>> tcp_incoming_migration_complete, NULL, >>> + (void *)s); >>> +#else >>> + qemu_set_fd_handler2(s->fd, NULL, tcp_accept_incoming_migration, >>> NULL, >>> + (void *)(unsigned long)s->fd); >>> +#endif >>> + >>> + return&s->mig_state; >>> >>> err: >>> - close(s); >>> - return -socket_error(); >>> + close(s->fd); >>> + return NULL; >>> } >>> diff --git a/migration.h b/migration.h >>> index 385423f..c11e6db 100644 >>> --- a/migration.h >>> +++ b/migration.h >>> @@ -76,7 +76,7 @@ MigrationState *exec_start_outgoing_migration(Monitor >>> *mon, >>> int blk, >>> int inc); >>> >>> -int tcp_start_incoming_migration(const char *host_port); >>> +MigrationState *tcp_start_incoming_migration(const char *host_port); >>> >>> MigrationState *tcp_start_outgoing_migration(Monitor *mon, >>> const char *host_port, >>> >>> >> >> >>
2010/6/2 Anthony Liguori <aliguori@linux.vnet.ibm.com>: > On 06/01/2010 11:23 AM, Yoshiaki Tamura wrote: >> >> 2010/6/2 Anthony Liguori<aliguori@linux.vnet.ibm.com>: >> >>> >>> On 06/01/2010 10:40 AM, Yoshiaki Tamura wrote: >>> >>>> >>>> Create a thread to handle tcp incoming migration when CONFIG_IOTHREAD >>>> is enabled. Spawned thread writes it's return status to th_fds[1] >>>> before exit, and main thread joins and reads it. In >>>> tcp_start_incoming_migration(), allocate FdMigrationState and return >>>> MigrationState to let migration to print incoming migration info. >>>> >>>> >>> >>> In the absence of any locking, I can't see how this is safe. >>> >> >> Right. If we use threading here, we need to prevent commands from >> monitor that affects incoming thread. >> > > There's a huge number of calls that can get made during live migration that > can also be triggered by the monitor. > > Even the basic things likes qemu_set_fd_handler2 can cause havoc. > > Just preventing certain monitor commands is not sufficient. Ah, that was my concern. Sorry for not understanding well. Thanks for your advice. Yoshi > > Regards, > > Anthony Liguori > >> Thanks, >> >> Yoshi >> >> >>> >>> Regards, >>> >>> Anthony Liguori >>> >>> >>>> >>>> Signed-off-by: Yoshiaki Tamura<tamura.yoshiaki@lab.ntt.co.jp> >>>> --- >>>> migration-tcp.c | 86 >>>> ++++++++++++++++++++++++++++++++++++++++++++++--------- >>>> migration.h | 2 +- >>>> 2 files changed, 73 insertions(+), 15 deletions(-) >>>> >>>> diff --git a/migration-tcp.c b/migration-tcp.c >>>> index 95ce722..f20e5fe 100644 >>>> --- a/migration-tcp.c >>>> +++ b/migration-tcp.c >>>> @@ -18,6 +18,7 @@ >>>> #include "sysemu.h" >>>> #include "buffered_file.h" >>>> #include "block.h" >>>> +#include "qemu-thread.h" >>>> >>>> //#define DEBUG_MIGRATION_TCP >>>> >>>> @@ -29,6 +30,11 @@ >>>> do { } while (0) >>>> #endif >>>> >>>> +#ifdef CONFIG_IOTHREAD >>>> +static QemuThread migration_thread; >>>> +static int th_fds[2]; >>>> +#endif >>>> + >>>> static int socket_errno(FdMigrationState *s) >>>> { >>>> return socket_error(); >>>> @@ -176,41 +182,93 @@ static void tcp_accept_incoming_migration(void >>>> *opaque) >>>> out_fopen: >>>> qemu_fclose(f); >>>> out: >>>> +#ifndef CONFIG_IOTHREAD >>>> qemu_set_fd_handler2(s, NULL, NULL, NULL, NULL); >>>> +#endif >>>> close(s); >>>> close(c); >>>> +#ifdef CONFIG_IOTHREAD >>>> + write(th_fds[1],&ret, sizeof(ret)); >>>> + qemu_thread_exit(NULL); >>>> +#endif >>>> +} >>>> + >>>> +#ifdef CONFIG_IOTHREAD >>>> +static void tcp_incoming_migration_complete(void *opaque) >>>> +{ >>>> + int ret, state = 0; >>>> + FdMigrationState *s = opaque; >>>> + >>>> + qemu_thread_join(&migration_thread, NULL); >>>> + >>>> + ret = read(th_fds[0],&state, sizeof(state)); >>>> + if (ret == -1) { >>>> + fprintf(stderr, "failed to read from pipe\n"); >>>> + goto err; >>>> + } >>>> + >>>> + s->state = state< 0 ? MIG_STATE_ERROR : MIG_STATE_COMPLETED; >>>> + >>>> +err: >>>> + qemu_set_fd_handler2(th_fds[0], NULL, NULL, NULL, NULL); >>>> + close(th_fds[0]); >>>> + close(th_fds[1]); >>>> } >>>> +#endif >>>> >>>> -int tcp_start_incoming_migration(const char *host_port) >>>> +MigrationState *tcp_start_incoming_migration(const char *host_port) >>>> { >>>> struct sockaddr_in addr; >>>> + FdMigrationState *s; >>>> int val; >>>> - int s; >>>> >>>> if (parse_host_port(&addr, host_port)< 0) { >>>> fprintf(stderr, "invalid host/port combination: %s\n", >>>> host_port); >>>> - return -EINVAL; >>>> + return NULL; >>>> } >>>> >>>> - s = qemu_socket(PF_INET, SOCK_STREAM, 0); >>>> - if (s == -1) >>>> - return -socket_error(); >>>> + s = qemu_mallocz(sizeof(*s)); >>>> + >>>> + s->get_error = socket_errno; >>>> + s->close = tcp_close; >>>> + s->mig_state.cancel = migrate_fd_cancel; >>>> + s->mig_state.get_status = migrate_fd_get_status; >>>> + s->state = MIG_STATE_ACTIVE; >>>> + >>>> + s->fd = qemu_socket(PF_INET, SOCK_STREAM, 0); >>>> + if (s->fd == -1) { >>>> + qemu_free(s); >>>> + return NULL; >>>> + } >>>> >>>> val = 1; >>>> - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, >>>> sizeof(val)); >>>> + setsockopt(s->fd, SOL_SOCKET, SO_REUSEADDR, >>>> + (const char *)&val, sizeof(val)); >>>> >>>> - if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) >>>> + if (bind(s->fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) >>>> goto err; >>>> >>>> - if (listen(s, 1) == -1) >>>> + if (listen(s->fd, 1) == -1) >>>> goto err; >>>> >>>> - qemu_set_fd_handler2(s, NULL, tcp_accept_incoming_migration, NULL, >>>> - (void *)(unsigned long)s); >>>> +#ifdef CONFIG_IOTHREAD >>>> + if (qemu_pipe(th_fds) == -1) { >>>> + fprintf(stderr, "failed to create pipe\n"); >>>> + goto err; >>>> + } >>>> >>>> - return 0; >>>> + qemu_thread_create(&migration_thread, (void >>>> *)tcp_accept_incoming_migration, >>>> + (void *)(unsigned long)s->fd); >>>> + qemu_set_fd_handler2(th_fds[0], NULL, >>>> tcp_incoming_migration_complete, NULL, >>>> + (void *)s); >>>> +#else >>>> + qemu_set_fd_handler2(s->fd, NULL, tcp_accept_incoming_migration, >>>> NULL, >>>> + (void *)(unsigned long)s->fd); >>>> +#endif >>>> + >>>> + return&s->mig_state; >>>> >>>> err: >>>> - close(s); >>>> - return -socket_error(); >>>> + close(s->fd); >>>> + return NULL; >>>> } >>>> diff --git a/migration.h b/migration.h >>>> index 385423f..c11e6db 100644 >>>> --- a/migration.h >>>> +++ b/migration.h >>>> @@ -76,7 +76,7 @@ MigrationState *exec_start_outgoing_migration(Monitor >>>> *mon, >>>> int blk, >>>> int inc); >>>> >>>> -int tcp_start_incoming_migration(const char *host_port); >>>> +MigrationState *tcp_start_incoming_migration(const char *host_port); >>>> >>>> MigrationState *tcp_start_outgoing_migration(Monitor *mon, >>>> const char *host_port, >>>> >>>> >>> >>> >>> > > >
diff --git a/migration-tcp.c b/migration-tcp.c index 95ce722..f20e5fe 100644 --- a/migration-tcp.c +++ b/migration-tcp.c @@ -18,6 +18,7 @@ #include "sysemu.h" #include "buffered_file.h" #include "block.h" +#include "qemu-thread.h" //#define DEBUG_MIGRATION_TCP @@ -29,6 +30,11 @@ do { } while (0) #endif +#ifdef CONFIG_IOTHREAD +static QemuThread migration_thread; +static int th_fds[2]; +#endif + static int socket_errno(FdMigrationState *s) { return socket_error(); @@ -176,41 +182,93 @@ static void tcp_accept_incoming_migration(void *opaque) out_fopen: qemu_fclose(f); out: +#ifndef CONFIG_IOTHREAD qemu_set_fd_handler2(s, NULL, NULL, NULL, NULL); +#endif close(s); close(c); +#ifdef CONFIG_IOTHREAD + write(th_fds[1], &ret, sizeof(ret)); + qemu_thread_exit(NULL); +#endif +} + +#ifdef CONFIG_IOTHREAD +static void tcp_incoming_migration_complete(void *opaque) +{ + int ret, state = 0; + FdMigrationState *s = opaque; + + qemu_thread_join(&migration_thread, NULL); + + ret = read(th_fds[0], &state, sizeof(state)); + if (ret == -1) { + fprintf(stderr, "failed to read from pipe\n"); + goto err; + } + + s->state = state < 0 ? MIG_STATE_ERROR : MIG_STATE_COMPLETED; + +err: + qemu_set_fd_handler2(th_fds[0], NULL, NULL, NULL, NULL); + close(th_fds[0]); + close(th_fds[1]); } +#endif -int tcp_start_incoming_migration(const char *host_port) +MigrationState *tcp_start_incoming_migration(const char *host_port) { struct sockaddr_in addr; + FdMigrationState *s; int val; - int s; if (parse_host_port(&addr, host_port) < 0) { fprintf(stderr, "invalid host/port combination: %s\n", host_port); - return -EINVAL; + return NULL; } - s = qemu_socket(PF_INET, SOCK_STREAM, 0); - if (s == -1) - return -socket_error(); + s = qemu_mallocz(sizeof(*s)); + + s->get_error = socket_errno; + s->close = tcp_close; + s->mig_state.cancel = migrate_fd_cancel; + s->mig_state.get_status = migrate_fd_get_status; + s->state = MIG_STATE_ACTIVE; + + s->fd = qemu_socket(PF_INET, SOCK_STREAM, 0); + if (s->fd == -1) { + qemu_free(s); + return NULL; + } val = 1; - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const char *)&val, sizeof(val)); + setsockopt(s->fd, SOL_SOCKET, SO_REUSEADDR, + (const char *)&val, sizeof(val)); - if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) == -1) + if (bind(s->fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) goto err; - if (listen(s, 1) == -1) + if (listen(s->fd, 1) == -1) goto err; - qemu_set_fd_handler2(s, NULL, tcp_accept_incoming_migration, NULL, - (void *)(unsigned long)s); +#ifdef CONFIG_IOTHREAD + if (qemu_pipe(th_fds) == -1) { + fprintf(stderr, "failed to create pipe\n"); + goto err; + } - return 0; + qemu_thread_create(&migration_thread, (void *)tcp_accept_incoming_migration, + (void *)(unsigned long)s->fd); + qemu_set_fd_handler2(th_fds[0], NULL, tcp_incoming_migration_complete, NULL, + (void *)s); +#else + qemu_set_fd_handler2(s->fd, NULL, tcp_accept_incoming_migration, NULL, + (void *)(unsigned long)s->fd); +#endif + + return &s->mig_state; err: - close(s); - return -socket_error(); + close(s->fd); + return NULL; } diff --git a/migration.h b/migration.h index 385423f..c11e6db 100644 --- a/migration.h +++ b/migration.h @@ -76,7 +76,7 @@ MigrationState *exec_start_outgoing_migration(Monitor *mon, int blk, int inc); -int tcp_start_incoming_migration(const char *host_port); +MigrationState *tcp_start_incoming_migration(const char *host_port); MigrationState *tcp_start_outgoing_migration(Monitor *mon, const char *host_port,
Create a thread to handle tcp incoming migration when CONFIG_IOTHREAD is enabled. Spawned thread writes it's return status to th_fds[1] before exit, and main thread joins and reads it. In tcp_start_incoming_migration(), allocate FdMigrationState and return MigrationState to let migration to print incoming migration info. Signed-off-by: Yoshiaki Tamura <tamura.yoshiaki@lab.ntt.co.jp> --- migration-tcp.c | 86 ++++++++++++++++++++++++++++++++++++++++++++++--------- migration.h | 2 +- 2 files changed, 73 insertions(+), 15 deletions(-)