diff --git a/README.md b/README.md index a06fb83..c1548ba 100644 --- a/README.md +++ b/README.md @@ -1,169 +1,465 @@ -# goqueue +# ๐Ÿš€ GoQueue - Universal Go Message Queue Library -GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn. +[![Go Reference](https://pkg.go.dev/badge/github.com/bxcodec/goqueue.svg)](https://pkg.go.dev/github.com/bxcodec/goqueue) +[![Go Report Card](https://goreportcard.com/badge/github.com/bxcodec/goqueue)](https://goreportcard.com/report/github.com/bxcodec/goqueue) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) +[![GitHub stars](https://img.shields.io/github/stars/bxcodec/goqueue)](https://github.com/bxcodec/goqueue/stargazers) -## Index +**One library to rule them all** - A powerful, extensible, and developer-friendly Go wrapper that simplifies message queue operations across multiple platforms. Build robust, scalable applications with consistent queue operations, regardless of your underlying message broker. -- [Support](#support) -- [Getting Started](#getting-started) -- [Example](#example) -- [Advance Setups](#advance-setups) -- [Contribution](#contribution) +## โœจ Why GoQueue? -## Support +๐ŸŽฏ **Universal Interface** - Write once, run anywhere. Switch between queue providers without changing your code +โšก **Production Ready** - Built-in retry mechanisms, dead letter queues, and error handling +๐Ÿ›ก๏ธ **Type Safe** - Strongly typed interfaces with comprehensive error handling +๐Ÿ”ง **Extensible** - Plugin architecture for custom middleware and queue providers +๐Ÿ“Š **Observable** - Built-in logging and middleware support for monitoring +๐Ÿš€ **Developer Experience** - Intuitive API design with sensible defaults -You can file an [Issue](https://github.com/bxcodec/goqueue/issues/new). -See documentation in [Go.Dev](https://pkg.go.dev/github.com/bxcodec/goqueue?tab=doc) +--- + +## ๐Ÿ“‹ Table of Contents + +- [๐Ÿš€ Quick Start](#-quick-start) +- [๐Ÿ’ซ Features](#-features) +- [๐Ÿ› ๏ธ Installation](#๏ธ-installation) +- [๐Ÿ“– Basic Usage](#-basic-usage) +- [๐Ÿ”ง Advanced Features](#-advanced-features) +- [๐ŸŽฎ Examples](#-examples) +- [๐Ÿ—๏ธ Architecture](#๏ธ-architecture) +- [๐Ÿ“š Documentation](#-documentation) +- [๐Ÿค Contributing](#-contributing) +- [๐Ÿ“„ License](#-license) + +--- -## Getting Started +## ๐Ÿš€ Quick Start -#### Install +Get up and running in less than 5 minutes: -```shell +```bash go get -u github.com/bxcodec/goqueue ``` -# Example - ```go package main import ( "context" - "encoding/json" - "fmt" - "time" - - amqp "github.com/rabbitmq/amqp091-go" + "log" "github.com/bxcodec/goqueue" "github.com/bxcodec/goqueue/consumer" + "github.com/bxcodec/goqueue/publisher" "github.com/bxcodec/goqueue/interfaces" - "github.com/bxcodec/goqueue/middleware" - "github.com/bxcodec/goqueue/options" - consumerOpts "github.com/bxcodec/goqueue/options/consumer" - publisherOpts "github.com/bxcodec/goqueue/options/publisher" - "github.com/bxcodec/goqueue/publisher" ) -func initExchange(ch *amqp.Channel, exchangeName string) error { - return ch.ExchangeDeclare( - exchangeName, - "topic", - true, - false, - false, - false, - nil, - ) +func main() { + + // Create queue service + queueSvc := goqueue.NewQueueService( + options.WithConsumer(myConsumer), + options.WithPublisher(myPublisher), + options.WithMessageHandler(handleMessage), + ) + + // Publish a message + queueSvc.Publish(context.Background(), interfaces.Message{ + Data: map[string]interface{}{"hello": "world"}, + Action: "user.created", + Topic: "users", + }) + + // Start consuming + queueSvc.Start(context.Background()) } -func main() { +func handleMessage(ctx context.Context, m interfaces.InboundMessage) error { + log.Printf("Received: %v", m.Data) + return m.Ack(ctx) // Acknowledge successful processing +} +``` - // Initialize the RMQ connection - rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/" - rmqConn, err := amqp.Dial(rmqDSN) - if err != nil { - panic(err) - } +--- + +## ๐Ÿ’ซ Features + +### ๐ŸŽฏ **Core Features** + +- **Multi-Provider Support**: Currently supports RabbitMQ (more coming soon!) +- **Unified API**: Consistent interface across all queue providers +- **Type Safety**: Strongly typed message structures +- **Context Support**: Full Go context integration for cancellation and timeouts + +### ๐Ÿ›ก๏ธ **Reliability & Resilience** + +- **Automatic Retries**: Configurable retry mechanisms with exponential backoff +- **Dead Letter Queues**: Handle failed messages gracefully +- **Circuit Breaker**: Built-in protection against cascading failures +- **Graceful Shutdown**: Clean resource cleanup on application termination + +### ๐Ÿ”ง **Advanced Capabilities** + +- **Middleware System**: Extensible pipeline for message processing +- **Custom Serialization**: Support for JSON, Protocol Buffers, and custom formats +- **Message Routing**: Flexible topic and routing key patterns +- **Batching**: Efficient batch message processing +- **Connection Pooling**: Optimized connection management + +### ๐Ÿ“Š **Observability** + +- **Structured Logging**: Built-in zerolog integration +- **Metrics Ready**: Hooks for Prometheus, StatsD, and custom metrics +- **Tracing Support**: OpenTelemetry compatible +- **Health Checks**: Built-in health check endpoints + +--- + +## ๐Ÿ› ๏ธ Installation + +```bash +# Install the core library +go get -u github.com/bxcodec/goqueue +``` + +### Requirements + +- Go 1.21 or higher +- Message broker (RabbitMQ supported, more coming soon) + +--- + +## ๐Ÿ“– Basic Usage + +### ๐Ÿš€ Publisher Example + +```go +package main - // Initialize the Publisher - rmqPub := publisher.NewPublisher( +import ( + "context" + "github.com/bxcodec/goqueue/publisher" + publisherOpts "github.com/bxcodec/goqueue/options/publisher" + amqp "github.com/rabbitmq/amqp091-go" +) + +func main() { + // Connect to RabbitMQ + conn, _ := amqp.Dial("amqp://localhost:5672/") + + // Create publisher + pub := publisher.NewPublisher( publisherOpts.PublisherPlatformRabbitMQ, publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ - Conn: rmqConn, + Conn: conn, PublisherChannelPoolSize: 5, }), - publisherOpts.WithPublisherID("publisher_id"), - publisherOpts.WithMiddlewares( - middleware.HelloWorldMiddlewareExecuteBeforePublisher(), - middleware.HelloWorldMiddlewareExecuteAfterPublisher(), - ), - ) - - publisherChannel, err := rmqConn.Channel() + publisherOpts.WithPublisherID("my-service"), + ) + + // Publish message + err := pub.Publish(context.Background(), interfaces.Message{ + Data: map[string]interface{}{"user_id": 123, "action": "signup"}, + Action: "user.created", + Topic: "users", + }) if err != nil { - panic(err) - } + log.Fatal(err) + } +} +``` - defer publisherChannel.Close() - initExchange(publisherChannel, "goqueue") +### ๐Ÿ“จ Consumer Example - consumerChannel, err := rmqConn.Channel() - if err != nil { - panic(err) - } - defer consumerChannel.Close() - rmqConsumer := consumer.NewConsumer( +```go +package main + +import ( + "context" + "github.com/bxcodec/goqueue/consumer" + consumerOpts "github.com/bxcodec/goqueue/options/consumer" +) + +func main() { + // Create consumer + cons := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, - consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern( - consumerChannel, - publisherChannel, - "goqueue", // exchange name - []string{"goqueue.payments.#"}, // routing keys pattern - )), - consumerOpts.WithConsumerID("consumer_id"), - consumerOpts.WithMiddlewares( - middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(), - middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(), - ), + consumerOpts.WithQueueName("user-events"), consumerOpts.WithMaxRetryFailedMessage(3), - consumerOpts.WithBatchMessageSize(1), - consumerOpts.WithQueueName("consumer_queue"), - ) - - queueSvc := goqueue.NewQueueService( - options.WithConsumer(rmqConsumer), - options.WithPublisher(rmqPub), - options.WithMessageHandler(handler()), - ) - go func() { - for i := 0; i < 10; i++ { - data := map[string]interface{}{ - "message": fmt.Sprintf("Hello World %d", i), - } - jbyt, _ := json.Marshal(data) - err := queueSvc.Publish(context.Background(), interfaces.Message{ - Data: data, - Action: "goqueue.payments.create", - Topic: "goqueue", - }) - if err != nil { - panic(err) - } - fmt.Println("Message Sent: ", string(jbyt)) - } - }() - - // change to context.Background() if you want to run it forever - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - err = queueSvc.Start(ctx) - if err != nil { - panic(err) - } + consumerOpts.WithBatchMessageSize(10), + ) + + // Start consuming + cons.Consume(context.Background(), messageHandler, metadata) } -func handler() interfaces.InboundMessageHandlerFunc { - return func(ctx context.Context, m interfaces.InboundMessage) (err error) { - data := m.Data - jbyt, _ := json.Marshal(data) - fmt.Println("Message Received: ", string(jbyt)) - return m.Ack(ctx) - } +func messageHandler(ctx context.Context, msg interfaces.InboundMessage) error { + // Process your message + userData := msg.Data.(map[string]interface{}) + + // Business logic here + if err := processUser(userData); err != nil { + // Retry with exponential backoff + return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + // Acknowledge successful processing + return msg.Ack(ctx) } +``` + +--- + +## ๐Ÿ”ง Advanced Features + +### ๐Ÿ”„ Retry Mechanisms +GoQueue provides sophisticated retry mechanisms with multiple strategies: + +```go +// Exponential backoff retry +return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + +// Custom retry logic +return msg.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return retryCount * 2 // Custom delay calculation +}) + +// Move to dead letter queue after max retries +return msg.MoveToDeadLetterQueue(ctx) ``` -## Advance Setups +### ๐Ÿ”Œ Middleware System -### RabbitMQ -- Retry Concept +Extend functionality with custom middleware: -![Goqueue Retry Architecture RabbitMQ](misc/images/rabbitmq-retry.png) -Src: [Excalidraw Link](https://link.excalidraw.com/readonly/9sphJpzXzQIAVov3z8G7) +```go +// Custom logging middleware +func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + start := time.Now() + err := next(ctx, m) + log.Printf("Message processed in %v", time.Since(start)) + return err + } + } +} + +// Apply middleware +cons := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithMiddlewares( + LoggingMiddleware(), + MetricsMiddleware(), + AuthMiddleware(), + ), +) +``` + +### ๐ŸŽ›๏ธ Configuration Options -## Contribution +Fine-tune your queue behavior: + +```go +cons := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithQueueName("high-priority-queue"), + consumerOpts.WithMaxRetryFailedMessage(5), + consumerOpts.WithBatchMessageSize(50), + consumerOpts.WithConsumerID("worker-01"), + consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{ + ConsumerChannel: channel, + ReQueueChannel: requeueChannel, + QueueDeclareConfig: &consumerOpts.RabbitMQQueueDeclareConfig{ + Durable: true, + AutoDelete: false, + Exclusive: false, + }, + }), +) +``` --- -To contrib to this project, you can open a PR or an issue. +## ๐ŸŽฎ Examples + +### ๐Ÿ“ Complete Examples + +Explore our comprehensive examples: + +- **[Basic Usage](examples/rabbitmq/basic/)** - Simple publish/consume example +- **[With Retries](examples/rabbitmq/withretries/)** - Advanced retry mechanisms +- **[Middleware](examples/middleware/)** - Custom middleware implementation +- **[Production Setup](examples/production/)** - Production-ready configuration + +### ๐Ÿฐ RabbitMQ Quick Setup + +Start RabbitMQ with Docker: + +```bash +# Clone the repository +git clone https://github.com/bxcodec/goqueue.git +cd goqueue/examples/rabbitmq/basic + +# Start RabbitMQ +docker-compose up -d + +# Run the example +go run main.go +``` + +### ๐Ÿ”„ Retry Architecture + +![GoQueue Retry Architecture](misc/images/rabbitmq-retry.png) + +_Automatic retry mechanism with exponential backoff and dead letter queue_ + +--- + +## ๐Ÿ—๏ธ Architecture + +### ๐ŸŽฏ Design Principles + +- **Interface Segregation**: Clean, focused interfaces for different responsibilities +- **Dependency Injection**: Easy testing and swappable implementations +- **Error Handling**: Comprehensive error types and recovery mechanisms +- **Performance**: Optimized for high-throughput scenarios +- **Extensibility**: Plugin architecture for custom providers and middleware + +### ๐Ÿงฉ Core Components + +![Core Concept](misc/images/core-concept.png) + +### ๐Ÿ“ฆ Provider Support + +| Provider | Status | Features | +| -------------- | --------------- | -------------------- | +| RabbitMQ | ๐Ÿ”„ Beta Version | Full feature support | +| Google Pub/Sub | ๐Ÿ“‹ Planned | Coming soon | +| AWS SQS | ๐Ÿ“‹ Planned | Coming soon | +| Redis Streams | ๐Ÿ“‹ Planned | Coming soon | + +--- + +## ๐Ÿ”ง Configuration + +### ๐Ÿ“ Logging Setup + +GoQueue uses structured logging with zerolog: + +```go +import "github.com/bxcodec/goqueue" + +// Setup basic logging (automatic when importing consumer/publisher) +// OR setup with custom configuration: +goqueue.SetupLoggingWithDefaults() // Pretty console output for development +``` + +--- + +## ๐Ÿงช Testing + +Run the test suite: + +```bash +# Unit tests +make test + +# Integration tests with RabbitMQ +make integration-test + +``` + +--- + +## ๐Ÿ“š Documentation + +### ๐Ÿ“– Component Documentation + +Explore our comprehensive guides for each system component: + +| Component | Description | Documentation | +| --------------------- | ------------------------------------------- | ----------------------------------------------- | +| ๐Ÿ”Œ **Middleware** | Extend functionality with custom logic | [๐Ÿ“– Middleware Guide](docs/MIDDLEWARE.md) | +| ๐Ÿ“จ **Consumer** | Reliable message consumption and processing | [๐Ÿ“– Consumer Guide](docs/CONSUMER.md) | +| ๐Ÿ“ค **Publisher** | High-performance message publishing | [๐Ÿ“– Publisher Guide](docs/PUBLISHER.md) | +| ๐Ÿ”„ **RabbitMQ Retry** | Advanced retry mechanisms for RabbitMQ | [๐Ÿ“– Retry Architecture](docs/RABBITMQ-RETRY.md) | + +### ๐ŸŽฏ Quick Links + +- **[๐Ÿ“š Full Documentation Index](docs/README.md)** - Complete documentation overview +- **[๐Ÿ”ง API Reference](https://pkg.go.dev/github.com/bxcodec/goqueue)** - Go package documentation +- **[๐ŸŽฎ Examples](examples/)** - Working code examples +- **[๐Ÿ› Troubleshooting](docs/README.md#troubleshooting)** - Common issues and solutions + +--- + +## ๐Ÿค Contributing + +We welcome contributions! Here's how to get started: + +### ๐Ÿš€ Quick Contribution Guide + +1. **Fork** the repository +2. **Create** a feature branch (`git checkout -b feature/amazing-feature`) +3. **Commit** your changes (`git commit -m 'Add amazing feature'`) +4. **Push** to the branch (`git push origin feature/amazing-feature`) +5. **Open** a Pull Request + +### ๐Ÿ“‹ Development Setup + +```bash +# Clone your fork +git clone https://github.com/yourusername/goqueue.git +cd goqueue + +# Install dependencies +go mod download + +# Run tests +make test + +# Run linting +make lint + +``` + +### ๐ŸŽฏ Contribution Areas + +- ๐Ÿ”Œ **New Queue Providers** (Google Pub/Sub, SQS+SNS) +- ๐Ÿ› ๏ธ **Middleware Components** (Metrics, Tracing, Auth) +- ๐Ÿ“š **Documentation & Examples** +- ๐Ÿงช **Testing & Benchmarks** +- ๐Ÿ› **Bug Fixes & Improvements** + +--- + +## ๐Ÿ“ž Support & Community + +- ๐Ÿ“– **Documentation**: [pkg.go.dev/github.com/bxcodec/goqueue](https://pkg.go.dev/github.com/bxcodec/goqueue) +- ๐Ÿ› **Issues**: [GitHub Issues](https://github.com/bxcodec/goqueue/issues) +- ๐Ÿ’ฌ **Discussions**: [GitHub Discussions](https://github.com/bxcodec/goqueue/discussions) +- ๐Ÿ“ง **Email**: [iman@tumorang.com](mailto:iman@tumorang.com) + +--- + +## ๐Ÿ“„ License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. + +--- + +## ๐Ÿ™ Acknowledgments + +- Thanks to all [contributors](https://github.com/bxcodec/goqueue/contributors) +- Inspired by the Go community's best practices +- Built with โค๏ธ for the Go ecosystem + +--- + +
+ +[๐Ÿš€ Get Started](#-quick-start) โ€ข [๐Ÿ“– Documentation](https://pkg.go.dev/github.com/bxcodec/goqueue) โ€ข [๐Ÿค Contribute](#-contributing) + +
diff --git a/docs/CONSUMER.md b/docs/CONSUMER.md new file mode 100644 index 0000000..51e4739 --- /dev/null +++ b/docs/CONSUMER.md @@ -0,0 +1,684 @@ +# ๐Ÿ“จ GoQueue Consumer System + +The GoQueue Consumer system provides a robust, scalable way to consume and process messages from various queue platforms. It handles the complexity of message acknowledgment, retries, and error handling while providing a simple, unified interface. + +## ๐Ÿ“– Table of Contents + +- [๐Ÿ“จ GoQueue Consumer System](#-goqueue-consumer-system) + - [๐Ÿ“– Table of Contents](#-table-of-contents) + - [๐ŸŽฏ Overview](#-overview) + - [๐Ÿ—๏ธ Architecture](#๏ธ-architecture) + - [๐Ÿš€ Quick Start](#-quick-start) + - [โš™๏ธ Configuration](#๏ธ-configuration) + - [๐Ÿ“ Message Handling](#-message-handling) + - [๐Ÿ”„ Retry Mechanisms](#-retry-mechanisms) + - [๐Ÿ›ก๏ธ Error Handling](#๏ธ-error-handling) + - [๐Ÿ“Š Monitoring & Observability](#-monitoring--observability) + - [๐Ÿงช Testing](#-testing-consumers) + - [๐Ÿ’ก Best Practices](#-best-practices) + - [๐Ÿ”ง Troubleshooting](#-troubleshooting) + +--- + +## ๐ŸŽฏ Overview + +The Consumer system in GoQueue provides: + +- **๐Ÿ”„ Automatic Retries** with configurable strategies +- **โšก Concurrent Processing** with controllable parallelism +- **๐Ÿ›ก๏ธ Error Handling** with dead letter queue support +- **๐Ÿ“Š Built-in Observability** with logging and metrics hooks +- **๐Ÿ”Œ Middleware Support** for extending functionality +- **๐ŸŽ›๏ธ Flexible Configuration** for different use cases + +## ๐Ÿ—๏ธ Architecture + +``` +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ Queue Platform โ”‚ +โ”‚ (RabbitMQ) โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ”‚ + โ–ผ +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ GoQueue โ”‚ +โ”‚ Consumer โ”‚ +โ”‚ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ Middleware โ”‚ โ”‚ +โ”‚ โ”‚ Chain โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ Message โ”‚ โ”‚ +โ”‚ โ”‚ Handler โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ Retry โ”‚ โ”‚ +โ”‚ โ”‚ Logic โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +``` + +--- + +## ๐Ÿš€ Quick Start + +### Basic Consumer Setup + +```go +package main + +import ( + "context" + "log" + + "github.com/bxcodec/goqueue/consumer" + "github.com/bxcodec/goqueue/interfaces" + consumerOpts "github.com/bxcodec/goqueue/options/consumer" + amqp "github.com/rabbitmq/amqp091-go" +) + +func main() { + // Connect to RabbitMQ + conn, err := amqp.Dial("amqp://localhost:5672/") + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + // Create channels + consumerChannel, _ := conn.Channel() + requeueChannel, _ := conn.Channel() + + // Create consumer + consumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithQueueName("user-events"), + consumerOpts.WithMaxRetryFailedMessage(3), + consumerOpts.WithBatchMessageSize(10), + consumerOpts.WithRabbitMQConsumerConfig( + consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern( + consumerChannel, + requeueChannel, + "user-exchange", + []string{"user.created", "user.updated"}, + ), + ), + ) + + // Define message handler + handler := func(ctx context.Context, m interfaces.InboundMessage) error { + log.Printf("Processing message: %+v", m.Data) + + // Your business logic here + if err := processMessage(m); err != nil { + // Retry with exponential backoff + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + // Acknowledge successful processing + return m.Ack(ctx) + } + + // Start consuming + metadata := map[string]interface{}{ + "consumer_id": "user-service-01", + "version": "1.0.0", + } + + err = consumer.Consume(context.Background(), + interfaces.InboundMessageHandlerFunc(handler), metadata) + if err != nil { + log.Fatal(err) + } +} +``` + +--- + +## โš™๏ธ Configuration + +### Consumer Options + +```go +consumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + + // Basic Configuration + consumerOpts.WithQueueName("my-queue"), + consumerOpts.WithConsumerID("service-01"), + consumerOpts.WithBatchMessageSize(50), + consumerOpts.WithMaxRetryFailedMessage(5), + + // Middleware + consumerOpts.WithMiddlewares( + middleware.LoggingMiddleware(), + middleware.MetricsMiddleware(), + middleware.AuthenticationMiddleware(), + ), + + // Platform-specific configuration + consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{ + ConsumerChannel: channel, + ReQueueChannel: requeueChannel, + QueueDeclareConfig: &consumerOpts.RabbitMQQueueDeclareConfig{ + Durable: true, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, + }, + QueueBindConfig: &consumerOpts.RabbitMQQueueBindConfig{ + RoutingKeys: []string{"user.*", "order.created"}, + ExchangeName: "main-exchange", + NoWait: false, + Args: nil, + }, + }), +) +``` + +### Configuration Options Explained + +| Option | Description | Default | +| ----------------------- | -------------------------------------------- | -------------- | +| `QueueName` | Name of the queue to consume from | Required | +| `ConsumerID` | Unique identifier for this consumer instance | Auto-generated | +| `BatchMessageSize` | Number of messages to prefetch | 1 | +| `MaxRetryFailedMessage` | Maximum retry attempts | 3 | +| `Middlewares` | List of middleware functions | Empty | + +--- + +## ๐Ÿ“ Message Handling + +### Message Structure + +```go +type InboundMessage struct { + Message // Embedded message data + RetryCount int64 // Current retry attempt + Metadata map[string]any // Platform-specific metadata + + // Control functions + Ack func(ctx context.Context) error + Nack func(ctx context.Context) error + MoveToDeadLetterQueue func(ctx context.Context) error + RetryWithDelayFn func(ctx context.Context, delayFn DelayFn) error +} + +type Message struct { + ID string `json:"id"` + Topic string `json:"topic"` + Action string `json:"action"` + Data interface{} `json:"data"` + Headers map[string]interface{} `json:"headers"` + Timestamp time.Time `json:"timestamp"` + ContentType string `json:"contentType"` +} +``` + +### Handler Patterns + +#### 1. Simple Handler + +```go +func simpleHandler(ctx context.Context, m interfaces.InboundMessage) error { + log.Printf("Received: %s - %s", m.Action, m.ID) + + // Process message + return processBusinessLogic(m.Data) +} +``` + +#### 2. Handler with Error Handling + +```go +func errorHandler(ctx context.Context, m interfaces.InboundMessage) error { + defer func() { + if r := recover(); r != nil { + log.Printf("Panic recovered: %v", r) + m.MoveToDeadLetterQueue(ctx) + } + }() + + // Validate message + if err := validateMessage(m); err != nil { + log.Printf("Invalid message: %v", err) + return m.MoveToDeadLetterQueue(ctx) + } + + // Process with retries + if err := processMessage(m); err != nil { + if isRetryableError(err) { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + return m.MoveToDeadLetterQueue(ctx) + } + + return m.Ack(ctx) +} +``` + +#### 3. Route-based Handler + +```go +func routeHandler(ctx context.Context, m interfaces.InboundMessage) error { + switch m.Action { + case "user.created": + return handleUserCreated(ctx, m) + case "user.updated": + return handleUserUpdated(ctx, m) + case "user.deleted": + return handleUserDeleted(ctx, m) + default: + log.Printf("Unknown action: %s", m.Action) + return m.MoveToDeadLetterQueue(ctx) + } +} + +func handleUserCreated(ctx context.Context, m interfaces.InboundMessage) error { + var user User + if err := json.Unmarshal(m.Data.([]byte), &user); err != nil { + return m.MoveToDeadLetterQueue(ctx) + } + + if err := userService.Create(ctx, user); err != nil { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + return m.Ack(ctx) +} +``` + +--- + +## ๐Ÿ”„ Retry Mechanisms + +### Built-in Delay Functions + +```go +// Exponential backoff: 1s, 2s, 4s, 8s, 16s... +return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + +// Linear backoff: 1s, 2s, 3s, 4s, 5s... +return m.RetryWithDelayFn(ctx, interfaces.LinearBackoffDelayFn) + +// Fixed delay: 5s, 5s, 5s, 5s... +return m.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return 5 // 5 seconds +}) +``` + +### Custom Delay Functions + +```go +// Custom exponential with jitter +func customDelayFn(retryCount int64) int64 { + baseDelay := time.Duration(retryCount) * time.Second + jitter := time.Duration(rand.Int63n(1000)) * time.Millisecond + return int64((baseDelay + jitter) / time.Millisecond) +} + +// Fibonacci backoff +func fibonacciDelayFn(retryCount int64) int64 { + fib := fibonacci(retryCount) + return fib * 1000 // Convert to milliseconds +} + +// Usage +return m.RetryWithDelayFn(ctx, customDelayFn) +``` + +### Conditional Retries + +```go +func smartRetryHandler(ctx context.Context, m interfaces.InboundMessage) error { + err := processMessage(m) + if err == nil { + return m.Ack(ctx) + } + + switch { + case isTemporaryError(err): + // Retry temporary errors + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + case isValidationError(err): + // Don't retry validation errors + log.Printf("Validation error: %v", err) + return m.MoveToDeadLetterQueue(ctx) + case isRateLimitError(err): + // Longer delay for rate limits + return m.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return 60 * 1000 // 60 seconds + }) + default: + // Unknown error, move to DLQ + return m.MoveToDeadLetterQueue(ctx) + } +} +``` + +--- + +## ๐Ÿ›ก๏ธ Error Handling + +### Error Categories + +```go +type ErrorType int + +const ( + ErrorTypeTemporary ErrorType = iota + ErrorTypePermanent + ErrorTypeValidation + ErrorTypeRateLimit + ErrorTypeAuth +) + +func categorizeError(err error) ErrorType { + switch { + case isNetworkError(err): + return ErrorTypeTemporary + case isValidationError(err): + return ErrorTypeValidation + case isAuthError(err): + return ErrorTypeAuth + case isRateLimitError(err): + return ErrorTypeRateLimit + default: + return ErrorTypePermanent + } +} +``` + +### Error Handler Middleware + +```go +func ErrorHandlingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + defer func() { + if r := recover(); r != nil { + log.Error(). + Interface("panic", r). + Str("message_id", m.ID). + Msg("Panic in message handler") + + // Send panic to monitoring + sendPanicToMonitoring(r, m) + + // Move to DLQ + m.MoveToDeadLetterQueue(ctx) + } + }() + + err := next(ctx, m) + if err != nil { + // Log error with context + log.Error(). + Err(err). + Str("message_id", m.ID). + Str("action", m.Action). + Int64("retry_count", m.RetryCount). + Msg("Message processing failed") + + // Send to error tracking + sendToErrorTracking(err, m) + } + + return err + } + } +} +``` + +--- + +## ๐Ÿ“Š Monitoring & Observability + +### Metrics Collection + +```go +func MetricsMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + start := time.Now() + + // Increment processed counter + messagesProcessed.WithLabelValues(m.Topic, m.Action).Inc() + + err := next(ctx, m) + + // Record duration + duration := time.Since(start).Seconds() + processingDuration.WithLabelValues(m.Topic, m.Action).Observe(duration) + + // Record result + if err != nil { + messagesErrors.WithLabelValues(m.Topic, m.Action).Inc() + } else { + messagesSuccess.WithLabelValues(m.Topic, m.Action).Inc() + } + + return err + } + } +} +``` + +### Health Checks + +```go +type ConsumerHealth struct { + consumer consumer.Consumer + lastMessage time.Time + mu sync.RWMutex +} + +func (h *ConsumerHealth) HealthCheck() error { + h.mu.RLock() + defer h.mu.RUnlock() + + // Check if we've received messages recently + if time.Since(h.lastMessage) > 5*time.Minute { + return errors.New("no messages received in last 5 minutes") + } + + return nil +} + +func (h *ConsumerHealth) UpdateLastMessage() { + h.mu.Lock() + h.lastMessage = time.Now() + h.mu.Unlock() +} +``` + +--- + +## ๐Ÿงช Testing Consumers + +### Unit Testing + +```go +func TestMessageHandler(t *testing.T) { + // Create test message + msg := interfaces.InboundMessage{ + Message: interfaces.Message{ + ID: "test-123", + Topic: "test", + Action: "test.action", + Data: map[string]interface{}{"key": "value"}, + }, + RetryCount: 0, + Ack: func(ctx context.Context) error { + return nil + }, + Nack: func(ctx context.Context) error { + return nil + }, + MoveToDeadLetterQueue: func(ctx context.Context) error { + return nil + }, + RetryWithDelayFn: func(ctx context.Context, delayFn interfaces.DelayFn) error { + return nil + }, + } + + // Test handler + err := myHandler(context.Background(), msg) + assert.NoError(t, err) +} +``` + +### Integration Testing + +```go +func TestConsumerIntegration(t *testing.T) { + // Setup test infrastructure + testContainer := setupRabbitMQContainer(t) + defer testContainer.Cleanup() + + // Create consumer + consumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithQueueName("test-queue"), + ) + + // Test message handling + processed := make(chan bool, 1) + handler := func(ctx context.Context, m interfaces.InboundMessage) error { + processed <- true + return m.Ack(ctx) + } + + // Start consumer + go consumer.Consume(context.Background(), + interfaces.InboundMessageHandlerFunc(handler), nil) + + // Publish test message + publishTestMessage(t, "test-queue", testMessage) + + // Wait for processing + select { + case <-processed: + // Success + case <-time.After(5 * time.Second): + t.Fatal("Message not processed within timeout") + } +} +``` + +--- + +## ๐Ÿ’ก Best Practices + +### 1. **Graceful Shutdown** + +```go +func gracefulShutdown(consumer consumer.Consumer) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + <-c + log.Info().Msg("Shutting down consumer...") + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := consumer.Stop(ctx); err != nil { + log.Error().Err(err).Msg("Error stopping consumer") + } +} +``` + +### 2. **Idempotent Processing** + +```go +func idempotentHandler(ctx context.Context, m interfaces.InboundMessage) error { + // Check if message already processed + if isProcessed(m.ID) { + log.Info().Str("message_id", m.ID).Msg("Message already processed") + return m.Ack(ctx) + } + + // Process message + if err := processMessage(m); err != nil { + return err + } + + // Mark as processed + markAsProcessed(m.ID) + + return m.Ack(ctx) +} +``` + +### 3. **Circuit Breaker** + +```go +func circuitBreakerHandler(ctx context.Context, m interfaces.InboundMessage) error { + if circuitBreaker.IsOpen() { + // Circuit is open, reject message + return m.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return 60 * 1000 // Wait 1 minute + }) + } + + err := processMessage(m) + if err != nil { + circuitBreaker.RecordFailure() + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + circuitBreaker.RecordSuccess() + return m.Ack(ctx) +} +``` + +--- + +## ๐Ÿ”ง Troubleshooting + +### Common Issues + +1. **Messages Not Being Consumed** + + - Check queue name and bindings + - Verify consumer is running + - Check connection status + +2. **High Memory Usage** + + - Reduce batch size + - Implement message pooling + - Check for memory leaks in handlers + +3. **Slow Processing** + + - Profile handler performance + - Check database connections + - Consider parallel processing + +4. **Messages Going to DLQ** + - Check error logs + - Validate message format + - Review retry logic + +### Debug Configuration + +```go +consumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithQueueName("debug-queue"), + consumerOpts.WithMiddlewares( + DebugMiddleware(), + LoggingMiddleware(), + ), +) +``` + +--- + +The GoQueue Consumer system provides a powerful foundation for building robust, scalable message processing applications. By following these patterns and best practices, you can build reliable systems that handle failures gracefully and scale with your needs. diff --git a/docs/MIDDLEWARE.md b/docs/MIDDLEWARE.md new file mode 100644 index 0000000..e37de38 --- /dev/null +++ b/docs/MIDDLEWARE.md @@ -0,0 +1,828 @@ +# ๐Ÿ”Œ GoQueue Middleware System + +The GoQueue middleware system provides a powerful and flexible way to extend the functionality of your message processing pipeline. Middleware allows you to inject custom logic before and after message publishing/consuming operations without modifying the core business logic. + +## ๐Ÿ“– Table of Contents + +- [๐Ÿ”Œ GoQueue Middleware System](#-goqueue-middleware-system) + - [๐Ÿ“– Table of Contents](#-table-of-contents) + - [๐ŸŽฏ Overview](#-overview) + - [๐Ÿ—๏ธ How It Works](#๏ธ-how-it-works) + - [๐Ÿ“ Types of Middleware](#-types-of-middleware) + - [๐Ÿš€ Basic Usage](#-basic-usage) + - [๐Ÿ› ๏ธ Creating Custom Middleware](#๏ธ-creating-custom-middleware) + - [๐Ÿ“Š Built-in Middleware Examples](#-built-in-middleware-examples) + - [๐ŸŽจ Advanced Patterns](#-advanced-patterns) + - [๐Ÿ“ˆ Performance Considerations](#-performance-considerations) + - [๐Ÿงช Testing Middleware](#-testing-middleware) + - [๐Ÿ’ก Best Practices](#-best-practices) + - [๐Ÿ”ง Troubleshooting](#-troubleshooting) + +--- + +## ๐ŸŽฏ Overview + +Middleware in GoQueue follows the **decorator pattern**, wrapping your core message handlers with additional functionality. This allows you to: + +- **๐Ÿ” Monitor & Log** message processing +- **๐Ÿ“Š Collect Metrics** and performance data +- **๐Ÿ›ก๏ธ Handle Authentication** and authorization +- **โšก Implement Rate Limiting** +- **๐Ÿ”„ Add Retry Logic** with custom strategies +- **๐Ÿ” Trace Requests** across services +- **๐Ÿงช Test Message Flows** with mock data + +## ๐Ÿ—๏ธ How It Works + +GoQueue middleware uses function composition to create a pipeline of operations: + +``` +Request โ†’ Middleware 1 โ†’ Middleware 2 โ†’ Handler โ†’ Middleware 2 โ†’ Middleware 1 โ†’ Response +``` + +Each middleware can: + +1. **Inspect/modify** the message before processing +2. **Execute custom logic** before calling the next handler +3. **Process the result** after the handler executes +4. **Handle errors** and implement retry logic + +--- + +## ๐Ÿ“ Types of Middleware + +### 1. **Consumer Middleware** (`InboundMessageHandlerMiddlewareFunc`) + +Processes incoming messages before they reach your business logic. + +```go +type InboundMessageHandlerMiddlewareFunc func( + next InboundMessageHandlerFunc, +) InboundMessageHandlerFunc +``` + +### 2. **Publisher Middleware** (`PublisherMiddlewareFunc`) + +Processes outgoing messages before they are sent to the queue. + +```go +type PublisherMiddlewareFunc func( + next PublisherFunc, +) PublisherFunc +``` + +--- + +## ๐Ÿš€ Basic Usage + +### Adding Middleware to Consumer + +```go +package main + +import ( + "github.com/bxcodec/goqueue/consumer" + "github.com/bxcodec/goqueue/middleware" + consumerOpts "github.com/bxcodec/goqueue/options/consumer" +) + +func main() { + consumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithQueueName("user-events"), + consumerOpts.WithMiddlewares( + // Middleware executes in order + middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(), + LoggingMiddleware(), + MetricsMiddleware(), + middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(), + ), + ) +} +``` + +### Adding Middleware to Publisher + +```go +package main + +import ( + "github.com/bxcodec/goqueue/publisher" + "github.com/bxcodec/goqueue/middleware" + publisherOpts "github.com/bxcodec/goqueue/options/publisher" +) + +func main() { + pub := publisher.NewPublisher( + publisherOpts.PublisherPlatformRabbitMQ, + publisherOpts.WithMiddlewares( + middleware.HelloWorldMiddlewareExecuteBeforePublisher(), + ValidationMiddleware(), + CompressionMiddleware(), + middleware.HelloWorldMiddlewareExecuteAfterPublisher(), + ), + ) +} +``` + +--- + +## ๐Ÿ› ๏ธ Creating Custom Middleware + +### Consumer Middleware Example + +```go +package main + +import ( + "context" + "time" + + "github.com/rs/zerolog/log" + "github.com/bxcodec/goqueue/interfaces" +) + +// LoggingMiddleware logs message processing details +func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + start := time.Now() + + // Log before processing + log.Info(). + Str("message_id", m.ID). + Str("action", m.Action). + Str("topic", m.Topic). + Msg("Processing message") + + // Call the next handler + err := next(ctx, m) + + // Log after processing + duration := time.Since(start) + logEvent := log.Info(). + Str("message_id", m.ID). + Dur("duration", duration) + + if err != nil { + logEvent = log.Error(). + Str("message_id", m.ID). + Dur("duration", duration). + Err(err) + logEvent.Msg("Message processing failed") + } else { + logEvent.Msg("Message processed successfully") + } + + return err + } + } +} + +// MetricsMiddleware collects processing metrics +func MetricsMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + start := time.Now() + + // Increment counter + messagesProcessed.WithLabelValues(m.Topic, m.Action).Inc() + + err := next(ctx, m) + + // Record duration + duration := time.Since(start).Seconds() + processingDuration.WithLabelValues(m.Topic, m.Action).Observe(duration) + + // Record result + if err != nil { + messagesErrors.WithLabelValues(m.Topic, m.Action).Inc() + } else { + messagesSuccess.WithLabelValues(m.Topic, m.Action).Inc() + } + + return err + } + } +} + +// AuthenticationMiddleware validates message authentication +func AuthenticationMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + // Extract auth token from headers + authToken, ok := m.Headers["authorization"].(string) + if !ok { + log.Error().Str("message_id", m.ID).Msg("Missing authorization header") + return errors.New("unauthorized: missing auth token") + } + + // Validate token + user, err := validateToken(authToken) + if err != nil { + log.Error(). + Str("message_id", m.ID). + Err(err). + Msg("Invalid authorization token") + return fmt.Errorf("unauthorized: %w", err) + } + + // Add user to context + ctx = context.WithValue(ctx, "user", user) + + return next(ctx, m) + } + } +} +``` + +### Publisher Middleware Example + +```go +// ValidationMiddleware validates outgoing messages +func ValidationMiddleware() interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) error { + // Validate required fields + if m.Topic == "" { + return errors.New("validation error: topic is required") + } + + if m.Action == "" { + return errors.New("validation error: action is required") + } + + if m.Data == nil { + return errors.New("validation error: data is required") + } + + // Validate data structure based on action + if err := validateMessageData(m.Action, m.Data); err != nil { + return fmt.Errorf("validation error: %w", err) + } + + log.Info(). + Str("topic", m.Topic). + Str("action", m.Action). + Msg("Message validation passed") + + return next(ctx, m) + } + } +} + +// CompressionMiddleware compresses large message payloads +func CompressionMiddleware() interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) error { + // Serialize data to check size + dataBytes, err := json.Marshal(m.Data) + if err != nil { + return fmt.Errorf("compression middleware: %w", err) + } + + // Compress if data is large (> 1KB) + if len(dataBytes) > 1024 { + compressed, err := compressData(dataBytes) + if err != nil { + return fmt.Errorf("compression failed: %w", err) + } + + // Update message with compressed data + m.Data = compressed + if m.Headers == nil { + m.Headers = make(map[string]interface{}) + } + m.Headers["compression"] = "gzip" + + log.Info(). + Int("original_size", len(dataBytes)). + Int("compressed_size", len(compressed)). + Msg("Message compressed") + } + + return next(ctx, m) + } + } +} +``` + +--- + +## ๐Ÿ“Š Built-in Middleware Examples + +### 1. **Error Handling Middleware** + +```go +func ErrorHandlingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + defer func() { + if r := recover(); r != nil { + log.Error(). + Interface("panic", r). + Str("message_id", m.ID). + Msg("Panic recovered in message handler") + + // Send to dead letter queue + m.MoveToDeadLetterQueue(ctx) + } + }() + + err := next(ctx, m) + if err != nil { + // Log error with context + log.Error(). + Err(err). + Str("message_id", m.ID). + Str("action", m.Action). + Interface("data", m.Data). + Msg("Message processing error") + + // Send to monitoring system + sendToErrorTracking(err, m) + } + + return err + } + } +} +``` + +### 2. **Rate Limiting Middleware** + +```go +func RateLimitingMiddleware(limit int, window time.Duration) interfaces.InboundMessageHandlerMiddlewareFunc { + limiter := make(map[string]*time.Timer) + counter := make(map[string]int) + mu := sync.RWMutex{} + + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + key := fmt.Sprintf("%s:%s", m.Topic, m.Action) + + mu.Lock() + count := counter[key] + + if count >= limit { + mu.Unlock() + log.Warn(). + Str("key", key). + Int("limit", limit). + Msg("Rate limit exceeded") + + // Requeue with delay + return m.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return 30 // 30 second delay + }) + } + + counter[key]++ + + // Reset counter after window + if limiter[key] == nil { + limiter[key] = time.AfterFunc(window, func() { + mu.Lock() + delete(counter, key) + delete(limiter, key) + mu.Unlock() + }) + } + + mu.Unlock() + + return next(ctx, m) + } + } +} +``` + +### 3. **Tracing Middleware** + +```go +func TracingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + // Extract trace context from headers + traceID := extractTraceID(m.Headers) + if traceID == "" { + traceID = generateTraceID() + } + + // Create span + ctx, span := tracer.Start(ctx, "message.process", + trace.WithAttributes( + attribute.String("message.id", m.ID), + attribute.String("message.topic", m.Topic), + attribute.String("message.action", m.Action), + attribute.String("trace.id", traceID), + ), + ) + defer span.End() + + // Add trace context to message + if m.Headers == nil { + m.Headers = make(map[string]interface{}) + } + m.Headers["trace_id"] = traceID + + err := next(ctx, m) + + if err != nil { + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + } else { + span.SetStatus(codes.Ok, "Message processed successfully") + } + + return err + } + } +} +``` + +--- + +## ๐ŸŽจ Advanced Patterns + +### 1. **Conditional Middleware** + +```go +func ConditionalMiddleware(condition func(interfaces.InboundMessage) bool, + middleware interfaces.InboundMessageHandlerMiddlewareFunc) interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + if condition(m) { + // Apply middleware + return middleware(next)(ctx, m) + } + // Skip middleware + return next(ctx, m) + } + } +} + +// Usage +middleware := ConditionalMiddleware( + func(m interfaces.InboundMessage) bool { + return m.Topic == "payments" // Only apply to payment messages + }, + AuthenticationMiddleware(), +) +``` + +### 2. **Circuit Breaker Middleware** + +```go +func CircuitBreakerMiddleware(threshold int, timeout time.Duration) interfaces.InboundMessageHandlerMiddlewareFunc { + var ( + failures int64 + lastFailure time.Time + mu sync.RWMutex + ) + + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + mu.RLock() + currentFailures := failures + lastFail := lastFailure + mu.RUnlock() + + // Check if circuit is open + if currentFailures >= int64(threshold) && time.Since(lastFail) < timeout { + log.Warn(). + Int64("failures", currentFailures). + Dur("timeout", timeout). + Msg("Circuit breaker is open, rejecting message") + + return fmt.Errorf("circuit breaker open: too many failures") + } + + err := next(ctx, m) + + mu.Lock() + if err != nil { + failures++ + lastFailure = time.Now() + } else { + // Reset on success + failures = 0 + } + mu.Unlock() + + return err + } + } +} +``` + +### 3. **Batching Middleware** + +```go +func BatchingMiddleware(batchSize int, flushInterval time.Duration) interfaces.InboundMessageHandlerMiddlewareFunc { + type batchItem struct { + ctx context.Context + msg interfaces.InboundMessage + result chan error + } + + batch := make([]batchItem, 0, batchSize) + mu := sync.Mutex{} + + // Flush timer + ticker := time.NewTicker(flushInterval) + go func() { + for range ticker.C { + mu.Lock() + if len(batch) > 0 { + processBatch(batch) + batch = batch[:0] + } + mu.Unlock() + } + }() + + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + result := make(chan error, 1) + + mu.Lock() + batch = append(batch, batchItem{ctx, m, result}) + + if len(batch) >= batchSize { + processBatch(batch) + batch = batch[:0] + } + mu.Unlock() + + return <-result + } + } +} +``` + +--- + +## ๐Ÿ“ˆ Performance Considerations + +### 1. **Middleware Order Matters** + +- Place **fast, filtering** middleware first (auth, validation) +- Place **expensive operations** last (database calls, external APIs) +- Consider **early returns** to avoid unnecessary processing + +### 2. **Memory Management** + +```go +// โŒ Bad: Creates new logger for each message +func BadLoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + logger := log.With().Str("message_id", m.ID).Logger() // New logger each time + // ... + } + } +} + +// โœ… Good: Reuse logger with context +func GoodLoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + log.Info().Str("message_id", m.ID).Msg("Processing") // Direct usage + // ... + } + } +} +``` + +### 3. **Async Operations** + +```go +func AsyncMetricsMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + metricsChan := make(chan MetricEvent, 1000) + + // Background worker + go func() { + for metric := range metricsChan { + sendToMetricsSystem(metric) + } + }() + + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + start := time.Now() + err := next(ctx, m) + + // Non-blocking metrics send + select { + case metricsChan <- MetricEvent{ + Duration: time.Since(start), + Topic: m.Topic, + Action: m.Action, + Error: err, + }: + default: + // Drop metric if channel is full + } + + return err + } + } +} +``` + +--- + +## ๐Ÿงช Testing Middleware + +### Unit Testing + +```go +func TestLoggingMiddleware(t *testing.T) { + // Create test message + msg := interfaces.InboundMessage{ + Message: interfaces.Message{ + ID: "test-123", + Topic: "test-topic", + Action: "test.action", + Data: map[string]interface{}{"key": "value"}, + }, + } + + // Mock handler + called := false + handler := func(ctx context.Context, m interfaces.InboundMessage) error { + called = true + return nil + } + + // Apply middleware + middleware := LoggingMiddleware() + wrappedHandler := middleware(handler) + + // Execute + err := wrappedHandler(context.Background(), msg) + + // Assertions + assert.NoError(t, err) + assert.True(t, called) +} +``` + +### Integration Testing + +```go +func TestMiddlewareChain(t *testing.T) { + // Create consumer with middleware + consumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithMiddlewares( + LoggingMiddleware(), + MetricsMiddleware(), + AuthenticationMiddleware(), + ), + ) + + // Test message processing + // ... integration test logic +} +``` + +--- + +## ๐Ÿ’ก Best Practices + +### 1. **Keep Middleware Focused** + +Each middleware should have a single responsibility: + +```go +// โœ… Good: Single responsibility +func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { ... } +func MetricsMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { ... } + +// โŒ Bad: Multiple responsibilities +func LoggingAndMetricsMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { ... } +``` + +### 2. **Handle Errors Gracefully** + +```go +func ResilientMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + // Always call next, even if middleware logic fails + defer func() { + if r := recover(); r != nil { + log.Error().Interface("panic", r).Msg("Middleware panic recovered") + } + }() + + // Middleware logic here... + // Don't block the main flow on auxiliary operations + + return next(ctx, m) + } + } +} +``` + +### 3. **Use Context for Request-Scoped Data** + +```go +func ContextEnrichmentMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + // Add request ID to context + requestID := generateRequestID() + ctx = context.WithValue(ctx, "request_id", requestID) + + // Add to message headers for downstream services + if m.Headers == nil { + m.Headers = make(map[string]interface{}) + } + m.Headers["request_id"] = requestID + + return next(ctx, m) + } + } +} +``` + +### 4. **Configuration via Options** + +```go +type LoggingConfig struct { + Level string + Fields []string + Structured bool +} + +func LoggingMiddleware(config LoggingConfig) interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + // Use config for customizable behavior + // ... + } + } +} +``` + +--- + +## ๐Ÿ”ง Troubleshooting + +### Common Issues + +1. **Middleware Not Executing** + + - Check middleware order + - Ensure middleware returns a function + - Verify middleware is properly registered + +2. **Performance Degradation** + + - Profile middleware execution time + - Check for blocking operations + - Consider async processing for heavy operations + +3. **Context Cancellation** + + - Always respect context cancellation + - Use `ctx.Done()` in long-running operations + +4. **Memory Leaks** + - Avoid storing message references + - Clean up resources in defer statements + - Use object pools for frequently created objects + +### Debug Middleware + +```go +func DebugMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + log.Debug(). + Str("middleware", "debug"). + Str("message_id", m.ID). + Interface("headers", m.Headers). + Interface("data", m.Data). + Msg("Message received in debug middleware") + + err := next(ctx, m) + + log.Debug(). + Str("middleware", "debug"). + Str("message_id", m.ID). + Err(err). + Msg("Message processed in debug middleware") + + return err + } + } +} +``` + +--- + +The middleware system in GoQueue provides powerful extensibility while maintaining clean separation of concerns. By following these patterns and best practices, you can build robust, maintainable message processing pipelines that scale with your application needs. diff --git a/docs/PUBLISHER.md b/docs/PUBLISHER.md new file mode 100644 index 0000000..e1f27c2 --- /dev/null +++ b/docs/PUBLISHER.md @@ -0,0 +1,1224 @@ +# ๐Ÿ“ค GoQueue Publisher System + +The GoQueue Publisher system provides a robust, high-performance way to publish messages to various queue platforms. It handles connection management, message serialization, error handling, and provides extensibility through middleware. + +## ๐Ÿ“– Table of Contents + +- [๐Ÿ“ค GoQueue Publisher System](#-goqueue-publisher-system) + - [๐Ÿ“– Table of Contents](#-table-of-contents) + - [๐ŸŽฏ Overview](#-overview) + - [๐Ÿ—๏ธ Architecture](#๏ธ-architecture) + - [๐Ÿš€ Quick Start](#-quick-start) + - [โš™๏ธ Configuration](#๏ธ-configuration) + - [๐Ÿ“ Message Publishing](#-message-publishing) + - [๐Ÿ”Œ Middleware System](#-middleware-system) + - [๐Ÿ“Š Connection Management](#-connection-management) + - [๐Ÿ›ก๏ธ Error Handling](#๏ธ-error-handling) + - [๐Ÿ“ˆ Performance Optimization](#-performance-optimization) + - [๐ŸŽจ Advanced Usage](#-advanced-usage) + - [๐Ÿงช Testing Publishers](#-testing-publishers) + - [๐Ÿ“Š Monitoring & Observability](#-monitoring--observability) + - [๐Ÿ’ก Best Practices](#-best-practices) + - [๐Ÿ”ง Troubleshooting](#-troubleshooting) + +--- + +## ๐ŸŽฏ Overview + +The Publisher system in GoQueue provides: + +- **๐Ÿš€ High Performance** with connection pooling and batching +- **๐Ÿ”Œ Middleware Support** for extending functionality +- **๐Ÿ›ก๏ธ Error Handling** with retry and circuit breaker patterns +- **๐Ÿ“Š Built-in Observability** with logging and metrics hooks +- **โšก Async Publishing** with optional delivery confirmations +- **๐ŸŽ›๏ธ Flexible Configuration** for different use cases + +## ๐Ÿ—๏ธ Architecture + +``` +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ Application โ”‚ +โ”‚ Code โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ”‚ + โ–ผ +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ GoQueue โ”‚ +โ”‚ Publisher โ”‚ +โ”‚ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ Middleware โ”‚ โ”‚ +โ”‚ โ”‚ Chain โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ Serializer โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ Connection โ”‚ โ”‚ +โ”‚ โ”‚ Pool โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ”‚ + โ–ผ +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ Queue Platform โ”‚ +โ”‚ (RabbitMQ) โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +``` + +--- + +## ๐Ÿš€ Quick Start + +### Basic Publisher Setup + +```go +package main + +import ( + "context" + "log" + + "github.com/bxcodec/goqueue/publisher" + "github.com/bxcodec/goqueue/interfaces" + publisherOpts "github.com/bxcodec/goqueue/options/publisher" + amqp "github.com/rabbitmq/amqp091-go" +) + +func main() { + // Connect to RabbitMQ + conn, err := amqp.Dial("amqp://localhost:5672/") + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + // Create publisher + pub := publisher.NewPublisher( + publisherOpts.PublisherPlatformRabbitMQ, + publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ + Conn: conn, + PublisherChannelPoolSize: 10, + }), + publisherOpts.WithPublisherID("user-service"), + ) + defer pub.Close(context.Background()) + + // Publish a message + message := interfaces.Message{ + ID: "msg-123", + Topic: "users", + Action: "user.created", + Data: map[string]interface{}{ + "user_id": 12345, + "email": "user@example.com", + "name": "John Doe", + }, + Headers: map[string]interface{}{ + "source": "user-service", + "correlation": "req-456", + }, + } + + err = pub.Publish(context.Background(), message) + if err != nil { + log.Printf("Failed to publish message: %v", err) + } else { + log.Printf("Message published successfully: %s", message.ID) + } +} +``` + +--- + +## โš™๏ธ Configuration + +### Publisher Options + +```go +pub := publisher.NewPublisher( + publisherOpts.PublisherPlatformRabbitMQ, + + // Basic Configuration + publisherOpts.WithPublisherID("service-01"), + + // Middleware + publisherOpts.WithMiddlewares( + middleware.ValidationMiddleware(), + middleware.CompressionMiddleware(), + middleware.MetricsMiddleware(), + middleware.LoggingMiddleware(), + ), + + // Platform-specific configuration + publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ + Conn: connection, + PublisherChannelPoolSize: 20, + ExchangeName: "main-exchange", + Mandatory: false, + Immediate: false, + DefaultHeaders: map[string]interface{}{ + "version": "1.0", + "service": "my-service", + }, + }), +) +``` + +### Configuration Options Explained + +| Option | Description | Default | +| -------------------------- | --------------------------------------------- | -------------- | +| `PublisherID` | Unique identifier for this publisher instance | Auto-generated | +| `PublisherChannelPoolSize` | Number of channels in the connection pool | 5 | +| `ExchangeName` | Default exchange for publishing | "" | +| `Mandatory` | Return unroutable messages | false | +| `Immediate` | Return undeliverable messages | false | +| `DefaultHeaders` | Headers added to all messages | Empty | + +--- + +## ๐Ÿ“ Message Publishing + +### Message Structure + +```go +type Message struct { + ID string `json:"id"` // Unique message identifier + Topic string `json:"topic"` // Message topic/exchange + Action string `json:"action"` // Action type/routing key + Data interface{} `json:"data"` // Message payload + Headers map[string]interface{} `json:"headers"` // Additional metadata + Timestamp time.Time `json:"timestamp"` // Message timestamp + ContentType string `json:"contentType"` // Content type (JSON, etc.) +} +``` + +### Publishing Patterns + +#### 1. Simple Publishing + +```go +func publishUserEvent(pub publisher.Publisher, userID int, action string) error { + message := interfaces.Message{ + ID: generateMessageID(), + Topic: "users", + Action: action, + Data: map[string]interface{}{ + "user_id": userID, + "timestamp": time.Now(), + }, + } + + return pub.Publish(context.Background(), message) +} +``` + +#### 2. Batch Publishing + +```go +func publishBatch(pub publisher.Publisher, messages []interfaces.Message) error { + ctx := context.Background() + + for _, msg := range messages { + if err := pub.Publish(ctx, msg); err != nil { + return fmt.Errorf("failed to publish message %s: %w", msg.ID, err) + } + } + + return nil +} +``` + +#### 3. Publishing with Context + +```go +func publishWithTimeout(pub publisher.Publisher, msg interfaces.Message) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return pub.Publish(ctx, msg) +} +``` + +#### 4. Publishing with Correlation + +```go +func publishWithCorrelation(pub publisher.Publisher, correlationID string, data interface{}) error { + message := interfaces.Message{ + ID: generateMessageID(), + Topic: "events", + Action: "data.processed", + Data: data, + Headers: map[string]interface{}{ + "correlation_id": correlationID, + "reply_to": "response-queue", + }, + } + + return pub.Publish(context.Background(), message) +} +``` + +### Message Builders + +```go +type MessageBuilder struct { + message interfaces.Message +} + +func NewMessageBuilder() *MessageBuilder { + return &MessageBuilder{ + message: interfaces.Message{ + ID: generateMessageID(), + Timestamp: time.Now(), + Headers: make(map[string]interface{}), + }, + } +} + +func (b *MessageBuilder) Topic(topic string) *MessageBuilder { + b.message.Topic = topic + return b +} + +func (b *MessageBuilder) Action(action string) *MessageBuilder { + b.message.Action = action + return b +} + +func (b *MessageBuilder) Data(data interface{}) *MessageBuilder { + b.message.Data = data + return b +} + +func (b *MessageBuilder) Header(key string, value interface{}) *MessageBuilder { + b.message.Headers[key] = value + return b +} + +func (b *MessageBuilder) Build() interfaces.Message { + return b.message +} + +// Usage +message := NewMessageBuilder(). + Topic("users"). + Action("user.created"). + Data(userData). + Header("source", "user-service"). + Build() +``` + +--- + +## ๐Ÿ”Œ Middleware System + +### Built-in Middleware + +#### 1. Validation Middleware + +```go +func ValidationMiddleware() interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) error { + // Validate required fields + if m.Topic == "" { + return errors.New("validation error: topic is required") + } + + if m.Action == "" { + return errors.New("validation error: action is required") + } + + if m.Data == nil { + return errors.New("validation error: data is required") + } + + // Validate data structure + if err := validateMessageData(m.Action, m.Data); err != nil { + return fmt.Errorf("validation error: %w", err) + } + + return next(ctx, m) + } + } +} +``` + +#### 2. Compression Middleware + +```go +func CompressionMiddleware(threshold int) interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) error { + // Serialize data to check size + dataBytes, err := json.Marshal(m.Data) + if err != nil { + return fmt.Errorf("compression middleware: %w", err) + } + + // Compress if data is large + if len(dataBytes) > threshold { + compressed, err := compressData(dataBytes) + if err != nil { + return fmt.Errorf("compression failed: %w", err) + } + + // Update message + m.Data = base64.StdEncoding.EncodeToString(compressed) + if m.Headers == nil { + m.Headers = make(map[string]interface{}) + } + m.Headers["encoding"] = "gzip+base64" + m.Headers["original_size"] = len(dataBytes) + + log.Info(). + Int("original_size", len(dataBytes)). + Int("compressed_size", len(compressed)). + Msg("Message compressed") + } + + return next(ctx, m) + } + } +} +``` + +#### 3. Metrics Middleware + +```go +func MetricsMiddleware() interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) error { + start := time.Now() + + // Increment counter + messagesPublished.WithLabelValues(m.Topic, m.Action).Inc() + + err := next(ctx, m) + + // Record duration + duration := time.Since(start).Seconds() + publishDuration.WithLabelValues(m.Topic, m.Action).Observe(duration) + + // Record result + if err != nil { + publishErrors.WithLabelValues(m.Topic, m.Action).Inc() + } else { + publishSuccess.WithLabelValues(m.Topic, m.Action).Inc() + } + + return err + } + } +} +``` + +#### 4. Encryption Middleware + +```go +func EncryptionMiddleware(key []byte) interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) error { + // Encrypt sensitive data + if shouldEncrypt(m) { + encrypted, err := encryptData(m.Data, key) + if err != nil { + return fmt.Errorf("encryption failed: %w", err) + } + + m.Data = encrypted + if m.Headers == nil { + m.Headers = make(map[string]interface{}) + } + m.Headers["encrypted"] = true + m.Headers["algorithm"] = "AES-256-GCM" + } + + return next(ctx, m) + } + } +} +``` + +--- + +## ๐Ÿ“Š Connection Management + +### Connection Pooling + +```go +type ConnectionPool struct { + channels chan *amqp.Channel + conn *amqp.Connection + size int + mu sync.RWMutex +} + +func NewConnectionPool(conn *amqp.Connection, size int) *ConnectionPool { + pool := &ConnectionPool{ + channels: make(chan *amqp.Channel, size), + conn: conn, + size: size, + } + + // Pre-create channels + for i := 0; i < size; i++ { + ch, err := conn.Channel() + if err != nil { + log.Error().Err(err).Msg("Failed to create channel") + continue + } + pool.channels <- ch + } + + return pool +} + +func (p *ConnectionPool) GetChannel() (*amqp.Channel, error) { + select { + case ch := <-p.channels: + return ch, nil + case <-time.After(1 * time.Second): + return nil, errors.New("timeout getting channel from pool") + } +} + +func (p *ConnectionPool) ReturnChannel(ch *amqp.Channel) { + select { + case p.channels <- ch: + default: + // Pool is full, close the channel + ch.Close() + } +} +``` + +### Connection Health Monitoring + +```go +type HealthMonitor struct { + conn *amqp.Connection + callback func(bool) + mu sync.RWMutex + healthy bool +} + +func NewHealthMonitor(conn *amqp.Connection, callback func(bool)) *HealthMonitor { + monitor := &HealthMonitor{ + conn: conn, + callback: callback, + healthy: true, + } + + go monitor.monitor() + return monitor +} + +func (h *HealthMonitor) monitor() { + for { + select { + case <-h.conn.NotifyClose(make(chan *amqp.Error)): + h.setHealthy(false) + h.callback(false) + case <-time.After(30 * time.Second): + // Periodic health check + if h.conn.IsClosed() { + h.setHealthy(false) + h.callback(false) + } else if !h.IsHealthy() { + h.setHealthy(true) + h.callback(true) + } + } + } +} + +func (h *HealthMonitor) IsHealthy() bool { + h.mu.RLock() + defer h.mu.RUnlock() + return h.healthy +} + +func (h *HealthMonitor) setHealthy(healthy bool) { + h.mu.Lock() + h.healthy = healthy + h.mu.Unlock() +} +``` + +--- + +## ๐Ÿ›ก๏ธ Error Handling + +### Retry Strategies + +```go +type RetryConfig struct { + MaxAttempts int + InitialDelay time.Duration + MaxDelay time.Duration + Multiplier float64 +} + +func RetryMiddleware(config RetryConfig) interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) error { + var err error + delay := config.InitialDelay + + for attempt := 0; attempt < config.MaxAttempts; attempt++ { + err = next(ctx, m) + if err == nil { + return nil + } + + // Check if error is retryable + if !isRetryableError(err) { + return err + } + + // Don't retry on last attempt + if attempt == config.MaxAttempts-1 { + break + } + + // Wait before retry + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + + // Exponential backoff + delay = time.Duration(float64(delay) * config.Multiplier) + if delay > config.MaxDelay { + delay = config.MaxDelay + } + + log.Warn(). + Err(err). + Int("attempt", attempt+1). + Dur("delay", delay). + Str("message_id", m.ID). + Msg("Retrying message publish") + } + + return fmt.Errorf("failed to publish after %d attempts: %w", + config.MaxAttempts, err) + } + } +} +``` + +### Circuit Breaker + +```go +type CircuitBreaker struct { + threshold int + timeout time.Duration + failures int64 + lastFailure time.Time + state string // "closed", "open", "half-open" + mu sync.RWMutex +} + +func CircuitBreakerMiddleware(threshold int, timeout time.Duration) interfaces.PublisherMiddlewareFunc { + cb := &CircuitBreaker{ + threshold: threshold, + timeout: timeout, + state: "closed", + } + + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) error { + if !cb.AllowRequest() { + return errors.New("circuit breaker is open") + } + + err := next(ctx, m) + + if err != nil { + cb.RecordFailure() + } else { + cb.RecordSuccess() + } + + return err + } + } +} + +func (cb *CircuitBreaker) AllowRequest() bool { + cb.mu.RLock() + defer cb.mu.RUnlock() + + switch cb.state { + case "closed": + return true + case "open": + return time.Since(cb.lastFailure) > cb.timeout + case "half-open": + return true + default: + return false + } +} + +func (cb *CircuitBreaker) RecordFailure() { + cb.mu.Lock() + defer cb.mu.Unlock() + + cb.failures++ + cb.lastFailure = time.Now() + + if cb.failures >= int64(cb.threshold) { + cb.state = "open" + log.Warn().Msg("Circuit breaker opened") + } +} + +func (cb *CircuitBreaker) RecordSuccess() { + cb.mu.Lock() + defer cb.mu.Unlock() + + cb.failures = 0 + cb.state = "closed" +} +``` + +--- + +## ๐Ÿ“ˆ Performance Optimization + +### Async Publishing + +```go +type AsyncPublisher struct { + publisher publisher.Publisher + queue chan PublishRequest + workers int +} + +type PublishRequest struct { + Message interfaces.Message + Context context.Context + Response chan error +} + +func NewAsyncPublisher(pub publisher.Publisher, workers int) *AsyncPublisher { + ap := &AsyncPublisher{ + publisher: pub, + queue: make(chan PublishRequest, 1000), + workers: workers, + } + + // Start workers + for i := 0; i < workers; i++ { + go ap.worker() + } + + return ap +} + +func (ap *AsyncPublisher) worker() { + for req := range ap.queue { + err := ap.publisher.Publish(req.Context, req.Message) + req.Response <- err + close(req.Response) + } +} + +func (ap *AsyncPublisher) PublishAsync(ctx context.Context, msg interfaces.Message) <-chan error { + response := make(chan error, 1) + + req := PublishRequest{ + Message: msg, + Context: ctx, + Response: response, + } + + select { + case ap.queue <- req: + return response + case <-ctx.Done(): + response <- ctx.Err() + close(response) + return response + } +} + +// Usage +errChan := asyncPublisher.PublishAsync(ctx, message) +go func() { + if err := <-errChan; err != nil { + log.Error().Err(err).Msg("Async publish failed") + } +}() +``` + +### Message Batching + +```go +type BatchPublisher struct { + publisher publisher.Publisher + batchSize int + flushTime time.Duration + batch []interfaces.Message + mu sync.Mutex + lastFlush time.Time +} + +func NewBatchPublisher(pub publisher.Publisher, batchSize int, flushTime time.Duration) *BatchPublisher { + bp := &BatchPublisher{ + publisher: pub, + batchSize: batchSize, + flushTime: flushTime, + batch: make([]interfaces.Message, 0, batchSize), + lastFlush: time.Now(), + } + + // Auto-flush timer + go bp.autoFlush() + + return bp +} + +func (bp *BatchPublisher) Publish(ctx context.Context, msg interfaces.Message) error { + bp.mu.Lock() + defer bp.mu.Unlock() + + bp.batch = append(bp.batch, msg) + + if len(bp.batch) >= bp.batchSize { + return bp.flush(ctx) + } + + return nil +} + +func (bp *BatchPublisher) flush(ctx context.Context) error { + if len(bp.batch) == 0 { + return nil + } + + batch := make([]interfaces.Message, len(bp.batch)) + copy(batch, bp.batch) + bp.batch = bp.batch[:0] + bp.lastFlush = time.Now() + + // Publish batch + for _, msg := range batch { + if err := bp.publisher.Publish(ctx, msg); err != nil { + return err + } + } + + return nil +} + +func (bp *BatchPublisher) autoFlush() { + ticker := time.NewTicker(bp.flushTime) + defer ticker.Stop() + + for range ticker.C { + bp.mu.Lock() + if time.Since(bp.lastFlush) >= bp.flushTime && len(bp.batch) > 0 { + bp.flush(context.Background()) + } + bp.mu.Unlock() + } +} +``` + +--- + +## ๐ŸŽจ Advanced Usage + +### Message Routing + +```go +type Router struct { + routes map[string]publisher.Publisher + fallback publisher.Publisher +} + +func NewRouter(fallback publisher.Publisher) *Router { + return &Router{ + routes: make(map[string]publisher.Publisher), + fallback: fallback, + } +} + +func (r *Router) AddRoute(pattern string, pub publisher.Publisher) { + r.routes[pattern] = pub +} + +func (r *Router) Publish(ctx context.Context, msg interfaces.Message) error { + // Find matching route + for pattern, pub := range r.routes { + if matched, _ := filepath.Match(pattern, msg.Topic); matched { + return pub.Publish(ctx, msg) + } + } + + // Use fallback + return r.fallback.Publish(ctx, msg) +} + +// Usage +router := NewRouter(defaultPublisher) +router.AddRoute("user.*", userPublisher) +router.AddRoute("order.*", orderPublisher) +router.AddRoute("payment.*", paymentPublisher) +``` + +### Priority Publishing + +```go +type PriorityPublisher struct { + high publisher.Publisher + normal publisher.Publisher + low publisher.Publisher +} + +func (pp *PriorityPublisher) Publish(ctx context.Context, msg interfaces.Message) error { + priority := getPriority(msg) + + switch priority { + case "high": + return pp.high.Publish(ctx, msg) + case "low": + return pp.low.Publish(ctx, msg) + default: + return pp.normal.Publish(ctx, msg) + } +} + +func getPriority(msg interfaces.Message) string { + if priority, ok := msg.Headers["priority"].(string); ok { + return priority + } + + // Default priority based on action + switch { + case strings.HasPrefix(msg.Action, "alert."): + return "high" + case strings.HasPrefix(msg.Action, "analytics."): + return "low" + default: + return "normal" + } +} +``` + +--- + +## ๐Ÿงช Testing Publishers + +### Unit Testing + +```go +func TestPublisher(t *testing.T) { + // Create mock publisher + mockPub := &mockPublisher{} + + // Test message + msg := interfaces.Message{ + ID: "test-123", + Topic: "test", + Action: "test.action", + Data: map[string]interface{}{"key": "value"}, + } + + // Test publishing + err := mockPub.Publish(context.Background(), msg) + assert.NoError(t, err) + + // Verify mock was called + mockPub.AssertExpectations(t) +} + +type mockPublisher struct { + mock.Mock +} + +func (m *mockPublisher) Publish(ctx context.Context, msg interfaces.Message) error { + args := m.Called(ctx, msg) + return args.Error(0) +} + +func (m *mockPublisher) Close(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} +``` + +### Integration Testing + +```go +func TestPublisherIntegration(t *testing.T) { + // Setup test infrastructure + testContainer := setupRabbitMQContainer(t) + defer testContainer.Cleanup() + + // Create publisher + pub := publisher.NewPublisher( + publisherOpts.PublisherPlatformRabbitMQ, + publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ + Conn: testContainer.Connection, + }), + ) + defer pub.Close(context.Background()) + + // Test message + msg := interfaces.Message{ + ID: "integration-test-123", + Topic: "test-topic", + Action: "test.action", + Data: map[string]interface{}{"test": true}, + } + + // Publish message + err := pub.Publish(context.Background(), msg) + assert.NoError(t, err) + + // Verify message was published + verifyMessagePublished(t, testContainer, msg) +} +``` + +--- + +## ๐Ÿ“Š Monitoring & Observability + +### Metrics Collection + +```go +var ( + publishedMessages = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "goqueue_messages_published_total", + Help: "Total number of messages published", + }, + []string{"topic", "action", "status"}, + ) + + publishDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "goqueue_publish_duration_seconds", + Help: "Time spent publishing messages", + }, + []string{"topic", "action"}, + ) +) + +func init() { + prometheus.MustRegister(publishedMessages) + prometheus.MustRegister(publishDuration) +} +``` + +### Health Checks + +```go +type PublisherHealth struct { + publisher publisher.Publisher + lastPublish time.Time + mu sync.RWMutex +} + +func (h *PublisherHealth) HealthCheck() error { + // Test publish + testMsg := interfaces.Message{ + ID: "health-check", + Topic: "health", + Action: "ping", + Data: map[string]interface{}{"timestamp": time.Now()}, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return h.publisher.Publish(ctx, testMsg) +} +``` + +--- + +## ๐Ÿ’ก Best Practices + +### 1. **Message Design** + +```go +// โœ… Good: Clear, structured message +message := interfaces.Message{ + ID: generateUUIDv4(), + Topic: "user-events", + Action: "user.profile.updated", + Data: UserProfileUpdatedEvent{ + UserID: 12345, + Changes: []string{"email", "name"}, + UpdatedBy: "admin", + Timestamp: time.Now(), + }, + Headers: map[string]interface{}{ + "version": "1.0", + "source": "user-service", + "correlation": request.ID, + "content-type": "application/json", + }, +} + +// โŒ Bad: Unclear, unstructured message +message := interfaces.Message{ + Topic: "events", + Action: "update", + Data: "user123|email@example.com|John Doe", +} +``` + +### 2. **Error Handling** + +```go +func publishWithRetry(pub publisher.Publisher, msg interfaces.Message) error { + const maxRetries = 3 + var err error + + for i := 0; i < maxRetries; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err = pub.Publish(ctx, msg) + cancel() + + if err == nil { + return nil + } + + // Check if error is retryable + if !isRetryableError(err) { + return err + } + + // Exponential backoff + time.Sleep(time.Duration(1<Consumer"] --> B["Main Queue"] + B --> C["Message
Processing"] + C --> D{Success?} + D -->|Yes| E[ACK] + D -->|No| F{Retry?} + F -->|Yes| G["Retry
Exchange"] + F -->|No| H[DLQ] + F -->|Max Retry| H + G --> I["Retry Queue N
(TTL: delay)"] + I -->|TTL Expires| J["Retry Dead
Letter Exchange"] + J --> B + H -.->|Max Retries Exceeded| K[Final Failure] +``` + +--- + +## ๐Ÿ“Š Retry Flow Diagram + +Here's the detailed flow of how messages are processed with retry logic: + +```mermaid +graph TD + A[Message Received] --> B[Process Message] + B --> C{Processing Successful?} + C -->|Yes| D[Acknowledge Message] + C -->|No| E{Retry Count < Max?} + E -->|Yes| F[Calculate Delay] + F --> G[Publish to Retry Exchange] + G --> H[Route to Retry Queue N] + H --> I[Wait for TTL Expiry] + I --> J[Route back to Main Queue] + J --> A + E -->|No| K[Move to Dead Letter Queue] + D --> L[End] + K --> L +``` + +--- + +## โš™๏ธ How It Works + +![Retry](../misc/images/rabbitmq-retry.png) + +TLDR; + +GoQueue will spawn retry queues equal to `MaxRetryFailedMessage` (default: 3) with different TTL configurations for delays. By default, it uses linear delay (1s, 2s, 3s, etc.) but can be configured with exponential backoff (1s, 2s, 4s, 8s) or custom delay functions. Each retry queue is named `{queueName}__retry.{attemptNumber}` and has TTL based on the calculated delay. + +### 1. Initial Message Processing + +```go +func (r *rabbitMQ) Consume(ctx context.Context, h interfaces.InboundMessageHandler, meta map[string]interface{}) error { + for { + select { + case <-ctx.Done(): + return nil + case receivedMsg := <-r.msgReceiver: + // Process message + msg, err := buildMessage(meta, receivedMsg) + if err != nil { + // Handle invalid message format + receivedMsg.Nack(false, false) + continue + } + + // Check retry count + retryCount := extractHeaderInt(receivedMsg.Headers, headerKey.RetryCount) + if retryCount > r.option.MaxRetryFailedMessage { + // Move to dead letter queue + receivedMsg.Nack(false, false) + continue + } + + // Create inbound message with retry capabilities + inboundMsg := interfaces.InboundMessage{ + Message: msg, + RetryCount: retryCount, + RetryWithDelayFn: r.requeueMessageWithDLQ(meta, msg, receivedMsg), + } + + // Process with handler + err = h.HandleMessage(ctx, inboundMsg) + if err != nil { + log.Error().Err(err).Msg("Message processing failed") + } + } + } +} +``` + +### 2. Retry Mechanism + +```go +func (r *rabbitMQ) requeueMessageWithDLQ(consumerMeta map[string]interface{}, msg interfaces.Message, receivedMsg amqp.Delivery) func(ctx context.Context, delayFn interfaces.DelayFn) error { + return func(ctx context.Context, delayFn interfaces.DelayFn) error { + if delayFn == nil { + delayFn = interfaces.DefaultDelayFn + } + + // Increment retry count + retries := extractHeaderInt(receivedMsg.Headers, headerKey.RetryCount) + retries++ + + // Calculate delay + delayInSeconds := delayFn(retries) + + // Determine retry queue + routingKey := getRetryRoutingKey(r.option.QueueName, retries) + + // Prepare headers + headers := receivedMsg.Headers + headers[headerKey.OriginalTopicName] = msg.Topic + headers[headerKey.OriginalActionName] = msg.Action + headers[headerKey.RetryCount] = retries + + // Publish to retry exchange + err := r.requeueChannel.PublishWithContext( + ctx, + r.retryExchangeName, // Retry exchange + routingKey, // Retry queue routing key + false, + false, + amqp.Publishing{ + Headers: headers, + ContentType: receivedMsg.ContentType, + Body: receivedMsg.Body, + Timestamp: time.Now(), + AppId: r.tagName, + Expiration: fmt.Sprintf("%d", delayInSeconds*millisecondsMultiplier), + }, + ) + + if err != nil { + // Fallback: move to DLQ + return receivedMsg.Nack(false, false) + } + + // Acknowledge original message + return receivedMsg.Ack(false) + } +} +``` + +### 3. Retry Queue Setup + +```go +func (r *rabbitMQ) initRetryModule() { + // 1. Declare retry exchange + err := r.consumerChannel.ExchangeDeclare( + r.retryExchangeName, // exchange name + "topic", // exchange type + true, // durable + false, // auto-delete + false, // internal + false, // no-wait + nil, // arguments + ) + + // 2. Declare dead letter exchange + err = r.consumerChannel.ExchangeDeclare( + r.retryDeadLetterExchangeName, // exchange name + "fanout", // exchange type (fanout for DLX) + true, // durable + false, // auto-delete + false, // internal + false, // no-wait + nil, // arguments + ) + + // 3. Bind main queue to dead letter exchange + err = r.consumerChannel.QueueBind( + r.option.QueueName, // queue name + "", // routing key (empty for fanout) + r.retryDeadLetterExchangeName, // exchange name + false, // no-wait + nil, // arguments + ) + + // 4. Create retry queues for each retry level + for i := int64(1); i <= r.option.MaxRetryFailedMessage; i++ { + retryQueueName := getRetryRoutingKey(r.option.QueueName, i) + + // Declare retry queue with TTL and DLX + _, err = r.consumerChannel.QueueDeclare( + retryQueueName, // queue name + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + amqp.Table{ + "x-dead-letter-exchange": r.retryDeadLetterExchangeName, + }, + ) + + // Bind retry queue to retry exchange + err = r.consumerChannel.QueueBind( + retryQueueName, // queue name + retryQueueName, // routing key (same as queue name) + r.retryExchangeName, // exchange name + false, // no-wait + nil, // arguments + ) + } +} +``` + +--- + +## ๐Ÿ”ง Configuration + +### Basic Configuration + +```go +consumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithQueueName("user-events"), + consumerOpts.WithMaxRetryFailedMessage(5), // Maximum 5 retry attempts + consumerOpts.WithRabbitMQConsumerConfig( + consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern( + consumerChannel, + requeueChannel, // Separate channel for retry operations + "user-exchange", + []string{"user.created", "user.updated"}, + ), + ), +) +``` + +### Retry Strategy Configuration + +```go +// Exponential backoff: 1s, 2s, 4s, 8s, 16s... +func exponentialBackoff(retryCount int64) int64 { + return int64(math.Pow(2, float64(retryCount-1))) +} + +// Linear backoff: 1s, 2s, 3s, 4s, 5s... +func linearBackoff(retryCount int64) int64 { + return retryCount +} + +// Custom backoff with jitter +func customBackoffWithJitter(retryCount int64) int64 { + base := int64(math.Pow(2, float64(retryCount-1))) + jitter := int64(rand.Intn(1000)) // 0-1000ms jitter + return base + jitter/1000 +} +``` + +--- + +## ๐Ÿ“ Implementation Details + +### Queue Naming Convention + +```go +func getRetryRoutingKey(queueName string, retry int64) string { + return fmt.Sprintf("%s__retry.%d", queueName, retry) +} + +// Examples: +// user-events__retry.1 +// user-events__retry.2 +// user-events__retry.3 +``` + +### Header Management + +```go +const ( + RetryCount = "x-retry-count" + OriginalTopicName = "x-original-topic" + OriginalActionName = "x-original-action" + PublishedTimestamp = "x-published-timestamp" + AppID = "x-app-id" +) + +func extractHeaderInt(headers amqp.Table, key string) int64 { + val, ok := headers[key] + if !ok { + return 0 + } + + var res int64 + _, err := fmt.Sscanf(fmt.Sprintf("%v", val), "%d", &res) + if err != nil { + return 0 + } + return res +} +``` + +### Message TTL Calculation + +```go +const millisecondsMultiplier = 10_000 // Convert to milliseconds + +func calculateExpiration(delayInSeconds int64) string { + return fmt.Sprintf("%d", delayInSeconds*millisecondsMultiplier) +} +``` + +--- + +## ๐ŸŽฎ Usage Examples + +### Basic Retry Usage + +```go +func messageHandler(ctx context.Context, m interfaces.InboundMessage) error { + // Try to process message + if err := processBusinessLogic(m.Data); err != nil { + // Check if error is retryable + if isTemporaryError(err) { + // Retry with exponential backoff + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + // Permanent error - move to DLQ + return m.MoveToDeadLetterQueue(ctx) + } + + // Success - acknowledge message + return m.Ack(ctx) +} +``` + +### Custom Retry Logic + +```go +func smartRetryHandler(ctx context.Context, m interfaces.InboundMessage) error { + err := processMessage(m) + if err == nil { + return m.Ack(ctx) + } + + // Custom retry strategy based on error type + switch { + case isNetworkError(err): + // Quick retry for network issues + return m.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return 1 // 1 second delay + }) + + case isRateLimitError(err): + // Longer delay for rate limiting + return m.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return 60 // 1 minute delay + }) + + case isDatabaseError(err): + // Exponential backoff for database issues + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + + default: + // Unknown error - move to DLQ for investigation + log.Error(). + Err(err). + Str("message_id", m.ID). + Msg("Unknown error, moving to DLQ") + return m.MoveToDeadLetterQueue(ctx) + } +} +``` + +### Conditional Retry + +```go +func conditionalRetryHandler(ctx context.Context, m interfaces.InboundMessage) error { + // Check retry count before processing + if m.RetryCount >= 3 { + log.Warn(). + Int64("retry_count", m.RetryCount). + Str("message_id", m.ID). + Msg("Maximum retries reached, moving to DLQ") + return m.MoveToDeadLetterQueue(ctx) + } + + err := processMessage(m) + if err != nil { + // Only retry specific error types + if shouldRetry(err) { + log.Info(). + Err(err). + Int64("retry_count", m.RetryCount). + Str("message_id", m.ID). + Msg("Retrying message") + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + // Don't retry - move to DLQ + return m.MoveToDeadLetterQueue(ctx) + } + + return m.Ack(ctx) +} + +func shouldRetry(err error) bool { + return isTemporaryError(err) || isTimeoutError(err) || isConnectionError(err) +} +``` + +--- + +## ๐Ÿ› ๏ธ Advanced Configuration + +### Custom Retry Queue Configuration + +```go +func customRetrySetup(channel *amqp.Channel, queueName string, maxRetries int64) error { + retryExchange := fmt.Sprintf("%s__retry_exchange", queueName) + dlxExchange := fmt.Sprintf("%s__dlx", queueName) + + // Declare exchanges + if err := channel.ExchangeDeclare(retryExchange, "topic", true, false, false, false, nil); err != nil { + return err + } + + if err := channel.ExchangeDeclare(dlxExchange, "fanout", true, false, false, false, nil); err != nil { + return err + } + + // Create retry queues with custom TTL + for i := int64(1); i <= maxRetries; i++ { + retryQueue := fmt.Sprintf("%s__retry.%d", queueName, i) + ttl := calculateCustomTTL(i) // Custom TTL calculation + + _, err := channel.QueueDeclare( + retryQueue, + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + amqp.Table{ + "x-dead-letter-exchange": dlxExchange, + "x-message-ttl": ttl, + }, + ) + if err != nil { + return err + } + + // Bind to retry exchange + if err := channel.QueueBind(retryQueue, retryQueue, retryExchange, false, nil); err != nil { + return err + } + } + + return nil +} + +func calculateCustomTTL(retryLevel int64) int32 { + // Custom TTL: 5s, 15s, 45s, 135s, 405s (fibonacci-like) + multipliers := []int32{5, 15, 45, 135, 405} + if retryLevel <= int64(len(multipliers)) { + return multipliers[retryLevel-1] * 1000 // Convert to milliseconds + } + return 600000 // 10 minutes for higher retry levels +} +``` + +### Retry Queue Monitoring + +```go +type RetryQueueMonitor struct { + channel *amqp.Channel + queuePrefix string + maxRetries int64 +} + +func (m *RetryQueueMonitor) GetRetryQueueStats() (map[string]int, error) { + stats := make(map[string]int) + + for i := int64(1); i <= m.maxRetries; i++ { + queueName := fmt.Sprintf("%s__retry.%d", m.queuePrefix, i) + + queue, err := m.channel.QueueInspect(queueName) + if err != nil { + return nil, err + } + + stats[queueName] = queue.Messages + } + + return stats, nil +} + +func (m *RetryQueueMonitor) DrainRetryQueue(retryLevel int64) error { + queueName := fmt.Sprintf("%s__retry.%d", m.queuePrefix, retryLevel) + + // Purge the retry queue + _, err := m.channel.QueuePurge(queueName, false) + return err +} +``` + +--- + +## ๐Ÿ“ˆ Monitoring & Observability + +### Metrics Collection + +```go +var ( + retriedMessages = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "goqueue_messages_retried_total", + Help: "Total number of messages retried", + }, + []string{"queue", "retry_level"}, + ) + + dlqMessages = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "goqueue_messages_dlq_total", + Help: "Total number of messages moved to DLQ", + }, + []string{"queue", "reason"}, + ) + + retryDelay = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "goqueue_retry_delay_seconds", + Help: "Delay before message retry", + }, + []string{"queue", "retry_level"}, + ) +) + +func recordRetryMetrics(queueName string, retryLevel int64, delay time.Duration) { + retriedMessages.WithLabelValues(queueName, fmt.Sprintf("%d", retryLevel)).Inc() + retryDelay.WithLabelValues(queueName, fmt.Sprintf("%d", retryLevel)).Observe(delay.Seconds()) +} +``` + +### Retry Logging + +```go +func retryWithLogging(ctx context.Context, m interfaces.InboundMessage, delayFn interfaces.DelayFn) error { + delay := delayFn(m.RetryCount + 1) + + log.Info(). + Str("message_id", m.ID). + Str("topic", m.Topic). + Str("action", m.Action). + Int64("retry_count", m.RetryCount). + Int64("delay_seconds", delay). + Msg("Retrying message") + + // Record metrics + recordRetryMetrics("main-queue", m.RetryCount+1, time.Duration(delay)*time.Second) + + return m.RetryWithDelayFn(ctx, delayFn) +} +``` + +### Dead Letter Queue Monitoring + +```go +func monitorDLQ(channel *amqp.Channel, dlqName string) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + queue, err := channel.QueueInspect(dlqName) + if err != nil { + log.Error().Err(err).Msg("Failed to inspect DLQ") + continue + } + + if queue.Messages > 0 { + log.Warn(). + Int("message_count", queue.Messages). + Str("queue", dlqName). + Msg("Messages in dead letter queue") + + // Send alert + sendDLQAlert(dlqName, queue.Messages) + } + } +} +``` + +--- + +## ๐Ÿ’ก Best Practices + +### 1. **Idempotency** + +```go +func idempotentHandler(ctx context.Context, m interfaces.InboundMessage) error { + // Check if message was already processed + if wasProcessed(m.ID) { + log.Info().Str("message_id", m.ID).Msg("Message already processed") + return m.Ack(ctx) + } + + // Process message + if err := processMessage(m); err != nil { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + // Mark as processed + markAsProcessed(m.ID) + + return m.Ack(ctx) +} +``` + +### 2. **Retry Budget** + +```go +type RetryBudget struct { + maxRetriesPerMinute int + currentRetries int + resetTime time.Time + mu sync.Mutex +} + +func (rb *RetryBudget) CanRetry() bool { + rb.mu.Lock() + defer rb.mu.Unlock() + + now := time.Now() + if now.After(rb.resetTime) { + rb.currentRetries = 0 + rb.resetTime = now.Add(time.Minute) + } + + if rb.currentRetries >= rb.maxRetriesPerMinute { + return false + } + + rb.currentRetries++ + return true +} + +func budgetedRetryHandler(ctx context.Context, m interfaces.InboundMessage, budget *RetryBudget) error { + err := processMessage(m) + if err != nil { + if budget.CanRetry() { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } else { + log.Warn().Msg("Retry budget exceeded, moving to DLQ") + return m.MoveToDeadLetterQueue(ctx) + } + } + + return m.Ack(ctx) +} +``` + +### 3. **Graceful Degradation** + +```go +func gracefulRetryHandler(ctx context.Context, m interfaces.InboundMessage) error { + // Check system health before processing + if !isSystemHealthy() { + // System is unhealthy, retry later + return m.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return 60 // Wait 1 minute during system issues + }) + } + + err := processMessage(m) + if err != nil { + if isSystemError(err) { + // System error - retry with longer delay + return m.RetryWithDelayFn(ctx, func(retryCount int64) int64 { + return retryCount * 30 // Longer delays for system errors + }) + } + + // Business logic error - use normal retry + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + return m.Ack(ctx) +} +``` + +--- + +## ๐Ÿ”ง Troubleshooting + +### Common Issues + +1. **Messages Not Retrying** + + - Check retry exchange and queue bindings + - Verify TTL configuration + - Check dead letter exchange setup + +2. **Messages Stuck in Retry Queues** + + - Check TTL values + - Verify dead letter exchange configuration + - Monitor queue depths + +3. **Excessive Retries** + - Review retry logic + - Check error classification + - Monitor retry rates + +### Debug Tools + +```go +func debugRetrySetup(channel *amqp.Channel, queueName string) error { + // Check main queue + mainQueue, err := channel.QueueInspect(queueName) + if err != nil { + return err + } + + log.Info(). + Str("queue", queueName). + Int("messages", mainQueue.Messages). + Int("consumers", mainQueue.Consumers). + Msg("Main queue status") + + // Check retry queues + for i := 1; i <= 5; i++ { + retryQueue := fmt.Sprintf("%s__retry.%d", queueName, i) + queue, err := channel.QueueInspect(retryQueue) + if err != nil { + log.Error().Err(err).Str("queue", retryQueue).Msg("Failed to inspect retry queue") + continue + } + + log.Info(). + Str("queue", retryQueue). + Int("messages", queue.Messages). + Msg("Retry queue status") + } + + return nil +} +``` + +--- + +## โšก Performance Considerations + +### 1. **Channel Management** + +```go +// Use separate channels for retry operations +type ChannelManager struct { + consumerChannel *amqp.Channel + retryChannel *amqp.Channel + dlqChannel *amqp.Channel +} + +func (cm *ChannelManager) PublishRetry(ctx context.Context, exchange, routingKey string, msg amqp.Publishing) error { + // Use dedicated retry channel to avoid blocking consumer + return cm.retryChannel.PublishWithContext(ctx, exchange, routingKey, false, false, msg) +} +``` + +### 2. **Memory Management** + +```go +// Pool message objects to reduce GC pressure +var messagePool = sync.Pool{ + New: func() interface{} { + return &amqp.Publishing{} + }, +} + +func getPooledMessage() *amqp.Publishing { + return messagePool.Get().(*amqp.Publishing) +} + +func putPooledMessage(msg *amqp.Publishing) { + // Reset message + *msg = amqp.Publishing{} + messagePool.Put(msg) +} +``` + +### 3. **Batch Operations** + +```go +func batchRetryOperations(ctx context.Context, retries []RetryRequest) error { + // Begin transaction + tx := channel.Tx() + defer tx.TxRollback() + + for _, retry := range retries { + if err := publishRetryMessage(ctx, retry); err != nil { + return err + } + } + + // Commit all retry operations + return tx.TxCommit() +} +``` + +--- + +## ๐Ÿ”ฎ Future Enhancements + +### Planned Improvements + +1. **Adaptive Retry Delays** + + - Machine learning-based delay calculation + - System load-aware retry timing + - Historical success rate optimization + +2. **Advanced Dead Letter Handling** + + - Automatic DLQ message replay + - Message transformation and repair + - Intelligent error categorization + +3. **Cross-Platform Retry** + + - Unified retry interface for all queue platforms + - Platform-specific optimizations + - Consistent retry semantics + +4. **Enhanced Monitoring** + - Real-time retry dashboards + - Predictive failure analysis + - Automated alerting and remediation + +--- + +The RabbitMQ retry mechanism in GoQueue provides a robust, production-ready solution for handling message failures. By leveraging RabbitMQ's native features like TTL, dead letter exchanges, and message routing, it ensures reliable message processing with minimal complexity and maximum visibility into the retry process. diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..24652db --- /dev/null +++ b/docs/README.md @@ -0,0 +1,363 @@ +# ๐Ÿ“š GoQueue Documentation + +Welcome to the comprehensive documentation for GoQueue - the universal Go message queue library. This documentation provides in-depth guides for each component of the system. + +## ๐Ÿ“– Documentation Index + +### ๐Ÿ”Œ [Middleware System](MIDDLEWARE.md) + +Learn how to extend GoQueue's functionality using the powerful middleware system. + +**What you'll learn:** + +- How middleware works in GoQueue +- Creating custom middleware for consumers and publishers +- Built-in middleware examples (logging, metrics, validation, compression) +- Advanced patterns (conditional middleware, circuit breakers, batching) +- Performance considerations and best practices +- Testing middleware components + +**Key Topics:** + +- Consumer and Publisher middleware +- Error handling middleware +- Rate limiting and circuit breaker patterns +- Tracing and observability middleware +- Custom middleware development + +--- + +### ๐Ÿ“จ [Consumer System](CONSUMER.md) + +Master the art of consuming and processing messages reliably. + +**What you'll learn:** + +- Setting up and configuring consumers +- Message handling patterns and best practices +- Retry mechanisms and error handling strategies +- Monitoring and observability for consumers +- Performance tuning and optimization + +**Key Topics:** + +- Message acknowledgment strategies +- Retry patterns and dead letter queues +- Concurrent processing and scaling +- Health checks and graceful shutdown +- Testing consumer logic + +--- + +### ๐Ÿ“ค [Publisher System](PUBLISHER.md) + +Build robust, high-performance message publishing systems. + +**What you'll learn:** + +- Publisher configuration and setup +- Message structure and design patterns +- Connection management and pooling +- Error handling and retry strategies +- Performance optimization techniques + +**Key Topics:** + +- Message builders and serialization +- Async publishing and batching +- Connection health monitoring +- Circuit breaker patterns +- Metrics and observability + +--- + +### ๐Ÿ”„ [RabbitMQ Retry Architecture](RABBITMQ-RETRY.md) + +Deep dive into GoQueue's sophisticated retry mechanism for RabbitMQ. + +**What you'll learn:** + +- How the retry architecture works internally +- Queue topology and message flow +- Configuration options and strategies +- Monitoring retry operations +- Troubleshooting retry issues +- Performance considerations + +**Key Topics:** + +- Dead letter exchange patterns +- TTL-based retry delays +- Exponential backoff strategies +- Retry queue management +- Failure analysis and debugging + +--- + +## ๐Ÿš€ Quick Start Guide + +If you're new to GoQueue, start with these steps: + +1. **๐Ÿ“ฆ Installation** + + ```bash + go get -u github.com/bxcodec/goqueue + ``` + +2. **๐ŸŽฏ Choose Your Platform** + + - Currently supported: RabbitMQ + - Coming soon: Google Pub/Sub, AWS SQS, Apache Kafka + +3. **๐Ÿ“– Read the Basics** + + - Start with the main [README](../README.md) + - Review the [examples](../examples/) directory + - Check out the [Consumer](CONSUMER.md) and [Publisher](PUBLISHER.md) docs + +4. **๐Ÿ”ง Advanced Features** + - Explore [Middleware](MIDDLEWARE.md) for extensibility + - Learn about [RabbitMQ Retry](RABBITMQ-RETRY.md) for resilience + +## ๐ŸŽฏ Use Case Guides + +### Event-Driven Architecture + +```go +// Publisher side +publisher.Publish(ctx, interfaces.Message{ + Topic: "user-events", + Action: "user.created", + Data: userData, +}) + +// Consumer side +func handleUserEvent(ctx context.Context, m interfaces.InboundMessage) error { + switch m.Action { + case "user.created": + return handleUserCreated(ctx, m) + case "user.updated": + return handleUserUpdated(ctx, m) + } +} +``` + +### Microservices Communication + +```go +// Service A publishes +publisher.Publish(ctx, interfaces.Message{ + Topic: "orders", + Action: "order.placed", + Data: orderData, + Headers: map[string]interface{}{ + "correlation_id": requestID, + "reply_to": "order-responses", + }, +}) + +// Service B consumes and processes +func processOrder(ctx context.Context, m interfaces.InboundMessage) error { + // Process order + if err := orderService.Process(m.Data); err != nil { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + return m.Ack(ctx) +} +``` + +### Background Job Processing + +```go +// Job publisher +publisher.Publish(ctx, interfaces.Message{ + Topic: "background-jobs", + Action: "email.send", + Data: EmailJob{ + To: "user@example.com", + Subject: "Welcome!", + Body: emailBody, + }, +}) + +// Job worker +func processEmailJob(ctx context.Context, m interfaces.InboundMessage) error { + var job EmailJob + if err := json.Unmarshal(m.Data, &job); err != nil { + return m.MoveToDeadLetterQueue(ctx) + } + + if err := emailService.Send(job); err != nil { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + return m.Ack(ctx) +} +``` + +## ๐Ÿ› ๏ธ Development Patterns + +### Repository Pattern Integration + +```go +type UserEventHandler struct { + userRepo UserRepository + emailSvc EmailService + logger *log.Logger +} + +func (h *UserEventHandler) HandleMessage(ctx context.Context, m interfaces.InboundMessage) error { + switch m.Action { + case "user.created": + return h.handleUserCreated(ctx, m) + case "user.deleted": + return h.handleUserDeleted(ctx, m) + } + return nil +} + +func (h *UserEventHandler) handleUserCreated(ctx context.Context, m interfaces.InboundMessage) error { + var event UserCreatedEvent + if err := json.Unmarshal(m.Data, &event); err != nil { + return m.MoveToDeadLetterQueue(ctx) + } + + // Business logic with repository + user, err := h.userRepo.FindByID(ctx, event.UserID) + if err != nil { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + // Send welcome email + if err := h.emailSvc.SendWelcome(ctx, user.Email); err != nil { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + + return m.Ack(ctx) +} +``` + +### Domain-Driven Design Integration + +```go +type OrderDomainHandler struct { + orderAggregate OrderAggregate + eventBus EventBus +} + +func (h *OrderDomainHandler) HandleMessage(ctx context.Context, m interfaces.InboundMessage) error { + // Convert to domain event + domainEvent, err := h.toDomainEvent(m) + if err != nil { + return m.MoveToDeadLetterQueue(ctx) + } + + // Process through domain aggregate + events, err := h.orderAggregate.Handle(ctx, domainEvent) + if err != nil { + if errors.Is(err, domain.ErrRetryable) { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + return m.MoveToDeadLetterQueue(ctx) + } + + // Publish resulting events + for _, event := range events { + if err := h.eventBus.Publish(ctx, event); err != nil { + return m.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) + } + } + + return m.Ack(ctx) +} +``` + +## ๐Ÿ“Š Monitoring and Observability + +### Health Checks + +```go +type QueueHealthCheck struct { + consumer consumer.Consumer + publisher publisher.Publisher +} + +func (h *QueueHealthCheck) Check(ctx context.Context) error { + // Test publishing + testMsg := interfaces.Message{ + ID: "health-check", + Topic: "health", + Action: "ping", + Data: map[string]interface{}{"timestamp": time.Now()}, + } + + if err := h.publisher.Publish(ctx, testMsg); err != nil { + return fmt.Errorf("publisher health check failed: %w", err) + } + + // Additional consumer health checks... + return nil +} +``` + +### Metrics Integration + +```go +func MetricsMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) error { + start := time.Now() + + messagesProcessed.WithLabelValues(m.Topic, m.Action).Inc() + + err := next(ctx, m) + + duration := time.Since(start).Seconds() + processingDuration.WithLabelValues(m.Topic, m.Action).Observe(duration) + + if err != nil { + messageErrors.WithLabelValues(m.Topic, m.Action).Inc() + } + + return err + } + } +} +``` + +## ๐Ÿค Contributing to Documentation + +We welcome contributions to improve our documentation! Here's how you can help: + +### ๐Ÿ“ Writing Guidelines + +- Use clear, concise language +- Provide practical examples +- Include code samples that work +- Add troubleshooting sections +- Keep content up-to-date + +### ๐Ÿ› Reporting Issues + +- Documentation bugs or inaccuracies +- Missing information or examples +- Unclear explanations +- Broken code samples + +### ๐Ÿ’ก Suggestions + +- New use case examples +- Additional patterns and best practices +- Performance optimization tips +- Integration guides + +## ๐Ÿ“ž Getting Help + +- **๐Ÿ“– Documentation**: You're here! Check the component-specific docs above +- **๐Ÿ’ฌ Discussions**: [GitHub Discussions](https://github.com/bxcodec/goqueue/discussions) +- **๐Ÿ› Issues**: [GitHub Issues](https://github.com/bxcodec/goqueue/issues) +- ๐Ÿ“ง **Email**: [iman@tumorang.com](mailto:iman@tumorang.com) + +--- + +**Happy queueing! ๐Ÿš€** diff --git a/misc/images/core-concept.png b/misc/images/core-concept.png new file mode 100644 index 0000000..3b5df30 Binary files /dev/null and b/misc/images/core-concept.png differ