Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
// in the same context.

#include <cstring>
#include <mutex>
#include <thread>

#include "uur/fixtures.h"

Expand Down Expand Up @@ -301,3 +303,196 @@ TEST_P(urMultiDeviceContextMemBufferTest, WriteKernelKernelRead) {
ASSERT_EQ(a, fill_val + 2);
}
}

// Alternates kernel launches between two devices in the same context, forcing
// the buffer to migrate back and forth ("ping-pong"). Each launch depends on
// the previous event, so the chain is fully ordered; each kernel increments
// every buffer element by 1.
TEST_P(urMultiDeviceContextMemBufferTest, PingPongKernelExecution) {
  if (num_devices <= 1) {
    GTEST_SKIP();
  }

  // Bind the shared buffer as the kernel argument on both devices.
  AddBuffer1DArg(0, 0, buffer);
  AddBuffer1DArg(1, 0, buffer);

  T fill_val = 50;
  std::vector<T> in_vec(buffer_size, fill_val);
  std::vector<T> out_vec(buffer_size);

  const uint32_t ping_pong_iterations = 20;
  std::vector<ur_event_handle_t> events(ping_pong_iterations);
  ur_event_handle_t write_event, read_event;

  size_t work_dims[3] = {buffer_size, 1, 1};
  size_t offset[3] = {0, 0, 0};

  ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0,
                                         buffer_size_bytes, in_vec.data(), 0,
                                         nullptr, &write_event));

  // Ping-pong kernel execution across two devices. The first launch waits on
  // the host write; every subsequent launch waits on the previous kernel.
  for (uint32_t i = 0; i < ping_pong_iterations; ++i) {
    uint32_t device_idx = i % 2;
    ur_event_handle_t *wait_event = (i == 0) ? &write_event : &events[i - 1];

    ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp(
        queues[device_idx], kernels[device_idx], 1, offset, work_dims, nullptr,
        static_cast<uint32_t>(kernel_args[device_idx].size()),
        kernel_args[device_idx].data(), nullptr, 1, wait_event, &events[i]));
  }

  ASSERT_SUCCESS(urEnqueueMemBufferRead(
      queues[0], buffer, false, 0, buffer_size_bytes, out_vec.data(), 1,
      &events[ping_pong_iterations - 1], &read_event));

  ASSERT_SUCCESS(urEventWait(1, &read_event));

  // The whole chain has completed; release every event handle the runtime
  // handed back (they were previously leaked).
  ASSERT_SUCCESS(urEventRelease(write_event));
  for (auto &event : events) {
    ASSERT_SUCCESS(urEventRelease(event));
  }
  ASSERT_SUCCESS(urEventRelease(read_event));

  // Verify final result: one increment per ping-pong iteration.
  for (auto &a : out_vec) {
    ASSERT_EQ(a, fill_val + ping_pong_iterations);
  }
}

// Exercises a device -> host -> device migration pattern: write via device 0,
// run a kernel on device 1, read the intermediate result to the host, modify
// it, write it back via device 0, run a second kernel on device 1, and read
// the final result. Each kernel increments every element by 1.
TEST_P(urMultiDeviceContextMemBufferTest, ComplexMigrationPattern) {
  if (num_devices <= 1) {
    GTEST_SKIP();
  }

  AddBuffer1DArg(1, 0, buffer);

  T fill_val = 200;
  std::vector<T> in_vec(buffer_size, fill_val);
  std::vector<T> intermediate_vec(buffer_size);
  std::vector<T> out_vec(buffer_size);

  ur_event_handle_t write_event, intermediate_read_event,
      intermediate_write_event, final_kernel_event, read_event;

  size_t work_dims[3] = {buffer_size, 1, 1};
  size_t offset[3] = {0, 0, 0};

  // Write on device 0, execute kernel on device 1
  ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0,
                                         buffer_size_bytes, in_vec.data(), 0,
                                         nullptr, &write_event));

  ur_event_handle_t phase1_kernel_event;
  ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp(
      queues[1], kernels[1], 1, offset, work_dims, nullptr,
      static_cast<uint32_t>(kernel_args[1].size()), kernel_args[1].data(),
      nullptr, 1, &write_event, &phase1_kernel_event));

  // Read intermediate result back to host
  ASSERT_SUCCESS(urEnqueueMemBufferRead(
      queues[0], buffer, false, 0, buffer_size_bytes, intermediate_vec.data(),
      1, &phase1_kernel_event, &intermediate_read_event));

  // Modify on host and write back via the same device that performed the read
  // (queues[0]). This ensures a coherent ownership chain: the device that last
  // read the buffer writes the updated data back, so the subsequent kernel
  // launch on device 1 observes the correct value after migration.
  ASSERT_SUCCESS(urEventWait(1, &intermediate_read_event));
  for (auto &val : intermediate_vec) {
    val += 10;
  }

  ASSERT_SUCCESS(urEnqueueMemBufferWrite(
      queues[0], buffer, false, 0, buffer_size_bytes, intermediate_vec.data(),
      0, nullptr, &intermediate_write_event));

  // Final kernel execution on device 1, triggering migration from device 0
  ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp(
      queues[1], kernels[1], 1, offset, work_dims, nullptr,
      static_cast<uint32_t>(kernel_args[1].size()), kernel_args[1].data(),
      nullptr, 1, &intermediate_write_event, &final_kernel_event));

  // Final read via device 0
  ASSERT_SUCCESS(urEnqueueMemBufferRead(queues[0], buffer, false, 0,
                                        buffer_size_bytes, out_vec.data(), 1,
                                        &final_kernel_event, &read_event));

  ASSERT_SUCCESS(urEventWait(1, &read_event));

  // All enqueued work has completed; release every event handle the runtime
  // handed back (they were previously leaked).
  ASSERT_SUCCESS(urEventRelease(write_event));
  ASSERT_SUCCESS(urEventRelease(phase1_kernel_event));
  ASSERT_SUCCESS(urEventRelease(intermediate_read_event));
  ASSERT_SUCCESS(urEventRelease(intermediate_write_event));
  ASSERT_SUCCESS(urEventRelease(final_kernel_event));
  ASSERT_SUCCESS(urEventRelease(read_event));

  // Verify result: initial + 1 (phase1) + 10 (host) + 1 (final) = initial + 12
  for (auto &a : out_vec) {
    ASSERT_EQ(a, fill_val + 12);
  }
}

// Submits kernels concurrently from multiple host threads, each with its own
// queue, against the same buffer. A shared last-event protected by a mutex
// serializes the work into one dependency chain, so the final value is
// deterministic while the submission paths themselves run concurrently.
TEST_P(urMultiDeviceContextMemBufferTest, KernelsExecutionWithThreads) {
  if (num_devices <= 1) {
    GTEST_SKIP();
  }

  AddBuffer1DArg(0, 0, buffer);
  AddBuffer1DArg(1, 0, buffer);

  T fill_val = 100;
  std::vector<T> in_vec(buffer_size, fill_val);
  std::vector<T> out_vec(buffer_size);

  const uint32_t thread_count = 8;
  const uint32_t iterations_per_thread = 10;
  std::vector<std::thread> threads(thread_count);
  ur_event_handle_t write_event, read_event;

  size_t work_dims[3] = {buffer_size, 1, 1};
  size_t offset[3] = {0, 0, 0};

  // Create a dedicated queue per thread so that concurrent kernel submissions
  // from different threads go through independent queues. This exercises the
  // runtime's ability to handle simultaneous multi-queue access to the same
  // buffer, which is the primary intent of this migration test.
  std::vector<ur_queue_handle_t> thread_queues(thread_count);
  for (auto t = 0u; t < thread_count; ++t) {
    ASSERT_SUCCESS(
        urQueueCreate(context, devices[t % 2], nullptr, &thread_queues[t]));
  }

  ASSERT_SUCCESS(urEnqueueMemBufferWrite(queues[0], buffer, false, 0,
                                         buffer_size_bytes, in_vec.data(), 0,
                                         nullptr, &write_event));

  // A shared last-event and its lock ensure a total ordering across all kernel
  // submissions from all threads. Each thread reads the current last event as
  // its dependency, submits its kernel, then updates the shared last event
  // before releasing the lock. This guarantees no two kernels race on the
  // buffer while still exercising concurrent multi-queue submission paths.
  std::mutex last_event_mutex;
  ur_event_handle_t shared_last_event = write_event;
  // Every event in the chain is kept alive here and released once the chain
  // has completed; previously each superseded event handle was leaked.
  // Guarded by last_event_mutex while threads are running.
  std::vector<ur_event_handle_t> chain_events{write_event};

  for (auto t = 0u; t < thread_count; ++t) {
    threads[t] = std::thread([&, t]() {
      const uint32_t device_idx = t % 2;
      for (auto i = 0u; i < iterations_per_thread; ++i) {
        ur_event_handle_t kernel_event;
        // Hold the lock across read-dependency / submit / update so the
        // submissions form a single chain even across queues.
        std::unique_lock<std::mutex> lock(last_event_mutex);
        ur_event_handle_t dep_event = shared_last_event;
        ASSERT_SUCCESS(urEnqueueKernelLaunchWithArgsExp(
            thread_queues[t], kernels[device_idx], 1, offset, work_dims,
            nullptr, static_cast<uint32_t>(kernel_args[device_idx].size()),
            kernel_args[device_idx].data(), nullptr, 1, &dep_event,
            &kernel_event));
        shared_last_event = kernel_event;
        chain_events.push_back(kernel_event);
      }
    });
  }
  for (auto &th : threads) {
    th.join();
  }

  // Enqueue read after all threads have finished submitting, waiting on the
  // last event in the global chain.
  ASSERT_SUCCESS(urEnqueueMemBufferRead(queues[0], buffer, false, 0,
                                        buffer_size_bytes, out_vec.data(), 1,
                                        &shared_last_event, &read_event));

  ASSERT_SUCCESS(urEventWait(1, &read_event));

  // Chain complete: release every event (write, all kernels, read) and the
  // per-thread queues, checking the release results rather than ignoring them.
  for (auto &event : chain_events) {
    ASSERT_SUCCESS(urEventRelease(event));
  }
  ASSERT_SUCCESS(urEventRelease(read_event));

  for (auto t = 0u; t < thread_count; ++t) {
    ASSERT_SUCCESS(urQueueRelease(thread_queues[t]));
  }

  for (auto &a : out_vec) {
    ASSERT_EQ(a, fill_val + thread_count * iterations_per_thread);
  }
}
Loading