Skip to content
Merged
13 changes: 5 additions & 8 deletions src/client/array/dc_array.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -2579,14 +2579,11 @@ adjust_array_size_cb(tse_task_t *task, void *data)
args->sgl->sg_nr = 1;
d_iov_set(&args->sgl->sg_iovs[0], props->buf, props->buf_len);

rc = tse_task_register_cbs(task, NULL, NULL, 0, adjust_array_size_cb, &props,
sizeof(props));
rc = tse_task_register_comp_cb_and_reinit(task, adjust_array_size_cb, &props,
sizeof(props), 0 /* delay */);
if (rc)
goto out;

rc = tse_task_reinit(task);
if (rc)
D_ERROR("FAILED to reinit task\n");
D_ERROR("FAILED to register comp cb and reinit task: " DF_RC "\n",
DP_RC(rc));

goto out;
}
Expand Down
6 changes: 4 additions & 2 deletions src/common/drpc.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* (C) Copyright 2018-2023 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -573,7 +573,9 @@ drpc_call(struct drpc *ctx, int flags, Drpc__Call *msg, Drpc__Response **resp)
return ret;

if (!(flags & R_SYNC)) {
response = drpc_response_create(msg);
response = drpc_response_create(msg);
if (response == NULL)
return -DER_NOMEM;
response->status = DRPC__STATUS__SUBMITTED;
*resp = response;
return 0;
Expand Down
64 changes: 52 additions & 12 deletions src/common/tse.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -384,8 +384,8 @@ tse_task_complete_locked(struct tse_task_private *dtp,
}

static int
register_cb(tse_task_t *task, bool is_comp, tse_task_cb_t cb,
void *arg, daos_size_t arg_size)
register_cb(tse_task_t *task, bool is_comp, tse_task_cb_t cb, void *arg, daos_size_t arg_size,
struct tse_task_cb **dtcp)
{
struct tse_task_private *dtp = tse_task2priv(task);
struct tse_task_cb *dtc;
Expand All @@ -411,9 +411,10 @@ register_cb(tse_task_t *task, bool is_comp, tse_task_cb_t cb,
d_list_add(&dtc->dtc_list, &dtp->dtp_comp_cb_list);
else /** MSC - don't see a need for more than 1 prep cb */
d_list_add_tail(&dtc->dtc_list, &dtp->dtp_prep_cb_list);

D_MUTEX_UNLOCK(&dtp->dtp_sched->dsp_lock);

if (dtcp != NULL)
*dtcp = dtc;
return 0;
}

Expand All @@ -422,7 +423,41 @@ tse_task_register_comp_cb(tse_task_t *task, tse_task_cb_t comp_cb,
void *arg, daos_size_t arg_size)
{
D_ASSERT(comp_cb != NULL);
return register_cb(task, true, comp_cb, arg, arg_size);
return register_cb(task, true, comp_cb, arg, arg_size, NULL);
}

static void
unregister_cb(tse_task_t *task, struct tse_task_cb *dtc)
{
struct tse_task_private *dtp;

if (dtc == NULL)
return;

dtp = tse_task2priv(task);
D_MUTEX_LOCK(&dtp->dtp_sched->dsp_lock);
d_list_del(&dtc->dtc_list);
D_MUTEX_UNLOCK(&dtp->dtp_sched->dsp_lock);
D_FREE(dtc);
}

int
tse_task_register_comp_cb_and_reinit(tse_task_t *task, tse_task_cb_t comp_cb, void *arg,
daos_size_t arg_size, uint64_t delay)
{
struct tse_task_cb *dtc = NULL;
int rc;

D_ASSERT(comp_cb != NULL);
rc = register_cb(task, true, comp_cb, arg, arg_size, &dtc);
if (rc != 0)
return rc;

rc = tse_task_reinit_with_delay(task, delay);
if (rc != 0)
unregister_cb(task, dtc);

return rc;
}

int
Expand All @@ -431,15 +466,20 @@ tse_task_register_cbs(tse_task_t *task, tse_task_cb_t prep_cb,
tse_task_cb_t comp_cb, void *comp_data,
daos_size_t comp_data_size)
{
int rc = 0;
int rc = 0;
struct tse_task_cb *dtc = NULL;

D_ASSERT(prep_cb != NULL || comp_cb != NULL);
if (prep_cb)
rc = register_cb(task, false, prep_cb, prep_data,
prep_data_size);
if (comp_cb && !rc)
rc = register_cb(task, true, comp_cb, comp_data,
comp_data_size);
if (prep_cb) {
rc = register_cb(task, false, prep_cb, prep_data, prep_data_size, &dtc);
if (rc != 0)
return rc;
}
if (comp_cb) {
rc = register_cb(task, true, comp_cb, comp_data, comp_data_size, NULL);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Looks this function could be used to register two callbacks, shouldn't we free the first one when the second registration failed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, this should be fixed as well.

if (rc != 0)
unregister_cb(task, dtc);
}
return rc;
}

Expand Down
21 changes: 6 additions & 15 deletions src/container/cli.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
* (C) Copyright 2025-2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -717,26 +717,17 @@ pmap_refresh_cb(tse_task_t *task, void *data)
else
delay = 0;

rc = tse_task_register_comp_cb(task, pmap_refresh_cb, cb_arg,
sizeof(*cb_arg));
if (rc) {
D_ERROR(DF_UUID": pmap_refresh version (%d:%d), failed "
"to reg_comp_cb, "DF_RC"\n",
DP_UUID(pool->dp_pool), pm_ver,
cb_arg->pra_pm_ver, DP_RC(rc));
goto out;
}

cb_arg->pra_retry_nr++;
D_DEBUG(DB_TRACE, DF_UUID": pmap_refresh version (%d:%d), "
"in %d retry\n", DP_UUID(pool->dp_pool), pm_ver,
cb_arg->pra_pm_ver, cb_arg->pra_retry_nr);

rc = tse_task_reinit_with_delay(task, delay);
rc = tse_task_register_comp_cb_and_reinit(task, pmap_refresh_cb, cb_arg,
sizeof(*cb_arg), delay);
if (rc) {
D_ERROR(DF_UUID": pmap_refresh version (%d:%d), resched"
" failed, "DF_RC"\n", DP_UUID(pool->dp_pool),
pm_ver, cb_arg->pra_pm_ver, DP_RC(rc));
D_ERROR(DF_UUID ": pmap_refresh version (%d:%d), reg_comp_cb/reinit"
" failed, " DF_RC "\n",
DP_UUID(pool->dp_pool), pm_ver, cb_arg->pra_pm_ver, DP_RC(rc));
goto out;
}

Expand Down
19 changes: 19 additions & 0 deletions src/include/daos/tse.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2015-2024 Intel Corporation.
* (C) Copyright 2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -224,6 +225,24 @@ int
tse_task_register_comp_cb(tse_task_t *task, tse_task_cb_t comp_cb,
void *arg, size_t arg_size);

/**
* Atomically register a completion callback and reinitialize the task for
* retry with an optional scheduling delay. If the reinitialization fails the
* newly registered callback is removed and freed so the caller never leaks a
* dangling tse_task_cb node.
*
* \param[in] task Task to reinitialize for retry
* \param[in] comp_cb Completion callback to register for the retried run
* \param[in] arg Callback argument (copied internally, like tse_task_register_comp_cb)
* \param[in] arg_size Size of the argument
* \param[in] delay Scheduling delay in microseconds (0 = immediate)
*
* \return 0 on success, negative errno on failure.
*/
int
tse_task_register_comp_cb_and_reinit(tse_task_t *task, tse_task_cb_t comp_cb, void *arg,
size_t arg_size, uint64_t delay);

/**
* Mark task as completed.
*
Expand Down
9 changes: 7 additions & 2 deletions src/object/srv_cli.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* (C) Copyright 2017-2022 Intel Corporation.
* (C) Copyright 2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -72,8 +73,10 @@ dsc_obj_list_akey(daos_handle_t oh, daos_epoch_t epoch, daos_key_t *dkey,

rc = dc_obj_list_akey_task_create(oh, th, dkey, nr, kds, sgl, anchor,
NULL, dsc_scheduler(), &task);
if (rc)
if (rc) {
dc_tx_local_close(th);
return rc;
}

rc = tse_task_register_comp_cb(task, tx_close_cb, &th, sizeof(th));
if (rc) {
Expand Down Expand Up @@ -103,8 +106,10 @@ dsc_obj_fetch(daos_handle_t oh, daos_epoch_t epoch, daos_key_t *dkey,
rc = dc_obj_fetch_task_create(oh, th, 0, dkey, nr, extra_flag,
iods, sgls, maps, extra_arg, csum_iov,
NULL, dsc_scheduler(), &task);
if (rc)
if (rc) {
dc_tx_local_close(th);
return rc;
}

rc = tse_task_register_comp_cb(task, tx_close_cb, &th, sizeof(th));
if (rc) {
Expand Down
Loading