Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,8 @@ static inline int flb_tail_file_exists(struct stat *st,
* file in question.
*/
static int set_file_position(struct flb_tail_config *ctx,
struct flb_tail_file *file)
struct flb_tail_file *file,
int explicit_offset)
{
int64_t ret;

Expand Down Expand Up @@ -1231,30 +1232,38 @@ static int set_file_position(struct flb_tail_config *ctx,
}
#endif

if (ctx->read_from_head == FLB_TRUE) {
/* no need to seek, offset position is already zero */
return 0;
}

if (file->offset > 0) {
/* A pre-set offset (e.g. from an aged-out re-pickup) must be honoured
* even when read_from_head is true; the flag only governs truly new
* files that have no prior read position. */
if (explicit_offset) {
ret = lseek(file->fd, file->offset, SEEK_SET);

if (ret == -1) {
flb_errno();
return -1;
}
}
else {
ret = lseek(file->fd, 0, SEEK_END);

if (ret == -1) {
flb_errno();
return -1;
if (file->decompression_context == NULL) {
file->stream_offset = ret;
}

file->offset = ret;
return 0;
}

if (ctx->read_from_head == FLB_TRUE) {
/* no need to seek, offset position is already zero */
return 0;
}

ret = lseek(file->fd, 0, SEEK_END);

if (ret == -1) {
flb_errno();
return -1;
}

file->offset = ret;

if (file->decompression_context == NULL) {
file->stream_offset = ret;
}
Expand Down Expand Up @@ -1586,7 +1595,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
}

/* Set the file position (database offset, head or tail) */
ret = set_file_position(ctx, file);
ret = set_file_position(ctx, file, (offset != -1));
if (ret == -1) {
goto err_fs_remove;
}
Expand Down Expand Up @@ -2438,6 +2447,13 @@ static int check_purge_deleted_file(struct flb_tail_config *ctx,
file->name,
strlen(file->name),
file->inode);
/* Preserve read offset for re-pickup if mtime is later refreshed.
* Use flb_tail_file_db_offset() rather than file->offset so that
* any buffered but unparsed bytes are not skipped on re-pickup. */
flb_tail_scan_register_ignored_file_size(ctx,
file->name,
strlen(file->name),
flb_tail_file_db_offset(file));
flb_tail_file_remove(file);
return FLB_TRUE;
}
Expand Down
14 changes: 10 additions & 4 deletions plugins/in_tail/tail_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@

void flb_tail_scan_register_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length, size_t size)
{
flb_hash_table_add(ctx->ignored_file_sizes, path, path_length, (void *) size, 0);

/*
* Store size+1 so that an offset of zero is distinguishable from
* "no entry" (a stored pointer of NULL) when fetched back.
*/
flb_hash_table_add(ctx->ignored_file_sizes, path, path_length, (void *)(size + 1), 0);
}

void flb_tail_scan_unregister_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length)
Expand All @@ -48,10 +51,13 @@ ssize_t flb_tail_scan_fetch_ignored_file_size(struct flb_tail_config *ctx, const
result = (ssize_t) flb_hash_table_get_ptr(ctx->ignored_file_sizes, path, path_length);

if (result == 0) {
result = -1;
/* Key not found (or stored value was genuinely zero before the +1
* encoding was introduced — treat as absent). */
return -1;
}

return result;
/* Undo the +1 bias applied on registration. */
return result - 1;
}

void flb_tail_scan_register_aged_out_inode(struct flb_tail_config *ctx,
Expand Down
25 changes: 21 additions & 4 deletions plugins/in_tail/tail_scan_glob.c
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,22 @@ static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
strlen(globbuf.gl_pathv[i]),
&aged_out_inode) == 0) {
if (aged_out_inode == (uint64_t) st.st_ino) {
flb_plg_debug(ctx->ins,
"excluded=%s (ignore_active_older_files)",
globbuf.gl_pathv[i]);
continue;
mtime = flb_tail_stat_mtime(&st);
if (mtime > 0 && (now - ctx->ignore_older) > mtime) {
flb_plg_debug(ctx->ins,
"excluded=%s (ignore_active_older_files)",
globbuf.gl_pathv[i]);
continue;
}
}
else {
/* Different inode at the same path: the stored offset
* belongs to the old file and must not be applied to the
* replacement file. */
flb_tail_scan_unregister_ignored_file_size(
ctx,
globbuf.gl_pathv[i],
strlen(globbuf.gl_pathv[i]));
}

flb_tail_scan_unregister_aged_out_inode(
Comment thread
cosmo0920 marked this conversation as resolved.
Expand Down Expand Up @@ -290,6 +302,11 @@ static int tail_scan_path(const char *path, struct flb_tail_config *ctx)
ctx,
globbuf.gl_pathv[i],
strlen(globbuf.gl_pathv[i]));

/* Discard stale offset if the file was truncated in place. */
if (ignored_file_size > (ssize_t) st.st_size) {
ignored_file_size = -1;
}
}

/* Append file to list */
Expand Down
19 changes: 16 additions & 3 deletions plugins/in_tail/tail_scan_win32.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,17 @@ static int tail_register_file(const char *target, struct flb_tail_config *ctx,
strlen(path),
&aged_out_inode) == 0) {
if (aged_out_inode == (uint64_t) st.st_ino) {
flb_plg_debug(ctx->ins, "excluded=%s (ignore_active_older_files)",
path);
return -1;
mtime = flb_tail_stat_mtime(&st);
if (mtime > 0 && (ts - ctx->ignore_older) > mtime) {
flb_plg_debug(ctx->ins, "excluded=%s (ignore_active_older_files)",
path);
return -1;
}
}
else {
/* Different inode at the same path: the stored offset belongs to
* the old file and must not be applied to the replacement file. */
flb_tail_scan_unregister_ignored_file_size(ctx, path, strlen(path));
}

flb_tail_scan_unregister_aged_out_inode(ctx, path, strlen(path));
Expand All @@ -127,6 +135,11 @@ static int tail_register_file(const char *target, struct flb_tail_config *ctx,
ctx,
path,
strlen(path));

/* Discard stale offset if the file was truncated in place. */
if (ignored_file_size > (ssize_t) st.st_size) {
ignored_file_size = -1;
}
}

return flb_tail_file_append(path, &st, FLB_TAIL_STATIC, ignored_file_size, ctx);
Expand Down
190 changes: 190 additions & 0 deletions tests/runtime/in_tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -2038,6 +2038,194 @@ void flb_test_in_tail_ignore_active_older_files()
test_tail_ctx_destroy(ctx);
}

/*
* Verify that a file excluded by ignore_active_older_files is re-picked up
* once its mtime is refreshed by a new write.
*
* Sequence:
* 1. Write msg1 → engine reads it (count = 1)
* 2. Wait 4 s → purge fires (rotate_wait=1s), file is >2s old; inode
* registered as aged-out and file removed from monitoring
* 3. Write msg2 → mtime is now fresh
* 4. Wait 3 s → scan fires (refresh_interval=1s), sees fresh mtime;
* unregisters aged-out entry and re-adds file at the
* stored offset (file->offset saved at age-out time);
* engine reads only msg2 (count += 1)
* 5. Assert count == 2
*/
void flb_test_in_tail_ignore_active_older_files_reread_on_update()
{
struct flb_lib_out_cb cb_data;
struct test_tail_ctx *ctx;
char *file[] = {"source_file_reread.log"};
char *path = "source_file_reread.log";
char *msg = "TEST LINE";
const int expected = 2;
const int expected_before_rotate = 1;
int ret;
int num;
int unused;

clear_output_num();

cb_data.cb = cb_count_msgpack;
cb_data.data = &unused;

ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
return;
}

ret = flb_input_set(ctx->flb, ctx->o_ffd,
"path", path,
"ignore_older", "2s",
"rotate_wait", "1s",
"refresh_interval", "1s",
"read_newly_discovered_files_from_head", "false",
"ignore_active_older_files", "on",
NULL);
TEST_CHECK(ret == 0);

ret = flb_start(ctx->flb);
if (!TEST_CHECK(ret == 0)) {
test_tail_ctx_destroy(ctx);
return;
}

/* Write first message and allow it to be flushed */
ret = write_msg(ctx, msg, strlen(msg));
if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);
return;
}

/* Wait until msg1 is consumed before starting the aging clock. */
wait_expected_num_with_timeout(5000, expected_before_rotate, &num);
if (!TEST_CHECK(num == expected_before_rotate)) {
TEST_MSG("msg1 not consumed in time. got=%d", num);
test_tail_ctx_destroy(ctx);
return;
}

/*
* Wait long enough for the purge callback (rotate_wait=1s) to fire and
* detect that the file's mtime is older than ignore_older=2s, which
* removes the file from monitoring and registers its inode as aged-out.
*/
flb_time_msleep(4000);
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/* Append new content: this updates mtime so the file is no longer old */
ret = write_msg(ctx, msg, strlen(msg));
if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);
return;
}

/*
* Wait for the scan callback (refresh_interval=1s) to re-evaluate the
* aged-out entry, find the fresh mtime, unregister the entry, and
* re-add the file. The file is re-added at the stored offset (the read
* position saved when the file was aged out), so only msg2 — the content
* that refreshed the mtime — is flushed (count += 1).
*/
wait_expected_num_with_timeout(5000, expected, &num);

if (!TEST_CHECK(num == expected)) {
TEST_MSG("output num error. expect=%d got=%d", expected, num);
}

test_tail_ctx_destroy(ctx);
}

void flb_test_in_tail_ignore_active_older_files_reread_on_update_default_read_from_head()
{
struct flb_lib_out_cb cb_data;
struct test_tail_ctx *ctx;
char *file[] = {"source_file_reread_default.log"};
char *path = "source_file_reread_default.log";
char *msg = "TEST LINE";
const int expected = 2;
const int expected_before_rotate = 1;
int ret;
int num;
int unused;

clear_output_num();

cb_data.cb = cb_count_msgpack;
cb_data.data = &unused;

ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
return;
}

/*
* Do not set read_newly_discovered_files_from_head — leave it at its
* default (true). The fix in set_file_position must honour the saved
* offset even when ctx->read_from_head is true, so only msg2 is flushed
* on re-pickup rather than replaying msg1 from the start.
*/
ret = flb_input_set(ctx->flb, ctx->o_ffd,
"path", path,
"ignore_older", "2s",
"rotate_wait", "1s",
"refresh_interval", "1s",
"ignore_active_older_files", "on",
NULL);
TEST_CHECK(ret == 0);

ret = flb_start(ctx->flb);
if (!TEST_CHECK(ret == 0)) {
test_tail_ctx_destroy(ctx);
return;
}

/* Write first message and allow it to be flushed */
ret = write_msg(ctx, msg, strlen(msg));
if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);
return;
}

/* Wait until msg1 is consumed before starting the aging clock. */
wait_expected_num_with_timeout(5000, expected_before_rotate, &num);
if (!TEST_CHECK(num == expected_before_rotate)) {
TEST_MSG("msg1 not consumed in time. got=%d", num);
test_tail_ctx_destroy(ctx);
return;
}

/*
* Wait long enough for the purge callback (rotate_wait=1s) to fire and
* age out the file.
*/
flb_time_msleep(4000);

/* Append new content: updates mtime so the file is no longer old */
ret = write_msg(ctx, msg, strlen(msg));
if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);
return;
}

/*
* The scan callback re-adds the file from the stored offset. With the
* default read_newly_discovered_files_from_head=true, set_file_position
* must still seek to the saved offset so msg1 is not replayed.
* Total expected: 1 (msg1) + 1 (msg2) = 2.
*/
wait_expected_num_with_timeout(5000, expected, &num);

if (!TEST_CHECK(num == expected)) {
TEST_MSG("output num error. expect=%d got=%d", expected, num);
}

test_tail_ctx_destroy(ctx);
}

void flb_test_inotify_watcher_false()
{
struct flb_lib_out_cb cb_data;
Expand Down Expand Up @@ -2693,6 +2881,8 @@ TEST_LIST = {
{"skip_empty_lines_crlf", flb_test_skip_empty_lines_crlf},
{"ignore_older", flb_test_ignore_older},
{"ignore_active_older_files", flb_test_in_tail_ignore_active_older_files},
{"ignore_active_older_files_reread_on_update", flb_test_in_tail_ignore_active_older_files_reread_on_update},
{"ignore_active_older_files_reread_on_update_default_read_from_head", flb_test_in_tail_ignore_active_older_files_reread_on_update_default_read_from_head},
#ifdef FLB_HAVE_INOTIFY
{"inotify_watcher_false", flb_test_inotify_watcher_false},
#endif /* FLB_HAVE_INOTIFY */
Expand Down
Loading