@@ -1,3 +1,24 @@
+2019-09-10 Julian Brown <julian@codesourcery.com>
+
+ * plugin/plugin-gcn.c (struct placeholder, struct asyncwait_info,
+ enum entry_type): New.
+ (queue_entry): Use entry_type enum for tag. Add asyncwait and
+ placeholder event type fields.
+ (wait_for_queue_nonfull): New function.
+ (queue_push_launch): Use above function instead of raising a fatal
+ error on queue-full condition. Use KERNEL_LAUNCH instead of hardwired
+ 0.
+ (queue_push_callback): Use wait_for_queue_nonfull instead of open-coded
+ wait sequence. Use CALLBACK instead of hardwired 1.
+ (queue_push_asyncwait, queue_push_placeholder): New.
+ (execute_queue_entry): Implement ASYNC_WAIT and ASYNC_PLACEHOLDER event
+ types.
+ (GOMP_OFFLOAD_openacc_async_serialize): Use queue_push_placeholder and
+ queue_push_asyncwait instead of host-synchronized wait_queue calls.
+ * testsuite/libgomp.oacc-c-c++-common/data-2-lib.c (main): Add missing
+ asynchronous waits.
+ * testsuite/libgomp.oacc-c-c++-common/data-2.c (main): Likewise.
+
2019-09-10 Julian Brown <julian@codesourcery.com>
* plugin/plugin-gcn.c (GOMP_OFFLOAD_openacc_async_host2dev): Enqueue
@@ -300,12 +300,34 @@ struct callback
void *data;
};
+struct placeholder
+{
+ int executed;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+};
+
+struct asyncwait_info
+{
+ struct placeholder *placeholderp;
+};
+
+enum entry_type
+{
+ KERNEL_LAUNCH,
+ CALLBACK,
+ ASYNC_WAIT,
+ ASYNC_PLACEHOLDER
+};
+
struct queue_entry
{
- int type;
+ enum entry_type type;
union {
struct kernel_launch launch;
struct callback callback;
+ struct asyncwait_info asyncwait;
+ struct placeholder placeholder;
} u;
};
@@ -1407,15 +1429,28 @@ GOMP_OFFLOAD_get_property (int device, int prop)
return propval;
}
+static void
+wait_for_queue_nonfull (struct goacc_asyncqueue *aq)
+{
+ if (aq->queue_n == ASYNC_QUEUE_SIZE)
+ {
+ pthread_mutex_lock (&aq->mutex);
+
+ /* Queue is full. Wait for it to not be full. */
+ while (aq->queue_n == ASYNC_QUEUE_SIZE)
+ pthread_cond_wait (&aq->queue_cond_out, &aq->mutex);
+
+ pthread_mutex_unlock (&aq->mutex);
+ }
+}
+
static void
queue_push_launch (struct goacc_asyncqueue *aq, struct kernel_info *kernel,
void *vars, struct GOMP_kernel_launch_attributes *kla)
{
assert (aq->agent == kernel->agent);
- if (aq->queue_n == ASYNC_QUEUE_SIZE)
- GOMP_PLUGIN_fatal ("ran out of async queue in thread %d:%d",
- aq->agent->device_id, aq->id);
+ wait_for_queue_nonfull (aq);
pthread_mutex_lock (&aq->mutex);
@@ -1425,7 +1460,7 @@ queue_push_launch (struct goacc_asyncqueue *aq, struct kernel_info *kernel,
HSA_DEBUG ("queue_push_launch %d:%d: at %i\n", aq->agent->device_id,
aq->id, queue_last);
- aq->queue[queue_last].type = 0;
+ aq->queue[queue_last].type = KERNEL_LAUNCH;
aq->queue[queue_last].u.launch.kernel = kernel;
aq->queue[queue_last].u.launch.vars = vars;
aq->queue[queue_last].u.launch.kla = *kla;
@@ -1444,16 +1479,7 @@ static void
queue_push_callback (struct goacc_asyncqueue *aq, void (*fn)(void *),
void *data)
{
- if (aq->queue_n == ASYNC_QUEUE_SIZE)
- {
- pthread_mutex_lock (&aq->mutex);
-
- /* Queue is full. Wait for it to not be full. */
- while (aq->queue_n == ASYNC_QUEUE_SIZE)
- pthread_cond_wait (&aq->queue_cond_out, &aq->mutex);
-
- pthread_mutex_unlock (&aq->mutex);
- }
+ wait_for_queue_nonfull (aq);
pthread_mutex_lock (&aq->mutex);
@@ -1463,7 +1489,7 @@ queue_push_callback (struct goacc_asyncqueue *aq, void (*fn)(void *),
HSA_DEBUG ("queue_push_callback %d:%d: at %i\n", aq->agent->device_id,
aq->id, queue_last);
- aq->queue[queue_last].type = 1;
+ aq->queue[queue_last].type = CALLBACK;
aq->queue[queue_last].u.callback.fn = fn;
aq->queue[queue_last].u.callback.data = data;
@@ -1477,16 +1503,92 @@ queue_push_callback (struct goacc_asyncqueue *aq, void (*fn)(void *),
pthread_mutex_unlock (&aq->mutex);
}
+/* Push an entry on AQ to wait for the event described by PLACEHOLDERP (on
+ another queue) to execute. */
+
+static void
+queue_push_asyncwait (struct goacc_asyncqueue *aq,
+ struct placeholder *placeholderp)
+{
+ wait_for_queue_nonfull (aq);
+
+ pthread_mutex_lock (&aq->mutex);
+
+ int queue_last = ((aq->queue_first + aq->queue_n) % ASYNC_QUEUE_SIZE);
+ if (DEBUG_QUEUES)
+ HSA_DEBUG ("queue_push_asyncwait %d:%d: at %i\n", aq->agent->device_id,
+ aq->id, queue_last);
+
+ aq->queue[queue_last].type = ASYNC_WAIT;
+ aq->queue[queue_last].u.asyncwait.placeholderp = placeholderp;
+
+ aq->queue_n++;
+
+ if (DEBUG_THREAD_SIGNAL)
+ HSA_DEBUG ("signalling async thread %d:%d: cond_in\n",
+ aq->agent->device_id, aq->id);
+ pthread_cond_signal (&aq->queue_cond_in);
+
+ pthread_mutex_unlock (&aq->mutex);
+}
+
+static struct placeholder *
+queue_push_placeholder (struct goacc_asyncqueue *aq)
+{
+ struct placeholder *placeholderp;
+
+ wait_for_queue_nonfull (aq);
+
+ pthread_mutex_lock (&aq->mutex);
+
+ int queue_last = ((aq->queue_first + aq->queue_n) % ASYNC_QUEUE_SIZE);
+ if (DEBUG_QUEUES)
+ HSA_DEBUG ("queue_push_placeholder %d:%d: at %i\n", aq->agent->device_id,
+ aq->id, queue_last);
+
+ aq->queue[queue_last].type = ASYNC_PLACEHOLDER;
+ placeholderp = &aq->queue[queue_last].u.placeholder;
+
+ if (pthread_mutex_init (&placeholderp->mutex, NULL))
+ {
+ pthread_mutex_unlock (&aq->mutex);
+ GOMP_PLUGIN_error ("Failed to initialize serialization mutex");
+ }
+
+ if (pthread_cond_init (&placeholderp->cond, NULL))
+ {
+ pthread_mutex_unlock (&aq->mutex);
+ GOMP_PLUGIN_error ("Failed to initialize serialization cond");
+ }
+
+ placeholderp->executed = 0;
+
+ aq->queue_n++;
+
+ if (DEBUG_THREAD_SIGNAL)
+ HSA_DEBUG ("signalling async thread %d:%d: cond_in\n",
+ aq->agent->device_id, aq->id);
+ pthread_cond_signal (&aq->queue_cond_in);
+
+ pthread_mutex_unlock (&aq->mutex);
+
+ return placeholderp;
+}
+
static void run_kernel (struct kernel_info *kernel, void *vars,
struct GOMP_kernel_launch_attributes *kla,
struct goacc_asyncqueue *aq, bool module_locked);
+static void wait_queue (struct goacc_asyncqueue *aq);
+
static void
execute_queue_entry (struct goacc_asyncqueue *aq, int index)
{
struct queue_entry *entry = &aq->queue[index];
- if (entry->type == 0)
+
+ switch (entry->type)
{
+ case KERNEL_LAUNCH:
if (DEBUG_QUEUES)
HSA_DEBUG ("Async thread %d:%d: Executing launch entry (%d)\n",
aq->agent->device_id, aq->id, index);
@@ -1496,9 +1598,9 @@ execute_queue_entry (struct goacc_asyncqueue *aq, int index)
if (DEBUG_QUEUES)
HSA_DEBUG ("Async thread %d:%d: Executing launch entry (%d) done\n",
aq->agent->device_id, aq->id, index);
- }
- else if (entry->type == 1)
- {
+ break;
+
+ case CALLBACK:
if (DEBUG_QUEUES)
HSA_DEBUG ("Async thread %d:%d: Executing callback entry (%d)\n",
aq->agent->device_id, aq->id, index);
@@ -1506,9 +1608,45 @@ execute_queue_entry (struct goacc_asyncqueue *aq, int index)
if (DEBUG_QUEUES)
HSA_DEBUG ("Async thread %d:%d: Executing callback entry (%d) done\n",
aq->agent->device_id, aq->id, index);
+ break;
+
+ case ASYNC_WAIT:
+ {
+ struct placeholder *placeholderp = entry->u.asyncwait.placeholderp;
+
+ if (DEBUG_QUEUES)
+ HSA_DEBUG ("Async thread %d:%d: Executing async wait entry (%d)\n",
+ aq->agent->device_id, aq->id, index);
+
+ pthread_mutex_lock (&placeholderp->mutex);
+
+ while (!placeholderp->executed)
+ pthread_cond_wait (&placeholderp->cond, &placeholderp->mutex);
+
+ pthread_mutex_unlock (&placeholderp->mutex);
+
+ if (pthread_cond_destroy (&placeholderp->cond))
+ GOMP_PLUGIN_error ("Failed to destroy serialization cond");
+
+ if (pthread_mutex_destroy (&placeholderp->mutex))
+ GOMP_PLUGIN_error ("Failed to destroy serialization mutex");
+
+ if (DEBUG_QUEUES)
+ HSA_DEBUG ("Async thread %d:%d: Executing async wait "
+ "entry (%d) done\n", aq->agent->device_id, aq->id, index);
+ }
+ break;
+
+ case ASYNC_PLACEHOLDER:
+ pthread_mutex_lock (&entry->u.placeholder.mutex);
+ entry->u.placeholder.executed = 1;
+ pthread_cond_signal (&entry->u.placeholder.cond);
+ pthread_mutex_unlock (&entry->u.placeholder.mutex);
+ break;
+
+ default:
+ GOMP_PLUGIN_fatal ("Unknown queue element");
}
- else
- GOMP_PLUGIN_fatal ("Unknown queue element");
}
static void *
@@ -3586,9 +3724,13 @@ bool
GOMP_OFFLOAD_openacc_async_serialize (struct goacc_asyncqueue *aq1,
struct goacc_asyncqueue *aq2)
{
- /* FIXME: what should happen here???? */
- wait_queue (aq1);
- wait_queue (aq2);
+ /* For serialize, stream aq2 waits for aq1 to complete work that has been
+ scheduled to run on it up to this point. */
+ if (aq1 != aq2)
+ {
+ struct placeholder *placeholderp = queue_push_placeholder (aq1);
+ queue_push_asyncwait (aq2, placeholderp);
+ }
return true;
}
@@ -155,11 +155,16 @@ main (int argc, char **argv)
for (int ii = 0; ii < N; ii++)
e[ii] = a[ii] + b[ii] + c[ii] + d[ii];
+ acc_wait_async (14, 10);
acc_copyout_async (a, nbytes, 10);
+ acc_wait_async (14, 11);
acc_copyout_async (b, nbytes, 11);
+ acc_wait_async (14, 12);
acc_copyout_async (c, nbytes, 12);
+ acc_wait_async (14, 13);
acc_copyout_async (d, nbytes, 13);
acc_copyout_async (e, nbytes, 14);
+ acc_wait_async (14, 15);
acc_delete_async (&N, sizeof (int), 15);
acc_wait_all ();
@@ -149,11 +149,16 @@ main (int argc, char **argv)
for (int ii = 0; ii < N; ii++)
e[ii] = a[ii] + b[ii] + c[ii] + d[ii];
+#pragma acc wait (14) async (10)
#pragma acc exit data copyout (a[0:N]) async (10)
+#pragma acc wait (14) async (11)
#pragma acc exit data copyout (b[0:N]) async (11)
+#pragma acc wait (14) async (12)
#pragma acc exit data copyout (c[0:N]) async (12)
+#pragma acc wait (14) async (13)
#pragma acc exit data copyout (d[0:N]) async (13)
#pragma acc exit data copyout (e[0:N]) async (14)
+#pragma acc wait (14) async (15)
#pragma acc exit data delete (N) async (15)
#pragma acc wait