Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 130 additions & 13 deletions plugins/out_azure_blob/azure_blob.c
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,13 @@ static int create_blob(struct flb_azure_blob *ctx, const char *path_prefix, char
}

/* Prepare headers and authentication */
azb_http_client_setup(ctx, c, -1, FLB_TRUE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
ret = azb_http_client_setup(ctx, c, -1, FLB_TRUE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to setup HTTP client");
status = FLB_RETRY;
goto cleanup_create;
}

/* Send HTTP request */
ret = flb_http_do(c, &b_sent);
Expand Down Expand Up @@ -817,8 +822,13 @@ static int delete_blob(struct flb_azure_blob *ctx,
}

/* Prepare headers and authentication */
azb_http_client_setup(ctx, c, -1, FLB_TRUE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
ret = azb_http_client_setup(ctx, c, -1, FLB_TRUE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to setup HTTP client");
status = FLB_RETRY;
goto cleanup_delete;
}

/* Send HTTP request */
ret = flb_http_do(c, &b_sent);
Expand Down Expand Up @@ -1000,8 +1010,17 @@ static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
}

/* Prepare headers and authentication */
azb_http_client_setup(ctx, c, (ssize_t) payload_size, FLB_FALSE,
content_type, content_encoding);
ret = azb_http_client_setup(ctx, c, (ssize_t) payload_size, FLB_FALSE,
content_type, content_encoding);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to setup HTTP client");
if (compressed == FLB_TRUE) {
flb_free(payload_buf);
}
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
return FLB_RETRY;
}

/* Send HTTP request */
ret = flb_http_do(c, &b_sent);
Expand Down Expand Up @@ -1228,13 +1247,21 @@ static int create_container(struct flb_azure_blob *ctx, char *name)
NULL, 0, NULL, 0, NULL, 0);
if (!c) {
flb_plg_error(ctx->ins, "cannot create HTTP client context");
flb_sds_destroy(uri);
flb_upstream_conn_release(u_conn);
return FLB_FALSE;
}

/* Prepare headers and authentication */
azb_http_client_setup(ctx, c, -1, FLB_FALSE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
ret = azb_http_client_setup(ctx, c, -1, FLB_FALSE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to setup HTTP client");
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
flb_sds_destroy(uri);
return FLB_FALSE;
}

/* Send HTTP request */
ret = flb_http_do(c, &b_sent);
Expand Down Expand Up @@ -1320,21 +1347,30 @@ static int ensure_container(struct flb_azure_blob *ctx)
NULL, 0, NULL, 0, NULL, 0);
if (!c) {
flb_plg_error(ctx->ins, "cannot create HTTP client context");
flb_sds_destroy(uri);
flb_upstream_conn_release(u_conn);
return FLB_FALSE;
}
flb_http_strip_port_from_host(c);

/* Prepare headers and authentication */
azb_http_client_setup(ctx, c, -1, FLB_FALSE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
ret = azb_http_client_setup(ctx, c, -1, FLB_FALSE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to setup HTTP client");
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
flb_sds_destroy(uri);
return FLB_FALSE;
}

/* Send HTTP request */
ret = flb_http_do(c, &b_sent);
flb_sds_destroy(uri);

if (ret == -1) {
flb_plg_error(ctx->ins, "error requesting container properties");
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
return FLB_FALSE;
}
Expand Down Expand Up @@ -1372,6 +1408,7 @@ static int ensure_container(struct flb_azure_blob *ctx)
static int cb_azure_blob_init(struct flb_output_instance *ins,
struct flb_config *config, void *data)
{
int ret;
struct flb_azure_blob *ctx = NULL;
(void) ins;
(void) config;
Expand All @@ -1384,8 +1421,9 @@ static int cb_azure_blob_init(struct flb_output_instance *ins,
return -1;
}

ctx->ins = ins;

if (ctx->buffering_enabled == FLB_TRUE) {
ctx->ins = ins;
ctx->retry_time = 0;

/* Initialize local storage */
Expand All @@ -1399,14 +1437,17 @@ static int cb_azure_blob_init(struct flb_output_instance *ins,
/* validate 'total_file_size' */
if (ctx->file_size <= 0) {
flb_plg_error(ctx->ins, "Failed to parse upload_file_size");
azure_blob_store_exit(ctx);
return -1;
}
if (ctx->file_size < 1000000) {
flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB");
azure_blob_store_exit(ctx);
return -1;
}
if (ctx->file_size > MAX_FILE_SIZE) {
flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE);
azure_blob_store_exit(ctx);
return -1;
}
ctx->has_old_buffers = azure_blob_store_has_data(ctx);
Expand All @@ -1415,6 +1456,59 @@ static int cb_azure_blob_init(struct flb_output_instance *ins,
flb_plg_info(ctx->ins, "Using upload size %lu bytes", ctx->file_size);
}

/* Initialize OAuth2 context for service principal auth */
if (ctx->atype == AZURE_BLOB_AUTH_SERVICE_PRINCIPAL) {
ret = pthread_mutex_init(&ctx->token_mutex, NULL);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to initialize token mutex");
if (ctx->buffering_enabled == FLB_TRUE) {
azure_blob_store_exit(ctx);
}
flb_azure_blob_conf_destroy(ctx);
return -1;
}

flb_sds_t token_url;

token_url = flb_sds_create_size(256);
if (!token_url) {
flb_plg_error(ctx->ins, "failed to allocate token URL");
pthread_mutex_destroy(&ctx->token_mutex);
if (ctx->buffering_enabled == FLB_TRUE) {
azure_blob_store_exit(ctx);
}
flb_azure_blob_conf_destroy(ctx);
return -1;
}

ret = flb_sds_snprintf(&token_url, flb_sds_alloc(token_url),
"%s/%s/oauth2/v2.0/token",
AZURE_BLOB_DEFAULT_AUTHORITY_HOST, ctx->tenant_id);
if (ret < 0) {
flb_plg_error(ctx->ins, "failed to build token URL");
flb_sds_destroy(token_url);
pthread_mutex_destroy(&ctx->token_mutex);
if (ctx->buffering_enabled == FLB_TRUE) {
azure_blob_store_exit(ctx);
}
flb_azure_blob_conf_destroy(ctx);
return -1;
}

ctx->o = flb_oauth2_create(ctx->config, token_url, AZURE_BLOB_TOKEN_REFRESH);
flb_sds_destroy(token_url);

if (!ctx->o) {
flb_plg_error(ctx->ins, "failed to create OAuth2 context");
pthread_mutex_destroy(&ctx->token_mutex);
if (ctx->buffering_enabled == FLB_TRUE) {
azure_blob_store_exit(ctx);
}
flb_azure_blob_conf_destroy(ctx);
return -1;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}

flb_output_set_context(ins, ctx);

flb_output_set_http_debug_callbacks(ins);
Expand Down Expand Up @@ -1691,7 +1785,7 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context
const char *commit_prefix = azb_commit_prefix_with_fallback(ctx, path_prefix);

ret = azb_block_blob_commit_file_parts(ctx, file_id, file_path, part_ids, commit_prefix);
if (ret == -1) {
if (ret != FLB_OK) {
flb_plg_error(ctx->ins, "cannot commit blob file parts for file id=%" PRIu64 " path=%s",
file_id, file_path);
}
Expand Down Expand Up @@ -2390,6 +2484,11 @@ static int cb_azure_blob_exit(void *data, struct flb_config *config)
ctx->u = NULL;
}

/* Destroy token mutex for service principal auth */
if (ctx->atype == AZURE_BLOB_AUTH_SERVICE_PRINCIPAL) {
pthread_mutex_destroy(&ctx->token_mutex);
}

flb_azure_blob_conf_destroy(ctx);
return 0;
}
Expand Down Expand Up @@ -2514,7 +2613,7 @@ static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "auth_type", "key",
0, FLB_TRUE, offsetof(struct flb_azure_blob, auth_type),
"Set the auth type: key or sas"
"Set the auth type: key, sas, or service_principal"
},

{
Expand All @@ -2523,6 +2622,24 @@ static struct flb_config_map config_map[] = {
"Azure Blob SAS token"
},

{
FLB_CONFIG_MAP_STR, "tenant_id", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, tenant_id),
"Azure AD tenant ID (required for service_principal auth)"
},

{
FLB_CONFIG_MAP_STR, "client_id", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, client_id),
"Azure AD client ID (required for service_principal auth)"
},

{
FLB_CONFIG_MAP_STR, "client_secret", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, client_secret),
"Azure AD client secret (required for service_principal auth)"
},

{
FLB_CONFIG_MAP_STR, "database_file", NULL,
0, FLB_TRUE, offsetof(struct flb_azure_blob, database_file),
Expand Down
16 changes: 16 additions & 0 deletions plugins/out_azure_blob/azure_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_sqldb.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_oauth2.h>

/* Content-Type */
#define AZURE_BLOB_CT "Content-Type"
Expand Down Expand Up @@ -53,6 +54,12 @@

#define AZURE_BLOB_AUTH_KEY 0
#define AZURE_BLOB_AUTH_SAS 1
#define AZURE_BLOB_AUTH_SERVICE_PRINCIPAL 2

/* OAuth2 defaults for service principal authentication */
#define AZURE_BLOB_DEFAULT_AUTHORITY_HOST "https://login.microsoftonline.com"
#define AZURE_BLOB_OAUTH_SCOPE "https://storage.azure.com/.default"
#define AZURE_BLOB_TOKEN_REFRESH 3000 /* refresh token every 50 minutes */

struct flb_azure_blob {
int auto_create_container;
Expand All @@ -69,6 +76,11 @@ struct flb_azure_blob {
flb_sds_t date_key;
flb_sds_t auth_type;
flb_sds_t sas_token;

/* Service Principal authentication fields */
flb_sds_t tenant_id;
flb_sds_t client_id;
flb_sds_t client_secret;
flb_sds_t database_file;
size_t part_size;
time_t upload_parts_timeout;
Expand Down Expand Up @@ -125,6 +137,10 @@ struct flb_azure_blob {
unsigned char *decoded_sk; /* decoded shared key */
size_t decoded_sk_size; /* size of decoded shared key */

/* Service Principal OAuth2 context */
struct flb_oauth2 *o;
pthread_mutex_t token_mutex;

#ifdef FLB_HAVE_SQLDB
/*
* SQLite by default is not built with multi-threading enabled, and
Expand Down
14 changes: 11 additions & 3 deletions plugins/out_azure_blob/azure_blob_blockblob.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,24 @@ int azb_block_blob_put_block_list(struct flb_azure_blob *ctx, flb_sds_t uri, flb
}

/* Prepare headers and authentication */
azb_http_client_setup(ctx, c, flb_sds_len(payload),
FLB_FALSE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
ret = azb_http_client_setup(ctx, c, flb_sds_len(payload),
FLB_FALSE,
AZURE_BLOB_CT_NONE, AZURE_BLOB_CE_NONE);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to setup HTTP client");
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
return FLB_RETRY;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

/* Send HTTP request */
ret = flb_http_do(c, &b_sent);

/* Validate HTTP status */
if (ret == -1) {
flb_plg_error(ctx->ins, "error sending block_blob");
flb_http_client_destroy(c);
flb_upstream_conn_release(u_conn);
return FLB_RETRY;
}

Expand Down
Loading
Loading