diff --git a/dynamodb-eventbridge-scheduler/README.md b/dynamodb-eventbridge-scheduler/README.md new file mode 100644 index 000000000..5de410668 --- /dev/null +++ b/dynamodb-eventbridge-scheduler/README.md @@ -0,0 +1,194 @@ +# Dynamic Amazon EventBridge Scheduler from Amazon DynamoDB Streams + +This pattern demonstrates how to dynamically create, update, and delete Amazon EventBridge Scheduler schedules based on changes in a DynamoDB table using DynamoDB Streams. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +2. Change directory to the pattern directory: + ``` + cd serverless-patterns/dynamodb-eventbridge-scheduler + ``` +3. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yml file: + ``` + sam build + sam deploy --guided + ``` +4. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Allow SAM CLI to create IAM roles with the required permissions. + + Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults. + +5. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing. + +## How it works + +This pattern creates a serverless solution that automatically manages EventBridge Scheduler schedules based on DynamoDB table changes: + +1. **DynamoDB Table** (`ScheduleConfigs`) - Stores schedule configurations with streams enabled +2. **Stream Processor Lambda** - Automatically triggered by DynamoDB Stream events to: + - Create schedules when items are inserted + - Update schedules when items are modified + - Delete schedules when items are removed +3. **EventBridge Scheduler** - Executes schedules at specified times +4. **Target Lambda** - The function invoked by EventBridge Scheduler when schedules fire +5. **Auto-Test Schedule** - Automatically creates a test schedule 2 minutes after deployment + +### Architecture Flow + +``` +User inserts item into DynamoDB + ↓ +DynamoDB Stream captures change + ↓ +StreamProcessorFunction triggered + ↓ +Creates/Updates/Deletes EventBridge Schedule + ↓ +EventBridge Scheduler invokes TargetLambdaFunction at scheduled time +``` + +### DynamoDB Item Structure + +```json +{ + "scheduleId": "unique-schedule-id", + "scheduleExpression": "at(2026-02-15T10:00:00)", + "payload": "{\"key\": \"value\"}", + "enabled": true +} +``` + +**Required Fields:** +- `scheduleId` (String) - Unique identifier for the schedule +- `scheduleExpression` (String) - EventBridge Scheduler expression (rate, cron, or at) + +**Optional Fields:** +- `payload` (String) - JSON string passed to the target Lambda +- `enabled` (Boolean) - Whether the schedule is enabled (default: true) + +## Testing + +### Verify Auto-Created Test Schedule + +After deployment, a test schedule is automatically created that fires 5 minutes later (UTC time): + +1. Check if the schedule was created: +```bash +aws scheduler get-schedule --name auto-test-schedule +``` + +2. View the schedule in EventBridge Console: + - Navigate to EventBridge → Scheduler → Schedules + - Look for `auto-test-schedule` + - Note the "Next invocation" time (displayed in your local timezone) + +3. After 5 minutes, check the Target Lambda logs: +```bash +aws logs tail /aws/lambda/ScheduledTaskExecutor --follow +``` + +**Note:** All schedule times use UTC timezone. EventBridge Scheduler expressions use the format `at(YYYY-MM-DDTHH:MM:SS)` in UTC. + +### Create Your Own Schedule + +1. Get the DynamoDB table name from the stack outputs: +```bash +aws cloudformation describe-stacks --stack-name \ + --query 'Stacks[0].Outputs[?OutputKey==`TableName`].OutputValue' \ + --output text +``` + +2. Insert a new schedule (set time to a few minutes in the future in UTC): +```bash +aws dynamodb put-item \ + --table-name ScheduleConfigs \ + --item '{ + "scheduleId": {"S": "my-test-schedule"}, + "scheduleExpression": {"S": "at(2026-02-12T20:00:00)"}, + "payload": {"S": "{\"message\": \"Hello from my schedule\"}"}, + "enabled": {"BOOL": true} + }' +``` + +**Important:** The time must be in UTC and in the future. To get current UTC time: +```bash +date -u +"%Y-%m-%dT%H:%M:%S" +``` + +3. Verify the schedule was created: +```bash +aws scheduler get-schedule --name my-test-schedule +``` + +4. Check StreamProcessor logs to see the creation: +```bash +aws logs tail /aws/lambda/ScheduleStreamProcessor --follow +``` + +### Update a Schedule + +```bash +aws dynamodb update-item \ + --table-name ScheduleConfigs \ + --key '{"scheduleId": {"S": "my-test-schedule"}}' \ + --update-expression "SET scheduleExpression = :expr" \ + --expression-attribute-values '{":expr": {"S": "at(2026-02-12T21:00:00)"}}' +``` + +**Note:** Time must be in UTC and in the future. + +### Delete a Schedule + +```bash +aws dynamodb delete-item \ + --table-name ScheduleConfigs \ + --key '{"scheduleId": {"S": "my-test-schedule"}}' +``` + +Verify deletion: +```bash +aws scheduler get-schedule --name my-test-schedule +# Should return ResourceNotFoundException +``` + +## Cleanup + +1. Delete the stack + ```bash + sam delete + ``` + +2. Confirm the stack has been deleted + ```bash + aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'STACK_NAME')].StackStatus" + ``` + +## Resources + +- [Amazon EventBridge Scheduler](https://docs.aws.amazon.com/scheduler/latest/UserGuide/what-is-scheduler.html) +- [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) +- [AWS SAM Developer Guide](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/what-is-sam.html) + +This pattern was contributed by Luigi Napoleone Capasso + +---- +Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/dynamodb-eventbridge-scheduler/dynamodb-eventbridge-scheduler.json b/dynamodb-eventbridge-scheduler/dynamodb-eventbridge-scheduler.json new file mode 100644 index 000000000..ee2f2b8fa --- /dev/null +++ b/dynamodb-eventbridge-scheduler/dynamodb-eventbridge-scheduler.json @@ -0,0 +1,109 @@ +{ + "title": "Dynamic Amazon EventBridge Scheduler from Amazon DynamoDB Streams", + "description": "Automatically create, update, and delete EventBridge Scheduler schedules based on DynamoDB table changes using DynamoDB Streams.", + "language": "Python", + "level": "200", + "framework": "AWS SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to dynamically manage EventBridge Scheduler schedules by storing schedule configurations in a DynamoDB table.", + "When items are inserted, updated, or deleted from the DynamoDB table, a Lambda function is automatically triggered via DynamoDB Streams.", + "The Lambda function then creates, updates, or deletes the corresponding EventBridge Scheduler schedule.", + "When the scheduled time arrives, EventBridge Scheduler invokes the target Lambda function with the configured payload.", + "This pattern includes an auto-test feature that creates a sample schedule 2 minutes after deployment to verify the solution works end-to-end." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/dynamodb-eventbridge-scheduler", + "templateURL": "serverless-patterns/dynamodb-eventbridge-scheduler", + "projectFolder": "dynamodb-eventbridge-scheduler", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon EventBridge Scheduler", + "link": "https://docs.aws.amazon.com/scheduler/latest/UserGuide/what-is-scheduler.html" + }, + { + "text": "DynamoDB Streams", + "link": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html" + }, + { + "text": "AWS Lambda Event Source Mapping", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html" + }, + { + "text": "Schedule expressions for EventBridge Scheduler", + "link": "https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Luigi Napoleone Capasso", + "bio": "Technical Account Manager @ AWS", + "linkedin": "" + } + ], + "patternArch": { + "icon1": { + "x": 15, + "y": 50, + "service": "dynamodb", + "label": "Amazon DynamoDB" + }, + "icon2": { + "x": 40, + "y": 50, + "service": "lambda", + "label": "AWS Lambda" + }, + "icon3": { + "x": 65, + "y": 50, + "service": "eventbridge", + "label": "EventBridge" + }, + "icon4": { + "x": 90, + "y": 50, + "service": "lambda", + "label": "AWS Lambda" + }, + "line1": { + "from": "icon1", + "to": "icon2", + "label": "streams" + }, + "line2": { + "from": "icon2", + "to": "icon3", + "label": "schedules" + }, + "line3": { + "from": "icon3", + "to": "icon4", + "label": "invokes" + } + } +} diff --git a/dynamodb-eventbridge-scheduler/example-pattern.json b/dynamodb-eventbridge-scheduler/example-pattern.json new file mode 100644 index 000000000..8e8717a64 --- /dev/null +++ b/dynamodb-eventbridge-scheduler/example-pattern.json @@ -0,0 +1,109 @@ +{ + "title": "Dynamic Amazon EventBridge Scheduler from Amazon DynamoDB Streams", + "description": "Automatically create, update, and delete EventBridge Scheduler schedules based on DynamoDB table changes using DynamoDB Streams.", + "language": "Python", + "level": "200", + "framework": "AWS SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to dynamically manage EventBridge Scheduler schedules by storing schedule configurations in a DynamoDB table.", + "When items are inserted, updated, or deleted from the DynamoDB table, a Lambda function is automatically triggered via DynamoDB Streams.", + "The Lambda function then creates, updates, or deletes the corresponding EventBridge Scheduler schedule.", + "When the scheduled time arrives, EventBridge Scheduler invokes the target Lambda function with the configured payload.", + "This pattern includes an auto-test feature that creates a sample schedule 2 minutes after deployment to verify the solution works end-to-end." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/dynamodb-eventbridge-scheduler", + "templateURL": "serverless-patterns/dynamodb-eventbridge-scheduler", + "projectFolder": "dynamodb-eventbridge-scheduler", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon EventBridge Scheduler", + "link": "https://docs.aws.amazon.com/scheduler/latest/UserGuide/what-is-scheduler.html" + }, + { + "text": "DynamoDB Streams", + "link": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html" + }, + { + "text": "AWS Lambda Event Source Mapping", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html" + }, + { + "text": "Schedule expressions for EventBridge Scheduler", + "link": "https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Luigi Napoleone Capasso", + "bio": "Technical Account Manager @ AWS", + "linkedin": "" + } + ], + "patternArch": { + "icon1": { + "x": 20, + "y": 50, + "service": "dynamodb", + "label": "DynamoDB Table with Streams" + }, + "icon2": { + "x": 50, + "y": 50, + "service": "lambda", + "label": "Stream Processor Lambda" + }, + "icon3": { + "x": 80, + "y": 30, + "service": "eventbridge", + "label": "EventBridge Scheduler" + }, + "icon4": { + "x": 80, + "y": 70, + "service": "lambda", + "label": "Target Lambda" + }, + "line1": { + "from": "icon1", + "to": "icon2", + "label": "Stream triggers" + }, + "line2": { + "from": "icon2", + "to": "icon3", + "label": "Manages schedules" + }, + "line3": { + "from": "icon3", + "to": "icon4", + "label": "Invokes at scheduled time" + } + } +} diff --git a/dynamodb-eventbridge-scheduler/template.yaml b/dynamodb-eventbridge-scheduler/template.yaml new file mode 100644 index 000000000..c9635749d --- /dev/null +++ b/dynamodb-eventbridge-scheduler/template.yaml @@ -0,0 +1,255 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: 'Dynamic Amazon EventBridge Scheduler creation from Amazon DynamoDB Streams' + +Resources: + # DynamoDB Table with Streams enabled + ScheduleConfigTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: ScheduleConfigs + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: scheduleId + AttributeType: S + KeySchema: + - AttributeName: scheduleId + KeyType: HASH + StreamSpecification: + StreamViewType: NEW_AND_OLD_IMAGES + + # Stream Processor Lambda + StreamProcessorFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: ScheduleStreamProcessor + Runtime: python3.12 + Handler: index.lambda_handler + Timeout: 60 + Environment: + Variables: + SCHEDULER_ROLE_ARN: !GetAtt SchedulerExecutionRole.Arn + TARGET_LAMBDA_ARN: !GetAtt TargetLambdaFunction.Arn + Policies: + - DynamoDBStreamReadPolicy: + TableName: !Ref ScheduleConfigTable + StreamName: !Select [3, !Split ["/", !GetAtt ScheduleConfigTable.StreamArn]] + - Statement: + - Effect: Allow + Action: + - scheduler:CreateSchedule + - scheduler:UpdateSchedule + - scheduler:DeleteSchedule + - scheduler:GetSchedule + Resource: !Sub 'arn:aws:scheduler:${AWS::Region}:${AWS::AccountId}:schedule/default/*' + - Effect: Allow + Action: iam:PassRole + Resource: !GetAtt SchedulerExecutionRole.Arn + Events: + DynamoDBStream: + Type: DynamoDB + Properties: + Stream: !GetAtt ScheduleConfigTable.StreamArn + StartingPosition: LATEST + MaximumBatchingWindowInSeconds: 1 + BatchSize: 10 + InlineCode: | + import boto3 + import json + import os + + scheduler = boto3.client('scheduler') + SCHEDULER_ROLE_ARN = os.environ['SCHEDULER_ROLE_ARN'] + TARGET_LAMBDA_ARN = os.environ['TARGET_LAMBDA_ARN'] + + def lambda_handler(event, context): + for record in event['Records']: + try: + event_name = record['eventName'] + + if event_name == 'INSERT': + item = record['dynamodb']['NewImage'] + create_schedule(item) + elif event_name == 'MODIFY': + item = record['dynamodb']['NewImage'] + update_schedule(item) + elif event_name == 'REMOVE': + item = record['dynamodb']['OldImage'] + delete_schedule(item) + except Exception as e: + print(f"Error processing record: {e}") + print(f"Record data: {json.dumps(record, default=str)}") + raise + + def validate_item(item): + required_fields = ['scheduleId', 'scheduleExpression'] + missing = [f for f in required_fields if f not in item] + if missing: + raise ValueError(f"Missing required fields: {missing}. Item: {json.dumps(item, default=str)}") + + def create_schedule(item): + validate_item(item) + scheduler.create_schedule( + Name=item['scheduleId']['S'], + ScheduleExpression=item['scheduleExpression']['S'], + FlexibleTimeWindow={'Mode': 'OFF'}, + Target={ + 'Arn': TARGET_LAMBDA_ARN, + 'RoleArn': SCHEDULER_ROLE_ARN, + 'Input': item.get('payload', {}).get('S', '{}') + }, + State='ENABLED' if item.get('enabled', {}).get('BOOL', True) else 'DISABLED' + ) + print(f"Created schedule: {item['scheduleId']['S']}") + + def update_schedule(item): + validate_item(item) + scheduler.update_schedule( + Name=item['scheduleId']['S'], + ScheduleExpression=item['scheduleExpression']['S'], + FlexibleTimeWindow={'Mode': 'OFF'}, + Target={ + 'Arn': TARGET_LAMBDA_ARN, + 'RoleArn': SCHEDULER_ROLE_ARN, + 'Input': item.get('payload', {}).get('S', '{}') + }, + State='ENABLED' if item.get('enabled', {}).get('BOOL', True) else 'DISABLED' + ) + print(f"Updated schedule: {item['scheduleId']['S']}") + + def delete_schedule(item): + if 'scheduleId' not in item: + print(f"Cannot delete: missing scheduleId in item") + return + try: + scheduler.delete_schedule(Name=item['scheduleId']['S']) + print(f"Deleted schedule: {item['scheduleId']['S']}") + except scheduler.exceptions.ResourceNotFoundException: + print(f"Schedule not found: {item['scheduleId']['S']}") + + # IAM Role for EventBridge Scheduler to invoke target Lambda + SchedulerExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: scheduler.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: InvokeLambda + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: lambda:InvokeFunction + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:*' + + # Example Target Lambda (the one that gets invoked by schedules) + TargetLambdaFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: ScheduledTaskExecutor + Runtime: python3.12 + Handler: index.lambda_handler + InlineCode: | + import json + + def lambda_handler(event, context): + print(f"Scheduled task executed with payload: {json.dumps(event)}") + + return {'statusCode': 200, 'body': 'Task completed'} + + # Custom Resource to auto-create a test schedule + TestScheduleCreator: + Type: AWS::Serverless::Function + Properties: + Runtime: python3.12 + Handler: index.handler + Timeout: 60 + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref ScheduleConfigTable + InlineCode: | + import boto3 + import json + import cfnresponse + from datetime import datetime, timedelta, timezone + + dynamodb = boto3.resource('dynamodb') + + def handler(event, context): + try: + if event['RequestType'] == 'Create': + table = dynamodb.Table(event['ResourceProperties']['TableName']) + + # Create a schedule 5 minutes from now (UTC) + schedule_time = datetime.now(timezone.utc) + timedelta(minutes=5) + schedule_expression = schedule_time.strftime('at(%Y-%m-%dT%H:%M:%S)') + + print(f"Current UTC time: {datetime.now(timezone.utc)}") + print(f"Schedule will fire at: {schedule_time} UTC") + print(f"Schedule expression: {schedule_expression}") + + table.put_item(Item={ + 'scheduleId': 'auto-test-schedule', + 'scheduleExpression': schedule_expression, + 'payload': json.dumps({ + 'message': 'Auto-created test schedule', + 'scheduledTime': schedule_time.isoformat(), + 'createdAt': datetime.now(timezone.utc).isoformat() + }), + 'enabled': True + }) + + print(f"Successfully created test schedule in DynamoDB") + cfnresponse.send(event, context, cfnresponse.SUCCESS, { + 'ScheduleTime': schedule_time.isoformat(), + 'ScheduleExpression': schedule_expression + }) + else: + cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) + except Exception as e: + print(f"Error creating test schedule: {e}") + import traceback + traceback.print_exc() + cfnresponse.send(event, context, cfnresponse.FAILED, {'Error': str(e)}) + + TriggerTestSchedule: + Type: Custom::TestSchedule + Properties: + ServiceToken: !GetAtt TestScheduleCreator.Arn + TableName: !Ref ScheduleConfigTable + +Outputs: + TableName: + Description: DynamoDB Table Name + Value: !Ref ScheduleConfigTable + + StreamProcessorFunctionArn: + Description: Stream Processor Lambda ARN + Value: !GetAtt StreamProcessorFunction.Arn + + TargetLambdaArn: + Description: Target Lambda ARN + Value: !GetAtt TargetLambdaFunction.Arn + + TestScheduleId: + Description: Auto-created test schedule ID + Value: auto-test-schedule + + TestScheduleNote: + Description: Important timing information + Value: "The test schedule fires 5 minutes after deployment in UTC timezone. Check EventBridge Scheduler console for exact time." + + ExampleDynamoDBItem: + Description: Example item to insert into DynamoDB + Value: !Sub | + { + "scheduleId": "user-123-reminder", + "scheduleExpression": "at(2026-02-15T10:00:00)", + "payload": "{\"userId\": \"123\", \"action\": \"send-reminder\"}", + "enabled": true + } \ No newline at end of file