diff --git a/plugins/in_tail/tail_file.c b/plugins/in_tail/tail_file.c index 3f44ea9d046..cf677ff8e05 100644 --- a/plugins/in_tail/tail_file.c +++ b/plugins/in_tail/tail_file.c @@ -1179,7 +1179,8 @@ static inline int flb_tail_file_exists(struct stat *st, * file in question. */ static int set_file_position(struct flb_tail_config *ctx, - struct flb_tail_file *file) + struct flb_tail_file *file, + int explicit_offset) { int64_t ret; @@ -1231,30 +1232,38 @@ static int set_file_position(struct flb_tail_config *ctx, } #endif - if (ctx->read_from_head == FLB_TRUE) { - /* no need to seek, offset position is already zero */ - return 0; - } - - if (file->offset > 0) { + /* A pre-set offset (e.g. from an aged-out re-pickup) must be honoured + * even when read_from_head is true; the flag only governs truly new + * files that have no prior read position. */ + if (explicit_offset) { ret = lseek(file->fd, file->offset, SEEK_SET); if (ret == -1) { flb_errno(); return -1; } - } - else { - ret = lseek(file->fd, 0, SEEK_END); - if (ret == -1) { - flb_errno(); - return -1; + if (file->decompression_context == NULL) { + file->stream_offset = ret; } - file->offset = ret; + return 0; } + if (ctx->read_from_head == FLB_TRUE) { + /* no need to seek, offset position is already zero */ + return 0; + } + + ret = lseek(file->fd, 0, SEEK_END); + + if (ret == -1) { + flb_errno(); + return -1; + } + + file->offset = ret; + if (file->decompression_context == NULL) { file->stream_offset = ret; } @@ -1586,7 +1595,7 @@ int flb_tail_file_append(char *path, struct stat *st, int mode, } /* Set the file position (database offset, head or tail) */ - ret = set_file_position(ctx, file); + ret = set_file_position(ctx, file, (offset != -1)); if (ret == -1) { goto err_fs_remove; } @@ -2438,6 +2447,13 @@ static int check_purge_deleted_file(struct flb_tail_config *ctx, file->name, strlen(file->name), file->inode); + /* Preserve read offset for re-pickup if mtime is later refreshed. + * Use flb_tail_file_db_offset() rather than file->offset so that + * any buffered but unparsed bytes are not skipped on re-pickup. */ + flb_tail_scan_register_ignored_file_size(ctx, + file->name, + strlen(file->name), + flb_tail_file_db_offset(file)); flb_tail_file_remove(file); return FLB_TRUE; } diff --git a/plugins/in_tail/tail_scan.c b/plugins/in_tail/tail_scan.c index 2495c4bc3fb..82dcda4bffa 100644 --- a/plugins/in_tail/tail_scan.c +++ b/plugins/in_tail/tail_scan.c @@ -32,8 +32,11 @@ void flb_tail_scan_register_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length, size_t size) { - flb_hash_table_add(ctx->ignored_file_sizes, path, path_length, (void *) size, 0); - + /* + * Store size+1 so that an offset of zero is distinguishable from + * "no entry" (a stored pointer of NULL) when fetched back. + */ + flb_hash_table_add(ctx->ignored_file_sizes, path, path_length, (void *)(size + 1), 0); } void flb_tail_scan_unregister_ignored_file_size(struct flb_tail_config *ctx, const char *path, size_t path_length) @@ -48,10 +51,13 @@ ssize_t flb_tail_scan_fetch_ignored_file_size(struct flb_tail_config *ctx, const result = (ssize_t) flb_hash_table_get_ptr(ctx->ignored_file_sizes, path, path_length); if (result == 0) { - result = -1; + /* Key not found (or stored value was genuinely zero before the +1 + * encoding was introduced — treat as absent). */ + return -1; } - return result; + /* Undo the +1 bias applied on registration. */ + return result - 1; } void flb_tail_scan_register_aged_out_inode(struct flb_tail_config *ctx, diff --git a/plugins/in_tail/tail_scan_glob.c b/plugins/in_tail/tail_scan_glob.c index 93da8ecbc56..25173c2c14a 100644 --- a/plugins/in_tail/tail_scan_glob.c +++ b/plugins/in_tail/tail_scan_glob.c @@ -250,10 +250,22 @@ static int tail_scan_path(const char *path, struct flb_tail_config *ctx) strlen(globbuf.gl_pathv[i]), &aged_out_inode) == 0) { if (aged_out_inode == (uint64_t) st.st_ino) { - flb_plg_debug(ctx->ins, - "excluded=%s (ignore_active_older_files)", - globbuf.gl_pathv[i]); - continue; + mtime = flb_tail_stat_mtime(&st); + if (mtime > 0 && (now - ctx->ignore_older) > mtime) { + flb_plg_debug(ctx->ins, + "excluded=%s (ignore_active_older_files)", + globbuf.gl_pathv[i]); + continue; + } + } + else { + /* Different inode at the same path: the stored offset + * belongs to the old file and must not be applied to the + * replacement file. */ + flb_tail_scan_unregister_ignored_file_size( + ctx, + globbuf.gl_pathv[i], + strlen(globbuf.gl_pathv[i])); } flb_tail_scan_unregister_aged_out_inode( @@ -290,6 +302,11 @@ static int tail_scan_path(const char *path, struct flb_tail_config *ctx) ctx, globbuf.gl_pathv[i], strlen(globbuf.gl_pathv[i])); + + /* Discard stale offset if the file was truncated in place. */ + if (ignored_file_size > (ssize_t) st.st_size) { + ignored_file_size = -1; + } } /* Append file to list */ diff --git a/plugins/in_tail/tail_scan_win32.c b/plugins/in_tail/tail_scan_win32.c index b917fa8a933..c5349ccf7b2 100644 --- a/plugins/in_tail/tail_scan_win32.c +++ b/plugins/in_tail/tail_scan_win32.c @@ -109,9 +109,17 @@ static int tail_register_file(const char *target, struct flb_tail_config *ctx, strlen(path), &aged_out_inode) == 0) { if (aged_out_inode == (uint64_t) st.st_ino) { - flb_plg_debug(ctx->ins, "excluded=%s (ignore_active_older_files)", - path); - return -1; + mtime = flb_tail_stat_mtime(&st); + if (mtime > 0 && (ts - ctx->ignore_older) > mtime) { + flb_plg_debug(ctx->ins, "excluded=%s (ignore_active_older_files)", + path); + return -1; + } + } + else { + /* Different inode at the same path: the stored offset belongs to + * the old file and must not be applied to the replacement file. */ + flb_tail_scan_unregister_ignored_file_size(ctx, path, strlen(path)); } flb_tail_scan_unregister_aged_out_inode(ctx, path, strlen(path)); @@ -127,6 +135,11 @@ static int tail_register_file(const char *target, struct flb_tail_config *ctx, ctx, path, strlen(path)); + + /* Discard stale offset if the file was truncated in place. */ + if (ignored_file_size > (ssize_t) st.st_size) { + ignored_file_size = -1; + } } return flb_tail_file_append(path, &st, FLB_TAIL_STATIC, ignored_file_size, ctx); diff --git a/tests/runtime/in_tail.c b/tests/runtime/in_tail.c index 9a523fa78ab..e686e1e9a4f 100644 --- a/tests/runtime/in_tail.c +++ b/tests/runtime/in_tail.c @@ -2038,6 +2038,194 @@ void flb_test_in_tail_ignore_active_older_files() test_tail_ctx_destroy(ctx); } +/* + * Verify that a file excluded by ignore_active_older_files is re-picked up + * once its mtime is refreshed by a new write. + * + * Sequence: + * 1. Write msg1 → engine reads it (count = 1) + * 2. Wait 4 s → purge fires (rotate_wait=1s), file is >2s old; inode + * registered as aged-out and file removed from monitoring + * 3. Write msg2 → mtime is now fresh + * 4. Wait 3 s → scan fires (refresh_interval=1s), sees fresh mtime; + * unregisters aged-out entry and re-adds file at the + * stored offset (file->offset saved at age-out time); + * engine reads only msg2 (count += 1) + * 5. Assert count == 2 + */ +void flb_test_in_tail_ignore_active_older_files_reread_on_update() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"source_file_reread.log"}; + char *path = "source_file_reread.log"; + char *msg = "TEST LINE"; + const int expected = 2; + const int expected_before_rotate = 1; + int ret; + int num; + int unused; + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + return; + } + + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", path, + "ignore_older", "2s", + "rotate_wait", "1s", + "refresh_interval", "1s", + "read_newly_discovered_files_from_head", "false", + "ignore_active_older_files", "on", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + if (!TEST_CHECK(ret == 0)) { + test_tail_ctx_destroy(ctx); + return; + } + + /* Write first message and allow it to be flushed */ + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + return; + } + + /* Wait until msg1 is consumed before starting the aging clock. */ + wait_expected_num_with_timeout(5000, expected_before_rotate, &num); + if (!TEST_CHECK(num == expected_before_rotate)) { + TEST_MSG("msg1 not consumed in time. got=%d", num); + test_tail_ctx_destroy(ctx); + return; + } + + /* + * Wait long enough for the purge callback (rotate_wait=1s) to fire and + * detect that the file's mtime is older than ignore_older=2s, which + * removes the file from monitoring and registers its inode as aged-out. + */ + flb_time_msleep(4000); + + /* Append new content: this updates mtime so the file is no longer old */ + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + return; + } + + /* + * Wait for the scan callback (refresh_interval=1s) to re-evaluate the + * aged-out entry, find the fresh mtime, unregister the entry, and + * re-add the file. The file is re-added at the stored offset (the read + * position saved when the file was aged out), so only msg2 — the content + * that refreshed the mtime — is flushed (count += 1). + */ + wait_expected_num_with_timeout(5000, expected, &num); + + if (!TEST_CHECK(num == expected)) { + TEST_MSG("output num error. expect=%d got=%d", expected, num); + } + + test_tail_ctx_destroy(ctx); +} + +void flb_test_in_tail_ignore_active_older_files_reread_on_update_default_read_from_head() +{ + struct flb_lib_out_cb cb_data; + struct test_tail_ctx *ctx; + char *file[] = {"source_file_reread_default.log"}; + char *path = "source_file_reread_default.log"; + char *msg = "TEST LINE"; + const int expected = 2; + const int expected_before_rotate = 1; + int ret; + int num; + int unused; + + clear_output_num(); + + cb_data.cb = cb_count_msgpack; + cb_data.data = &unused; + + ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + return; + } + + /* + * Do not set read_newly_discovered_files_from_head — leave it at its + * default (true). The fix in set_file_position must honour the saved + * offset even when ctx->read_from_head is true, so only msg2 is flushed + * on re-pickup rather than replaying msg1 from the start. + */ + ret = flb_input_set(ctx->flb, ctx->o_ffd, + "path", path, + "ignore_older", "2s", + "rotate_wait", "1s", + "refresh_interval", "1s", + "ignore_active_older_files", "on", + NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx->flb); + if (!TEST_CHECK(ret == 0)) { + test_tail_ctx_destroy(ctx); + return; + } + + /* Write first message and allow it to be flushed */ + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + return; + } + + /* Wait until msg1 is consumed before starting the aging clock. */ + wait_expected_num_with_timeout(5000, expected_before_rotate, &num); + if (!TEST_CHECK(num == expected_before_rotate)) { + TEST_MSG("msg1 not consumed in time. got=%d", num); + test_tail_ctx_destroy(ctx); + return; + } + + /* + * Wait long enough for the purge callback (rotate_wait=1s) to fire and + * age out the file. + */ + flb_time_msleep(4000); + + /* Append new content: updates mtime so the file is no longer old */ + ret = write_msg(ctx, msg, strlen(msg)); + if (!TEST_CHECK(ret > 0)) { + test_tail_ctx_destroy(ctx); + return; + } + + /* + * The scan callback re-adds the file from the stored offset. With the + * default read_newly_discovered_files_from_head=true, set_file_position + * must still seek to the saved offset so msg1 is not replayed. + * Total expected: 1 (msg1) + 1 (msg2) = 2. + */ + wait_expected_num_with_timeout(5000, expected, &num); + + if (!TEST_CHECK(num == expected)) { + TEST_MSG("output num error. expect=%d got=%d", expected, num); + } + + test_tail_ctx_destroy(ctx); +} + void flb_test_inotify_watcher_false() { struct flb_lib_out_cb cb_data; @@ -2693,6 +2881,8 @@ TEST_LIST = { {"skip_empty_lines_crlf", flb_test_skip_empty_lines_crlf}, {"ignore_older", flb_test_ignore_older}, {"ignore_active_older_files", flb_test_in_tail_ignore_active_older_files}, + {"ignore_active_older_files_reread_on_update", flb_test_in_tail_ignore_active_older_files_reread_on_update}, + {"ignore_active_older_files_reread_on_update_default_read_from_head", flb_test_in_tail_ignore_active_older_files_reread_on_update_default_read_from_head}, #ifdef FLB_HAVE_INOTIFY {"inotify_watcher_false", flb_test_inotify_watcher_false}, #endif /* FLB_HAVE_INOTIFY */