diff --git a/unified-runtime/test/conformance/memory-migrate/urMemBufferMigrateAcrossDevices.cpp b/unified-runtime/test/conformance/memory-migrate/urMemBufferMigrateAcrossDevices.cpp
index 24a768d98b714..0ca76b2171dd6 100644
--- a/unified-runtime/test/conformance/memory-migrate/urMemBufferMigrateAcrossDevices.cpp
+++ b/unified-runtime/test/conformance/memory-migrate/urMemBufferMigrateAcrossDevices.cpp
@@ -7,6 +7,8 @@
 // in the same context.
 
 #include <cstring>
+#include <mutex>
+#include <thread>
 
 #include "uur/fixtures.h"
 
@@ -301,3 +303,196 @@ TEST_P(urMultiDeviceContextMemBufferTest, WriteKernelKernelRead) {
     ASSERT_EQ(a, fill_val + 2);
   }
 }
+
+TEST_P(urMultiDeviceContextMemBufferTest, PingPongKernelExecution) {
+  if (num_devices <= 1) {
+    GTEST_SKIP();
+  }
+
+  // Set up the buffer argument for the kernel on each device.
+  AddBuffer1DArg(0, 0, buffer);
+  AddBuffer1DArg(1, 0, buffer);
+
+  T fill_val = 50;
+  std::vector<T> in_vec(buffer_size, fill_val);
+  std::vector<T> out_vec(buffer_size);
+
+  const uint32_t ping_pong_iterations = 20;
+  std::vector<ur_event_handle_t> events(ping_pong_iterations);
+  ur_event_handle_t write_event, read_event;
+
+  size_t work_dims[3] = {buffer_size, 1, 1};
+  size_t offset[3] = {0, 0, 0};
+
+  ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0,
+                                         buffer_size_bytes, in_vec.data(), 0,
+                                         nullptr, &write_event));
+
+  // Ping-pong kernel execution across two devices. Each kernel increments
+  // every buffer element by 1, and each launch depends on the previous one.
+  for (uint32_t i = 0; i < ping_pong_iterations; ++i) {
+    uint32_t device_idx = i % 2;
+    ur_event_handle_t *wait_event = (i == 0) ? &write_event : &events[i - 1];
+
+    ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp(
+        queues[device_idx], kernels[device_idx], 1, offset, work_dims, nullptr,
+        static_cast<uint32_t>(kernel_args[device_idx].size()),
+        kernel_args[device_idx].data(), nullptr, 1, wait_event, &events[i]));
+  }
+
+  ASSERT_SUCCESS(urEnqueueMemBufferRead(
+      queues[0], buffer, false, 0, buffer_size_bytes, out_vec.data(), 1,
+      &events[ping_pong_iterations - 1], &read_event));
+
+  ASSERT_SUCCESS(urEventWait(1, &read_event));
+
+  // Verify the final result: each of the launches incremented every element.
+  for (auto &a : out_vec) {
+    ASSERT_EQ(a, fill_val + ping_pong_iterations);
+  }
+}
+
+TEST_P(urMultiDeviceContextMemBufferTest, ComplexMigrationPattern) {
+  if (num_devices <= 1) {
+    GTEST_SKIP();
+  }
+
+  // Only device 1's kernel runs in this test.
+  AddBuffer1DArg(1, 0, buffer);
+
+  T fill_val = 200;
+  std::vector<T> in_vec(buffer_size, fill_val);
+  std::vector<T> intermediate_vec(buffer_size);
+  std::vector<T> out_vec(buffer_size);
+
+  ur_event_handle_t write_event, intermediate_read_event,
+      intermediate_write_event, final_kernel_event, read_event;
+
+  size_t work_dims[3] = {buffer_size, 1, 1};
+  size_t offset[3] = {0, 0, 0};
+
+  // Write on device 0, then execute a kernel on device 1.
+  ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0,
+                                         buffer_size_bytes, in_vec.data(), 0,
+                                         nullptr, &write_event));
+
+  ur_event_handle_t phase1_kernel_event;
+  ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp(
+      queues[1], kernels[1], 1, offset, work_dims, nullptr,
+      static_cast<uint32_t>(kernel_args[1].size()), kernel_args[1].data(),
+      nullptr, 1, &write_event, &phase1_kernel_event));
+
+  // Read the intermediate result back to the host.
+  ASSERT_SUCCESS(urEnqueueMemBufferRead(
+      queues[0], buffer, false, 0, buffer_size_bytes, intermediate_vec.data(),
+      1, &phase1_kernel_event, &intermediate_read_event));
+
+  // Modify on the host and write back via the same device that performed the
+  // read (queues[0]). This ensures a coherent ownership chain: the device
+  // that last read the buffer writes the updated data back, so the
+  // subsequent kernel launch on device 1 observes the correct values after
+  // migration.
+  ASSERT_SUCCESS(urEventWait(1, &intermediate_read_event));
+  for (auto &val : intermediate_vec) {
+    val += 10;
+  }
+
+  ASSERT_SUCCESS(urEnqueueMemBufferWrite(
+      queues[0], buffer, false, 0, buffer_size_bytes, intermediate_vec.data(),
+      0, nullptr, &intermediate_write_event));
+
+  // Final kernel execution on device 1, triggering migration from device 0.
+  ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp(
+      queues[1], kernels[1], 1, offset, work_dims, nullptr,
+      static_cast<uint32_t>(kernel_args[1].size()), kernel_args[1].data(),
+      nullptr, 1, &intermediate_write_event, &final_kernel_event));
+
+  // Final read via device 0.
+  ASSERT_SUCCESS(urEnqueueMemBufferRead(queues[0], buffer, false, 0,
+                                        buffer_size_bytes, out_vec.data(), 1,
+                                        &final_kernel_event, &read_event));
+
+  ASSERT_SUCCESS(urEventWait(1, &read_event));
+
+  // Verify the result: initial + 1 (phase 1) + 10 (host) + 1 (final)
+  // = initial + 12.
+  for (auto &a : out_vec) {
+    ASSERT_EQ(a, fill_val + 12);
+  }
+}
+
+TEST_P(urMultiDeviceContextMemBufferTest, KernelsExecutionWithThreads) {
+  if (num_devices <= 1) {
+    GTEST_SKIP();
+  }
+
+  AddBuffer1DArg(0, 0, buffer);
+  AddBuffer1DArg(1, 0, buffer);
+
+  T fill_val = 100;
+  std::vector<T> in_vec(buffer_size, fill_val);
+  std::vector<T> out_vec(buffer_size);
+
+  const uint32_t thread_count = 8;
+  const uint32_t iterations_per_thread = 10;
+  std::vector<std::thread> threads(thread_count);
+  ur_event_handle_t write_event, read_event;
+
+  size_t work_dims[3] = {buffer_size, 1, 1};
+  size_t offset[3] = {0, 0, 0};
+
+  // Create a dedicated queue per thread so that concurrent kernel submissions
+  // from different threads go through independent queues. This exercises the
+  // runtime's ability to handle simultaneous multi-queue access to the same
+  // buffer, which is the primary intent of this migration test.
+  std::vector<ur_queue_handle_t> thread_queues(thread_count);
+  for (auto t = 0u; t < thread_count; ++t) {
+    ASSERT_SUCCESS(
+        urQueueCreate(context, devices[t % 2], nullptr, &thread_queues[t]));
+  }
+
+  ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0,
+                                         buffer_size_bytes, in_vec.data(), 0,
+                                         nullptr, &write_event));
+
+  // A shared last-event and its lock ensure a total ordering across all
+  // kernel submissions from all threads. Each thread reads the current last
+  // event as its dependency, submits its kernel, then updates the shared
+  // last event before releasing the lock. This guarantees no two kernels
+  // race on the buffer while still exercising concurrent multi-queue
+  // submission paths.
+  std::mutex last_event_mutex;
+  ur_event_handle_t shared_last_event = write_event;
+
+  for (auto t = 0u; t < thread_count; ++t) {
+    threads[t] = std::thread([&, t]() {
+      const uint32_t device_idx = t % 2;
+      for (auto i = 0u; i < iterations_per_thread; ++i) {
+        ur_event_handle_t kernel_event;
+        // Hold the lock across the enqueue so that reading the dependency,
+        // submitting the kernel, and updating the last event are atomic.
+        std::unique_lock<std::mutex> lock(last_event_mutex);
+        ur_event_handle_t dep_event = shared_last_event;
+        ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp(
+            thread_queues[t], kernels[device_idx], 1, offset, work_dims,
+            nullptr, static_cast<uint32_t>(kernel_args[device_idx].size()),
+            kernel_args[device_idx].data(), nullptr, 1, &dep_event,
+            &kernel_event));
+        shared_last_event = kernel_event;
+      }
+    });
+  }
+  for (auto &th : threads) {
+    th.join();
+  }
+
+  // Enqueue the read after all threads have finished submitting, waiting on
+  // the last event in the global chain.
+  ASSERT_SUCCESS(urEnqueueMemBufferRead(queues[0], buffer, false, 0,
+                                        buffer_size_bytes, out_vec.data(), 1,
+                                        &shared_last_event, &read_event));
+
+  ASSERT_SUCCESS(urEventWait(1, &read_event));
+
+  for (auto t = 0u; t < thread_count; ++t) {
+    ASSERT_SUCCESS(urQueueRelease(thread_queues[t]));
+  }
+
+  for (auto &a : out_vec) {
+    ASSERT_EQ(a, fill_val + thread_count * iterations_per_thread);
+  }
+}