Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
33ad7d6
Add Unix domain socket control interface for TUI (#482)
wesm Mar 9, 2026
48c3343
Fix control socket safety: stale detection, directory creation, metad…
wesm Mar 9, 2026
9533e66
Narrow stale socket detection to ECONNREFUSED only
wesm Mar 9, 2026
3040bb5
Simplify isConnRefused to use errors.Is directly
wesm Mar 9, 2026
1610803
Fix control socket selection and metadata desync
wesm Mar 9, 2026
5d00505
Fix control socket security and cross-platform issues
wesm Mar 9, 2026
d83fc20
Use errors.Is for EPERM check, extract runtime-info builder
wesm Mar 9, 2026
5ee5730
Fix data race in TestControlSocketRoundtrip
wesm Mar 9, 2026
56b3436
Fix response data races, safe cleanup, and Windows test compat
wesm Mar 9, 2026
1986b8d
Migrate control socket tests to testify, document testify usage
wesm Mar 9, 2026
a4fa58a
Close pipe read end after bubbletea Run exits
wesm Mar 10, 2026
d480663
Add --no-quit flag and quit control command
wesm Mar 16, 2026
097151d
Resolve display names in set-filter control command
wesm Mar 16, 2026
3043bfe
Resolve set-filter repo names from /api/repos instead of jobs cache
wesm Mar 16, 2026
36958a9
Omit q/quit from help text when --no-quit is active
wesm Mar 16, 2026
1e57d21
Don't overwrite repoNames from branch-filtered modal data
wesm Mar 16, 2026
7d67563
Refresh repoNames on reconnect and unfiltered modal, add tasks help test
wesm Mar 16, 2026
e2afd59
Use explicit flag for branch-filtered reposMsg instead of inferring
wesm Mar 16, 2026
5fb717f
Add test verifying branchNone produces unfiltered reposMsg
wesm Mar 16, 2026
81b668f
Check branch param key absence, not just empty value
wesm Mar 16, 2026
0a8b1c5
Cover Program.Send with response timeout during startup
wesm Mar 16, 2026
8929174
Defer control socket until event loop is running
wesm Mar 16, 2026
5dea88c
Replace thread-unsafe umask with chmod, fix socket unlink race
wesm Mar 17, 2026
9b8f22d
Tighten pre-existing socket directory permissions to 0700
wesm Mar 17, 2026
45752f1
Scope dir tightening to managed path, drop stale filter metadata
wesm Mar 17, 2026
a48a90a
Restore parent directory creation for custom socket paths
wesm Mar 17, 2026
11f4151
Fix partial filter mutation, socket double-unlink, and dir fallthrough
wesm Mar 17, 2026
9b5c66c
Add clear-filter atomicity test for locked branch + repo request
wesm Mar 17, 2026
7655dec
Assert filterStack and cmd in clear-filter atomicity test
wesm Mar 17, 2026
7bbddc2
Clear Closed/Verdict on rerun so job stays visible under hideClosed
wesm Mar 17, 2026
4425efe
Clear Closed/Verdict in both rerun paths, restore on failure
wesm Mar 17, 2026
fc86613
Refresh runtime metadata when daemon address changes on reconnect
wesm Mar 17, 2026
7b17083
Set controlSocket via message after listener succeeds
wesm Mar 17, 2026
08dfbe4
Wait for socket cleanup before Run returns
wesm Mar 17, 2026
d0732f3
Prevent deadlock if program exits before first Update
wesm Mar 17, 2026
97ca7db
Use SetUnlinkOnClose to prevent socket race, clear stale selection
wesm Mar 17, 2026
decd7df
Restore selection on failed close/cancel rollback, add socket test
wesm Mar 17, 2026
5d039c3
Fix socket race test to exercise real scenario, add keyboard cancel t…
wesm Mar 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ CLI (roborev) -> HTTP API -> Daemon -> Worker Pool -> Agent adapters

## Testing

Use `testify` (`github.com/stretchr/testify`) for all test assertions. Use `require.*` for fatal preconditions and `assert.*` for non-fatal checks. Do not use raw `if`/`t.Errorf`/`t.Fatalf` patterns.

- After any Go code changes, run `go fmt ./...` and `go vet ./...` before committing.
- Fast test pass: `go test ./...`
- Integration tests: `go test -tags=integration ./...`
Expand Down
3 changes: 3 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ roborev tui # Terminal UI
- `test` agent is always available (no PATH lookup)
- Worker pool test hooks enable deterministic synchronization
- Table-driven tests are preferred
- Use `testify` (`assert`/`require`) for all test assertions -- do not use raw `if`/`t.Errorf`/`t.Fatalf` patterns
- `require.*` for fatal preconditions (test cannot continue if this fails), `assert.*` for non-fatal checks
- In tests with more than three assertions, prefer `assert := assert.New(t)` shorthand

## Conventions

Expand Down
4 changes: 2 additions & 2 deletions cmd/roborev/tui/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func TestTUICancelJobSuccess(t *testing.T) {
}
_, m := mockServerModel(t, expectJSONPost(t, "/api/job/cancel", cancelRequest{JobID: 42}, map[string]any{"success": true}))
oldFinishedAt := time.Now().Add(-1 * time.Hour)
cmd := m.cancelJob(42, storage.JobStatusRunning, &oldFinishedAt)
cmd := m.cancelJob(42, storage.JobStatusRunning, &oldFinishedAt, false)
msg := cmd()

result := assertMsgType[cancelResultMsg](t, msg)
Expand All @@ -530,7 +530,7 @@ func TestTUICancelJobNotFound(t *testing.T) {
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]string{"error": "not found"})
})
cmd := m.cancelJob(99, storage.JobStatusQueued, nil)
cmd := m.cancelJob(99, storage.JobStatusQueued, nil, false)
msg := cmd()

result := assertMsgType[cancelResultMsg](t, msg)
Expand Down
54 changes: 46 additions & 8 deletions cmd/roborev/tui/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,22 +144,60 @@ func (m model) markParentClosed(parentJobID int64) tea.Cmd {
}
}

// cancelJob sends a cancel request to the server
func (m model) cancelJob(jobID int64, oldStatus storage.JobStatus, oldFinishedAt *time.Time) tea.Cmd {
// cancelJob sends a cancel request to the server.
// restoreSelection tells the result handler to reselect the job
// if the request fails and the optimistic update is rolled back.
func (m model) cancelJob(
jobID int64, oldStatus storage.JobStatus,
oldFinishedAt *time.Time, restoreSelection bool,
) tea.Cmd {
return func() tea.Msg {
err := m.postJSON("/api/job/cancel", map[string]any{"job_id": jobID}, nil)
return cancelResultMsg{jobID: jobID, oldState: oldStatus, oldFinishedAt: oldFinishedAt, err: err}
err := m.postJSON(
"/api/job/cancel",
map[string]any{"job_id": jobID}, nil,
)
return cancelResultMsg{
jobID: jobID,
oldState: oldStatus,
oldFinishedAt: oldFinishedAt,
restoreSelection: restoreSelection,
err: err,
}
}
}

// rerunJob sends a rerun request to the server for failed/canceled jobs
func (m model) rerunJob(jobID int64, oldStatus storage.JobStatus, oldStartedAt, oldFinishedAt *time.Time, oldError string) tea.Cmd {
// rerunJob sends a rerun request to the server for failed/canceled jobs.
func (m model) rerunJob(snap rerunSnapshot) tea.Cmd {
return func() tea.Msg {
err := m.postJSON("/api/job/rerun", map[string]any{"job_id": jobID}, nil)
return rerunResultMsg{jobID: jobID, oldState: oldStatus, oldStartedAt: oldStartedAt, oldFinishedAt: oldFinishedAt, oldError: oldError, err: err}
err := m.postJSON(
"/api/job/rerun",
map[string]any{"job_id": snap.jobID}, nil,
)
return rerunResultMsg{
jobID: snap.jobID,
oldState: snap.oldStatus,
oldStartedAt: snap.oldStartedAt,
oldFinishedAt: snap.oldFinishedAt,
oldError: snap.oldError,
oldClosed: snap.oldClosed,
oldVerdict: snap.oldVerdict,
err: err,
}
}
}

// rerunSnapshot captures job state before an optimistic rerun
// update so it can be rolled back if the server request fails.
type rerunSnapshot struct {
jobID int64
oldStatus storage.JobStatus
oldStartedAt *time.Time
oldFinishedAt *time.Time
oldError string
oldClosed *bool
oldVerdict *string
}

func (m model) submitComment(jobID int64, text string) tea.Cmd {
return func() tea.Msg {
commenter := os.Getenv("USER")
Expand Down
251 changes: 251 additions & 0 deletions cmd/roborev/tui/control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package tui

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"os"
"path/filepath"
"syscall"
"time"

tea "github.com/charmbracelet/bubbletea"
)

const controlResponseTimeout = 3 * time.Second

// removeStaleSocket checks whether socketPath is a leftover Unix
// socket from a previous crash and removes it. Returns an error if
// the path exists but is not a socket (protecting regular files from
// accidental deletion via a mistyped --control-socket).
func removeStaleSocket(socketPath string) error {
fi, err := os.Lstat(socketPath)
if os.IsNotExist(err) {
return nil
}
if err != nil {
return fmt.Errorf("stat socket path: %w", err)
}
if fi.Mode().Type()&os.ModeSocket == 0 {
return fmt.Errorf(
"%s already exists and is not a socket", socketPath,
)
}
// It's a socket — try to connect to see if it's still live.
conn, dialErr := net.DialTimeout("unix", socketPath, 500*time.Millisecond)
if dialErr == nil {
conn.Close()
return fmt.Errorf(
"%s is already in use by another listener", socketPath,
)
}
// Only treat ECONNREFUSED as proof that nothing is listening.
// Other dial errors (wrong socket type, permission denied, etc.)
// are ambiguous and could indicate a live non-stream socket.
if !isConnRefused(dialErr) {
return fmt.Errorf(
"%s: cannot determine socket state: %w",
socketPath, dialErr,
)
}
// ECONNREFUSED — nothing is listening. Safe to remove.
if err := os.Remove(socketPath); err != nil {
return fmt.Errorf("remove stale socket: %w", err)
}
return nil
}

// isConnRefused returns true when the error chain contains
// ECONNREFUSED, which means a socket exists but nothing is listening.
func isConnRefused(err error) bool {
return errors.Is(err, syscall.ECONNREFUSED)
}

// startControlListener creates a Unix domain socket and starts
// accepting connections. Each connection receives one JSON command,
// dispatches it through the tea.Program, and returns a JSON response.
// Returns a cleanup function that closes the listener and removes
// the socket file.
// ensureSocketDir creates the socket parent directory with
// owner-only permissions. MkdirAll is a no-op on existing
// directories, so we always chmod explicitly to tighten
// pre-existing dirs (e.g. ~/.roborev created with 0755).
// This must only be called for managed directories (the
// default data dir), never for arbitrary user-supplied paths.
func ensureSocketDir(dir string) error {
if err := os.MkdirAll(dir, 0700); err != nil {
return fmt.Errorf("create socket directory: %w", err)
}
if err := os.Chmod(dir, 0700); err != nil {
return fmt.Errorf("chmod socket directory: %w", err)
}
return nil
}

func startControlListener(
socketPath string, p *tea.Program,
) (func(), error) {
// Ensure the parent directory exists for custom socket paths.
// Permission tightening is handled separately by ensureSocketDir
// for the default managed directory only.
if err := os.MkdirAll(filepath.Dir(socketPath), 0755); err != nil {
return nil, fmt.Errorf("create socket directory: %w", err)
}

// Only remove an existing path if it is a stale Unix socket.
// Refusing to remove regular files prevents data loss from
// a mistyped --control-socket path.
if err := removeStaleSocket(socketPath); err != nil {
return nil, err
}

ln, err := net.Listen("unix", socketPath)
if err != nil {
return nil, fmt.Errorf("listen on %s: %w", socketPath, err)
}

// Restrict socket permissions to owner-only.
if err := os.Chmod(socketPath, 0600); err != nil {
ln.Close()
return nil, fmt.Errorf("chmod socket: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())

go acceptLoop(ctx, ln, p)

// Disable automatic unlinking so ln.Close() does not remove
// whatever is at socketPath — a successor process may have
// already bound a new socket there.
ln.(*net.UnixListener).SetUnlinkOnClose(false)

cleanup := func() {
cancel()
os.Remove(socketPath)
ln.Close()
}
return cleanup, nil
}

func acceptLoop(ctx context.Context, ln net.Listener, p *tea.Program) {
for {
conn, err := ln.Accept()
if err != nil {
// Check if listener was closed (normal shutdown)
select {
case <-ctx.Done():
return
default:
}
log.Printf("control: accept error: %v", err)
// Back off on transient errors (e.g. EMFILE) to
// avoid a tight CPU-pegging loop.
time.Sleep(100 * time.Millisecond)
continue
}
go handleControlConn(ctx, conn, p)
}
}

func handleControlConn(
ctx context.Context, conn net.Conn, p *tea.Program,
) {
defer conn.Close()

// Set read deadline to prevent hung connections
if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
return
}

scanner := bufio.NewScanner(conn)
scanner.Buffer(make([]byte, 64*1024), 64*1024)
if !scanner.Scan() {
writeError(conn, "empty request")
return
}

var req controlRequest
if err := json.Unmarshal(scanner.Bytes(), &req); err != nil {
writeError(conn, "invalid JSON: "+err.Error())
return
}

resp := dispatchCommand(ctx, req, p)
writeResponse(conn, resp)
}

func dispatchCommand(
ctx context.Context, req controlRequest, p *tea.Program,
) controlResponse {
isQuery, isMutation := isControlCommand(req.Command)

switch {
case isQuery:
return queryViaProgram(ctx, p, req)
case isMutation:
return mutateViaProgram(ctx, p, req)
default:
return controlResponse{
Error: fmt.Sprintf("unknown command: %s", req.Command),
}
}
}

// queryViaProgram sends a controlQueryMsg through the program and
// waits for the Update handler to write the response. The control
// listener is only started after the event loop is running (via the
// ready channel in Run), so p.Send will not block here.
func queryViaProgram(
ctx context.Context, p *tea.Program, req controlRequest,
) controlResponse {
respCh := make(chan controlResponse, 1)
p.Send(controlQueryMsg{req: req, respCh: respCh})

select {
case resp := <-respCh:
return resp
case <-ctx.Done():
return controlResponse{Error: "TUI is shutting down"}
case <-time.After(controlResponseTimeout):
return controlResponse{Error: "response timeout"}
}
}

// mutateViaProgram sends a controlMutationMsg through the program
// and waits for the Update handler to write the response.
func mutateViaProgram(
ctx context.Context, p *tea.Program, req controlRequest,
) controlResponse {
respCh := make(chan controlResponse, 1)
p.Send(controlMutationMsg{req: req, respCh: respCh})

select {
case resp := <-respCh:
return resp
case <-ctx.Done():
return controlResponse{Error: "TUI is shutting down"}
case <-time.After(controlResponseTimeout):
return controlResponse{Error: "response timeout"}
}
}

func writeResponse(conn net.Conn, resp controlResponse) {
data, err := json.Marshal(resp)
if err != nil {
writeError(conn, "marshal error: "+err.Error())
return
}
data = append(data, '\n')
_, _ = conn.Write(data)
}

func writeError(conn net.Conn, msg string) {
resp := controlResponse{Error: msg}
data, _ := json.Marshal(resp)
data = append(data, '\n')
_, _ = conn.Write(data)
}
Loading
Loading