Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk_v2/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,15 @@ if(WIN32)
list(APPEND FOUNDRY_LOCAL_PLATFORM_SOURCES
src/util/stacktrace_windows.cc
src/platform/windows/path.cc
src/platform/windows/file_io.cc
src/platform/windows/cross_process_file_lock.cc
)
else()
list(APPEND FOUNDRY_LOCAL_PLATFORM_SOURCES
src/util/stacktrace_posix.cc
src/platform/posix/path.cc
src/platform/posix/file_io.cc
src/platform/posix/cross_process_file_lock.cc
)
endif()

Expand Down
189 changes: 5 additions & 184 deletions sdk_v2/cpp/src/download/cross_process_file_lock.cc

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put this file and the header in src/platform? Feels a little confusing to have parts of the implementation here and parts in src/platform. This would match the ORT setup where the generic part of a class with platform specific implementation is in the platform directory.

Original file line number Diff line number Diff line change
@@ -1,199 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// Cross-platform orchestration for CrossProcessFileLock. The platform-specific
// pieces — the lock handle (State) and its releasing destructor,
// FormatProcessInfo, the CrossProcessFileLock destructor/constructor, and
// TryAcquireForDirectory — live in
// src/platform/{windows,posix}/cross_process_file_lock.cc.
#include "download/cross_process_file_lock.h"
#include "exception.h"
#include "logger.h"

#include <foundry_local/foundry_local_c.h>

#include <chrono>
#include <ctime>
#include <iomanip>
#include <sstream>
#include <string>
#include <thread>

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <process.h>
#include <windows.h>
#else
#include <errno.h>
#include <fcntl.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <unistd.h>
#endif

namespace fl {

namespace {

constexpr const char* kLockFileName = ".download.lock";

/// `PID:<pid>,Time:<iso8601-utc>\n`
std::string FormatProcessInfo() {
#ifdef _WIN32
auto pid = static_cast<unsigned long>(_getpid());
#else
auto pid = static_cast<unsigned long>(getpid());
#endif
auto t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
std::tm tm{};
#ifdef _WIN32
gmtime_s(&tm, &t);
#else
gmtime_r(&t, &tm);
#endif
std::ostringstream oss;
oss << "PID:" << pid << ",Time:" << std::put_time(&tm, "%Y-%m-%dT%H:%M:%SZ") << '\n';
return oss.str();
}

} // namespace

// Platform-specific resource handle. The destructor here is the only thing
// that releases the lock; CrossProcessFileLock's destructor is defaulted.
#ifdef _WIN32
struct CrossProcessFileLock::State {
HANDLE handle;
~State() {
if (handle != INVALID_HANDLE_VALUE) {
// FILE_FLAG_DELETE_ON_CLOSE removes the file when the last handle closes.
CloseHandle(handle);
}
}
};
#else
struct CrossProcessFileLock::State {
int fd;
std::filesystem::path path;
~State() {
if (fd >= 0) {
// Unlink before close so the file disappears the instant the lock
// releases; a concurrent acquirer simply recreates it. This is the
// classic flock()+unlink() pattern, and it is safe here because every
// acquirer verifies, while holding the flock, that the inode it locked is
// still the one at `path` (see the fstat/stat check in
// TryAcquireForDirectory). An acquirer that raced in on the old inode
// between our unlink and a third party's recreate will see the inode
// mismatch and retry, so two processes never hold "the lock" at once.
// There is also no protected work between this unlink and close.
::unlink(path.c_str());
::close(fd);
}
}
};
#endif

CrossProcessFileLock::CrossProcessFileLock(std::filesystem::path path,
std::unique_ptr<State> state,
ILogger& logger)
: path_(std::move(path)), state_(std::move(state)), logger_(logger) {}

CrossProcessFileLock::~CrossProcessFileLock() {
// Release the OS handle first so the "released" log message is accurate.
state_.reset();
logger_.Log(LogLevel::Debug, "CrossProcessFileLock released: " + path_.string());
}

std::unique_ptr<CrossProcessFileLock> CrossProcessFileLock::TryAcquireForDirectory(
const std::filesystem::path& directory, ILogger& logger) {
std::error_code ec;
std::filesystem::create_directories(directory, ec);
// Best-effort: if create_directories failed, the platform open below will
// surface a clearer error message.

auto lock_path = directory / kLockFileName;
std::unique_ptr<State> state;

#ifdef _WIN32
// dwShareMode=0 blocks any other open (cross- and in-process) until this
// handle closes. FILE_FLAG_DELETE_ON_CLOSE pairs OPEN_ALWAYS into a
// self-cleaning lock that doesn't require unlink-then-close races.
auto wide = lock_path.wstring();
HANDLE handle = CreateFileW(wide.c_str(),
GENERIC_READ | GENERIC_WRITE,
0,
nullptr,
OPEN_ALWAYS,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_DELETE_ON_CLOSE,
nullptr);
if (handle == INVALID_HANDLE_VALUE) {
DWORD err = GetLastError();
if (err == ERROR_SHARING_VIOLATION || err == ERROR_LOCK_VIOLATION || err == ERROR_ACCESS_DENIED) {
// SHARING/LOCK_VIOLATION: another handle already holds the share-none
// lock. ACCESS_DENIED: the holder is mid-release — FILE_FLAG_DELETE_ON_CLOSE
// puts the file into STATUS_DELETE_PENDING during the close window, and a
// concurrent open of a delete-pending file is reported as ACCESS_DENIED.
// All three mean "another process has it"; treat as contention so the
// caller retries. (A genuine permission error also lands here and would
// poll until timeout, but the directory was just created successfully so
// that is improbable.)
return nullptr;
}
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"CreateFileW failed for lock '" + lock_path.string() +
"' (GetLastError=" + std::to_string(err) + ")");
}

auto info = FormatProcessInfo();
DWORD written = 0;
WriteFile(handle, info.data(), static_cast<DWORD>(info.size()), &written, nullptr);
FlushFileBuffers(handle);

state = std::unique_ptr<State>(new State{handle});
#else
int fd = ::open(lock_path.c_str(), O_CREAT | O_RDWR | O_CLOEXEC, 0644);
if (fd < 0) {
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"open failed for lock '" + lock_path.string() + "' (errno=" + std::to_string(errno) + ")");
}
if (::flock(fd, LOCK_EX | LOCK_NB) != 0) {
int err = errno;
::close(fd);
if (err == EWOULDBLOCK || err == EAGAIN) {
return nullptr;
}
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"flock failed for '" + lock_path.string() + "' (errno=" + std::to_string(err) + ")");
}

// Robust-flock inode check. We now hold an exclusive flock on whatever inode
// `fd` refers to, but a releaser unlink()s the lock file in its destructor —
// so between our open() and flock() the path may have been unlinked and a
// third process may have recreated it. If so, we are holding a lock on an
// orphaned inode that guards nothing while the live file at `lock_path` is a
// different inode. Confirm the inode we locked is still the one at the path;
// if not, drop it and report contention so the caller retries against the
// live file. This closes the flock()+unlink() orphan-inode race, which is
// what lets two processes never both believe they hold the lock.
struct stat fd_stat {};
struct stat path_stat {};
if (::fstat(fd, &fd_stat) != 0 || ::stat(lock_path.c_str(), &path_stat) != 0 ||
fd_stat.st_dev != path_stat.st_dev || fd_stat.st_ino != path_stat.st_ino) {
::close(fd); // releases the flock on the stale / orphaned inode
return nullptr;
}

auto info = FormatProcessInfo();
// Best-effort: record this process's identity in the lock file for diagnostics.
// A failure here doesn't affect lock correctness, so it is only logged at Debug.
if (::ftruncate(fd, 0) != 0 || ::write(fd, info.data(), info.size()) < 0) {
logger.Log(LogLevel::Debug,
"CrossProcessFileLock: failed to write diagnostic process info to lock file '" +
lock_path.string() + "' (errno=" + std::to_string(errno) + ")");
}

state = std::unique_ptr<State>(new State{fd, lock_path});
#endif

logger.Log(LogLevel::Debug, "CrossProcessFileLock acquired: " + lock_path.string());
return std::unique_ptr<CrossProcessFileLock>(
new CrossProcessFileLock(std::move(lock_path), std::move(state), logger));
}

std::unique_ptr<CrossProcessFileLock> CrossProcessFileLock::WaitForDirectoryLock(
const std::filesystem::path& directory,
const CancellationPredicate& is_cancelled,
Expand Down
116 changes: 13 additions & 103 deletions sdk_v2/cpp/src/download/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,14 @@
#include "download/file_writer.h"
#include "exception.h"
#include "logger.h"
#include "platform/file_io.h"

#include <foundry_local/foundry_local_c.h>

#include <fstream>
#include <string>
#include <system_error>

#ifdef _WIN32
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <windows.h>
#else
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#endif

namespace fl {

namespace fs = std::filesystem;
Expand Down Expand Up @@ -69,112 +58,33 @@ void EnsureFileExistsAtSize(const fs::path& path, int64_t expected_size) {

FileWriter::FileWriter(ILogger& logger) : logger_(logger) {}

#ifdef _WIN32

FileWriter::~FileWriter() { Close(); }

void FileWriter::Open(const fs::path& path, int64_t expected_size) {
EnsureFileExistsAtSize(path, expected_size);
// FILE_SHARE_READ | FILE_SHARE_WRITE so the lock file / other tools can peek
// at the partial file without us erroring; positional WriteFile is safe
// regardless of share mode.
HANDLE h = ::CreateFileW(path.wstring().c_str(), GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE, nullptr, OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL, nullptr);
if (h == INVALID_HANDLE_VALUE) {
std::string error;
handle_ = platform::OpenWritableFile(path, error);
if (handle_ == platform::kInvalidFileHandle) {
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"FileWriter open failed for " + path.string() + " (Win32 err " +
std::to_string(::GetLastError()) + ")");
"FileWriter open failed for " + path.string() + " (" + error + ")");
}
handle_ = h;
}

void FileWriter::WriteAt(int64_t offset, const uint8_t* data, size_t len) {
// Concurrent WriteFile calls with distinct OVERLAPPED offsets on the same
// handle are safe for non-overlapping ranges; the kernel orders them.
while (len > 0) {
OVERLAPPED ov{};
// Split the 64-bit file offset across the OVERLAPPED halves: the DWORD casts
// keep the low 32 bits in Offset and the high 32 bits in OffsetHigh.
ov.Offset = static_cast<DWORD>(static_cast<uint64_t>(offset));
ov.OffsetHigh = static_cast<DWORD>(static_cast<uint64_t>(offset) >> 32);
DWORD to_write = static_cast<DWORD>(len > 0x7FFFFFFFu ? 0x7FFFFFFFu : len);
DWORD written = 0;
if (!::WriteFile(handle_, data, to_write, &written, &ov)) {
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"FileWriter write failed at offset " + std::to_string(offset) + " (Win32 err " +
std::to_string(::GetLastError()) + ")");
}
if (written == 0) {
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"FileWriter short write at offset " + std::to_string(offset));
}
offset += static_cast<int64_t>(written);
data += written;
len -= written;
std::string error;
if (!platform::WriteFileAt(handle_, offset, data, len, error)) {
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL, "FileWriter " + error);
}
}

void FileWriter::Close() {
if (handle_ != nullptr) {
if (!::CloseHandle(handle_)) {
const DWORD err = ::GetLastError();
logger_.Log(LogLevel::Warning,
"FileWriter: CloseHandle failed (Win32 err " + std::to_string(err) + ")");
if (handle_ != platform::kInvalidFileHandle) {
std::string error;
if (!platform::CloseFile(handle_, error)) {
logger_.Log(LogLevel::Warning, "FileWriter: " + error);
}
handle_ = nullptr;
}
}

#else // POSIX

FileWriter::~FileWriter() { Close(); }

void FileWriter::Open(const fs::path& path, int64_t expected_size) {
EnsureFileExistsAtSize(path, expected_size);
fd_ = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
if (fd_ < 0) {
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"FileWriter open failed for " + path.string() + " (errno " +
std::to_string(errno) + ")");
handle_ = platform::kInvalidFileHandle;
}
}

void FileWriter::WriteAt(int64_t offset, const uint8_t* data, size_t len) {
while (len > 0) {
ssize_t n = ::pwrite(fd_, data, len, static_cast<off_t>(offset));
if (n < 0) {
if (errno == EINTR) continue;
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"FileWriter pwrite failed at offset " + std::to_string(offset) + " (errno " +
std::to_string(errno) + ")");
}
if (n == 0) {
FL_THROW(FOUNDRY_LOCAL_ERROR_INTERNAL,
"FileWriter short pwrite at offset " + std::to_string(offset));
}
offset += n;
data += n;
len -= static_cast<size_t>(n);
}
}

void FileWriter::Close() {
if (fd_ >= 0) {
// A failing close() can surface a deferred write error (e.g. EIO, or ENOSPC
// on delayed allocation / a networked filesystem), so the file may be
// incomplete even though every pwrite returned success. Log it for
// diagnosis. Don't retry: on Linux the descriptor is freed even when close()
// returns EINTR, so a retry could close an unrelated, since-reused fd.
if (::close(fd_) != 0) {
const int err = errno;
logger_.Log(LogLevel::Warning,
"FileWriter: close failed (errno " + std::to_string(err) + ")");
}
fd_ = -1;
}
}

#endif

} // namespace fl
12 changes: 5 additions & 7 deletions sdk_v2/cpp/src/download/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class ILogger;
/// Workers in a single download claim disjoint chunks, so concurrent `WriteAt`
/// calls always target non-overlapping byte ranges. Backed by `pwrite` (POSIX)
/// or `WriteFile` + `OVERLAPPED` (Windows): the OS arbitrates concurrent writes
/// to disjoint ranges, so no user-space lock is taken.
/// to disjoint ranges, so no user-space lock is taken. The OS-specific calls
/// live in `src/platform/file_io.*`.
class FileWriter {
public:
explicit FileWriter(ILogger& logger);
Expand All @@ -38,12 +39,9 @@ class FileWriter {

private:
ILogger& logger_;
#ifdef _WIN32
// Win32 HANDLE. Holds a valid handle while open, nullptr otherwise.
void* handle_ = nullptr;
#else
int fd_ = -1;
#endif
// Native file handle (Win32 HANDLE or POSIX fd) as an integer; see
// src/platform/file_io.h. kInvalidFileHandle (-1) when not open.
std::intptr_t handle_ = -1;
};

} // namespace fl
Loading