A functional, type-safe adapter for connecting the Open Bank Project (OBP) API to Core Banking Systems (CBS) via RabbitMQ messaging.
Note to Banks: Do NOT clone and modify this repository directly.
See HOW-BANKS-USE-THIS.md for the correct way to use this adapter.
This adapter acts as a bridge between OBP-API and your Core Banking System:
OBP-API ←→ RabbitMQ ←→ This Adapter ←→ Your CBS (REST/SOAP/etc)
Key Features:
- Plugin Architecture: Implement one interface, get full OBP integration
- Built-in Telemetry: Metrics, logging, and tracing
- Type-Safe: Leverages Scala's type system to catch errors at compile time
- Functional: Pure functional programming with Cats Effect
- Bank-Agnostic: Generic OBP message handling, CBS-specific implementations pluggable
- Environment support: Docker support, health checks, graceful shutdown
This adapter cleanly separates three concerns:
- Generic OBP message handling
  - RabbitMQ message consumption/production
  - OBP message parsing and routing
  - Message correlation and context tracking
- Local adapter implementations
  - Your bank's API integration
  - Data mapping between CBS and OBP models
- Telemetry interface
  - Metrics collection
  - Distributed tracing
  - Error tracking
See ARCHITECTURE.md for detailed documentation.
Banks: Read HOW-BANKS-USE-THIS.md first! This explains:
- ✅ The correct way to use this adapter (as a dependency)
- ❌ The wrong way (cloning and modifying)
- 📦 Maven dependency vs Docker base image vs Git submodule
- 🏗️ Setting up your own adapter project
- 🔄 How to get updates without merge conflicts
Try it now:
# 1. Start RabbitMQ
./start_rabbitmq.sh
# 2. Build and run (in new terminal)
./build_and_run.sh
# 3. Open browser to http://localhost:52345
# 4. Click "Send Get Adapter Info" button
# 5. Watch the console logs!
See QUICKSTART.md for detailed instructions.
- Java 11+
- Maven 3.6+
- RabbitMQ server
- Your Core Banking System API
mvn clean package
This creates target/obp-rabbit-cats-adapter.jar
Create a new class implementing LocalAdapter:
package com.tesobe.obp.adapter.cbs.implementations
import com.tesobe.obp.adapter.interfaces._
import com.tesobe.obp.adapter.models._
import cats.effect.IO
class YourBankAdapter(
  baseUrl: String,
  apiKey: String,
  telemetry: Telemetry
) extends LocalAdapter {
  override def name: String = "YourBank-Adapter"
  override def version: String = "1.0.0"
  override def getBank(bankId: String, callContext: CallContext): IO[LocalAdapterResult[BankCommons]] = {
    // Your implementation here
    // Call your CBS API, map response to BankCommons
    ???
  }
  override def getBankAccount(...): IO[LocalAdapterResult[BankAccountCommons]] = {
    // Your implementation
    ???
  }
  // Implement ~30 methods for full OBP functionality
  // See interfaces/LocalAdapter.scala for complete interface
}
Create a .env file or set environment variables:
# HTTP Server Configuration (Discovery Page)
HTTP_HOST=0.0.0.0
HTTP_PORT=8080
HTTP_ENABLED=true
# RabbitMQ Configuration
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_REQUEST_QUEUE=obp.request
RABBITMQ_RESPONSE_QUEUE=obp.response
# Your CBS Configuration
# (Your bank-specific local adapter will define what config it needs)
# Telemetry
TELEMETRY_TYPE=console
ENABLE_METRICS=true
LOG_LEVEL=INFO
Then run the adapter:
java -jar target/obp-rabbit-cats-adapter.jar
Or with Docker:
docker build -t obp-adapter .
docker run --env-file .env obp-adapter
Once running, open your browser to:
http://localhost:8080
The discovery page shows:
- Health & Status - Health check and readiness endpoints
- RabbitMQ - Connection info and management UI link
- Observability - Metrics and logging configuration
- CBS Configuration - Core Banking System settings
- Documentation - Links to OBP resources
Available Endpoints:
- GET / - Discovery page (HTML)
- GET /health - Health check (JSON)
- GET /ready - Readiness check (JSON)
- GET /info - Service info (JSON)
Example health check:
curl http://localhost:8080/health
Response:
{
  "status": "healthy",
  "service": "OBP-Rabbit-Cats-Adapter",
  "version": "1.0.0-SNAPSHOT",
  "timestamp": "1704067200000"
}
Your local adapter must implement these key operations:
trait LocalAdapter {
  // Bank Operations
  def getBank(bankId: String, ...): IO[LocalAdapterResult[BankCommons]]
  def getBanks(...): IO[LocalAdapterResult[List[BankCommons]]]
  // Account Operations
  def getBankAccount(bankId: String, accountId: String, ...): IO[LocalAdapterResult[BankAccountCommons]]
  def getBankAccounts(...): IO[LocalAdapterResult[List[BankAccountCommons]]]
  def createBankAccount(...): IO[LocalAdapterResult[BankAccountCommons]]
  // Transaction Operations
  def getTransaction(...): IO[LocalAdapterResult[TransactionCommons]]
  def getTransactions(...): IO[LocalAdapterResult[List[TransactionCommons]]]
  def makePayment(...): IO[LocalAdapterResult[TransactionCommons]]
  // Customer Operations
  def getCustomer(...): IO[LocalAdapterResult[CustomerCommons]]
  def createCustomer(...): IO[LocalAdapterResult[CustomerCommons]]
  // ... and more (see interfaces/LocalAdapter.scala)
}
Note: You don't need to implement all operations at once. Start with the operations your bank needs and return LocalAdapterResult.Error for the ones you have not implemented.
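One way to follow the note above is to give every operation a default that reports NOT_IMPLEMENTED and override only what the bank supports. The sketch below is stdlib-only and uses hypothetical stand-in types (`Result`, `Bank`, `Account`, `PartialAdapter`); the real `LocalAdapter` returns `IO[LocalAdapterResult[...]]` and takes a `CallContext`, so treat this as an illustration of the pattern rather than the adapter's actual API.

```scala
object NotImplementedSketch {
  // Simplified stand-ins for LocalAdapterResult and the OBP models (hypothetical).
  sealed trait Result[+A]
  final case class Success[A](value: A) extends Result[A]
  final case class Failure(code: String, message: String) extends Result[Nothing]

  final case class Bank(bankId: String, shortName: String)
  final case class Account(bankId: String, accountId: String)

  // Base trait: every operation defaults to a NOT_IMPLEMENTED error.
  trait PartialAdapter {
    protected def notImplemented(operation: String): Result[Nothing] =
      Failure("NOT_IMPLEMENTED", operation + " is not supported by this adapter")

    def getBank(bankId: String): Result[Bank] = notImplemented("getBank")

    def getBankAccount(bankId: String, accountId: String): Result[Account] =
      notImplemented("getBankAccount")
  }

  // A bank that only supports getBank overrides just that one method.
  object BanksOnlyAdapter extends PartialAdapter {
    override def getBank(bankId: String): Result[Bank] =
      Success(Bank(bankId, "Example Bank"))
  }
}
```

With this shape, callers of an unsupported operation get a structured error instead of an exception, and new operations can be filled in one at a time.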
A mock adapter is provided for testing:
class MockLocalAdapter extends LocalAdapter {
  override def getBank(bankId: String, callContext: CallContext): IO[LocalAdapterResult[BankCommons]] = {
    IO.pure(LocalAdapterResult.success(
      BankCommons(
        bankId = bankId,
        shortName = "Mock Bank",
        fullName = "Mock Bank for Testing",
        logoUrl = "https://example.com/logo.png",
        websiteUrl = "https://example.com"
      ),
      callContext
    ))
  }
  // ... other operations
}
The adapter includes comprehensive telemetry:
val telemetry = new ConsoleTelemetry()
Output:
[INFO][CID: 1flssoftxq0cr1nssr68u0mioj] Message received: type=obp.getBank queue=obp.request
[INFO][CID: 1flssoftxq0cr1nssr68u0mioj] CBS operation started: getBank
[INFO][CID: 1flssoftxq0cr1nssr68u0mioj] CBS operation success: getBank duration=45ms
[INFO][CID: 1flssoftxq0cr1nssr68u0mioj] Response sent: type=obp.getBank success=true
val telemetry = new PrometheusTelemetry(prometheusConfig)
Exposes metrics at /metrics:
obp_messages_received_total{type="obp.getBank"}
obp_messages_processed_seconds{type="obp.getBank"}
obp_cbs_operation_duration_seconds{operation="getBank"}
obp_cbs_operation_errors_total{operation="getBank",code="timeout"}
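To make the labelled metric names above concrete, here is a minimal sketch of a labelled counter registry that renders lines in the Prometheus text style (`name{label="value"} count`). The `Counters` class is hypothetical and stdlib-only; it is not the adapter's `PrometheusTelemetry`, whose internals are not shown in this README.

```scala
object CounterSketch {
  // Hypothetical in-memory registry: one counter per (metric name, label set).
  final class Counters {
    private val counts =
      scala.collection.mutable.Map.empty[(String, List[(String, String)]), Long]

    // Increment the counter for this name and label set, creating it at zero if absent.
    def inc(name: String, labels: (String, String)*): Unit = this.synchronized {
      val key = (name, labels.toList.sortBy(_._1))
      counts.update(key, counts.getOrElse(key, 0L) + 1L)
    }

    // Render one line per counter, sorted for stable output.
    def render: String = this.synchronized {
      counts.toList
        .map { case ((name, labels), n) =>
          val rendered = labels.map { case (k, v) => k + "=\"" + v + "\"" }.mkString(",")
          name + "{" + rendered + "} " + n
        }
        .sorted
        .mkString("\n")
    }
  }
}
```

A call such as `counters.inc("obp_cbs_operation_errors_total", "operation" -> "getBank", "code" -> "timeout")` would then show up as its own labelled series in the rendered output.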
Implement the Telemetry trait for your monitoring stack:
class YourTelemetry extends Telemetry {
  override def recordMessageProcessed(...) = {
    // Send to Datadog, New Relic, etc.
    ???
  }
  // ... implement other methods
}
- OBP-API sends a message to the RabbitMQ obp.request queue
- The adapter consumes the message and extracts the correlation ID
- MessageRouter routes it to the appropriate handler based on message type
- The handler calls your LocalAdapter implementation
- Your LocalAdapter calls your CBS API
- The response is mapped to OBP format and sent to the obp.response queue
- OBP-API receives the response and returns it to the client
All steps are instrumented with telemetry.
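The routing step in the flow above can be sketched as a lookup from message type to handler, with the correlation ID copied from request to response. `Request`, `Response`, `Handler` and `Router` here are hypothetical, stdlib-only stand-ins; the real `MessageRouter` and OBP message models carry more fields and run inside `IO`.

```scala
object RoutingSketch {
  // Hypothetical message shapes for illustration only.
  final case class Request(messageType: String, correlationId: String, payload: Map[String, String])
  final case class Response(messageType: String, correlationId: String, success: Boolean, body: String)

  type Handler = Request => Response

  // Route on message type; unknown types yield an error response
  // that still carries the caller's correlation ID.
  final class Router(handlers: Map[String, Handler]) {
    def route(request: Request): Response =
      handlers.get(request.messageType) match {
        case Some(handler) => handler(request)
        case None =>
          Response(request.messageType, request.correlationId, success = false,
            body = "No handler for " + request.messageType)
      }
  }

  // Example handler: a real one would delegate to the LocalAdapter implementation.
  val getBank: Handler = req =>
    Response(req.messageType, req.correlationId, success = true,
      body = "bank:" + req.payload.getOrElse("bankId", "unknown"))

  val router = new Router(Map("obp.getBank" -> getBank))
}
```

Keeping the correlation ID on both sides is what lets a response on obp.response be matched to the request that caused it, and lets log lines for one message be grouped together.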
The adapter handles these OBP message types:
- obp.getBank - Get bank information
- obp.getBankAccount - Get account details
- obp.getTransaction - Get transaction details
- obp.getTransactions - Get transaction list
- obp.makePayment - Create payment/transfer
- obp.getCustomer - Get customer information
- obp.createCustomer - Create new customer
- obp.getCounterparty - Get counterparty details
- obp.getCurrentFxRate - Get exchange rate
- ... and many more (see OBP message docs)
src/main/scala/com/tesobe/obp/adapter/
├── config/ # Configuration
│ └── Config.scala
├── models/ # OBP message models
│ └── OBPModels.scala
├── messaging/ # RabbitMQ handling
│ ├── RabbitMQConsumer.scala
│ └── RabbitMQProducer.scala
├── handlers/ # Message handlers
│ ├── BankHandlers.scala
│ ├── AccountHandlers.scala
│ └── TransactionHandlers.scala
├── interfaces/ # Core contracts
│ └── LocalAdapter.scala ← YOU IMPLEMENT THIS
├── cbs/implementations/ # CBS implementations
│ ├── MockLocalAdapter.scala # For testing
│ ├── RestLocalAdapter.scala # Generic REST
│ └── YourBankAdapter.scala ← YOUR CODE HERE
├── telemetry/ # Observability
│ ├── Telemetry.scala
│ ├── ConsoleTelemetry.scala
│ └── PrometheusTelemetry.scala
└── AdapterMain.scala # Entry point
Test your local adapter in isolation:
import munit.CatsEffectSuite

class YourBankAdapterSpec extends CatsEffectSuite {
  test("getBank returns bank data") {
    // mockHttp and NoOpTelemetry are test doubles; pass whatever your adapter's constructor expects
    val adapter = new YourBankAdapter(mockHttp, NoOpTelemetry)
    val result = adapter.getBank("test-bank", CallContext("test-123"))
    result.map {
      case LocalAdapterResult.Success(bank, _, _) =>
        assertEquals(bank.bankId, "test-bank")
      case LocalAdapterResult.Error(code, msg, _, _) =>
        fail(s"Expected success, got error: $code - $msg")
    }
  }
}
Use MockLocalAdapter to test the message flow without a real CBS.
Test with real RabbitMQ and mock CBS:
# Start RabbitMQ
docker-compose up -d rabbitmq
# Run adapter with mock adapter
LOCAL_ADAPTER_TYPE=mock java -jar target/obp-rabbit-cats-adapter.jar
# Send test message to obp.request queue
# Verify response in obp.response queue
The adapter provides consistent error handling:
// In your local adapter
// httpClient, url, mapToBankCommons and NotFoundException stand in for your own CBS client code
def getBank(bankId: String, callContext: CallContext): IO[LocalAdapterResult[BankCommons]] = {
  httpClient.get(url)
    .map(response => LocalAdapterResult.success(mapToBankCommons(response), callContext))
    .handleErrorWith {
      case _: TimeoutException =>
        IO.pure(LocalAdapterResult.error("CBS_TIMEOUT", "Request timed out", callContext))
      case _: NotFoundException =>
        IO.pure(LocalAdapterResult.error("BANK_NOT_FOUND", "Bank does not exist", callContext))
      case e: Exception =>
        IO.pure(LocalAdapterResult.error("CBS_ERROR", e.getMessage, callContext))
    }
}
Errors are:
- Logged with correlation ID
- Sent as telemetry events
- Returned to OBP with proper error codes
- Enriched with backend messages for debugging
- Async: All I/O is non-blocking with Cats Effect
- Streaming: RabbitMQ messages processed as stream
- Backpressure: Configurable prefetch count prevents overload
- Connection Pooling: HTTP clients use connection pools
- Retries: Configurable retry logic with exponential backoff
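As an illustration of the retry item above, this is a minimal stdlib-only sketch of retrying with exponential backoff. The names `delayMillis` and `retry` and their parameters are hypothetical, and the sketch blocks with `Thread.sleep` for simplicity; the adapter itself is described as non-blocking, so its real retry logic would presumably use a Cats Effect sleep instead.

```scala
object BackoffSketch {
  import scala.annotation.tailrec
  import scala.util.{Failure, Success, Try}

  // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMillis.
  def delayMillis(attempt: Int, baseMillis: Long, maxMillis: Long): Long =
    math.min(maxMillis, baseMillis * (1L << math.min(attempt, 30)))

  // Run `op` up to maxAttempts times, sleeping between failed attempts.
  // `sleep` is injectable so tests can record delays instead of waiting.
  def retry[A](maxAttempts: Int, baseMillis: Long, maxMillis: Long,
               sleep: Long => Unit = ms => Thread.sleep(ms))(op: () => A): Try[A] = {
    @tailrec
    def loop(attempt: Int): Try[A] =
      Try(op()) match {
        case s @ Success(_) => s
        case f @ Failure(_) if attempt + 1 >= maxAttempts => f // out of attempts
        case Failure(_) =>
          sleep(delayMillis(attempt, baseMillis, maxMillis))
          loop(attempt + 1)
      }
    loop(0)
  }
}
```

For example, `BackoffSketch.retry(5, 100L, 5000L)(() => callCbs())` would wait 100 ms, 200 ms, 400 ms and 800 ms between attempts, where `callCbs` is a placeholder for your own CBS call. Capping the delay keeps a long outage from producing unbounded waits.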
FROM openjdk:11-jre-slim
WORKDIR /app
COPY target/obp-rabbit-cats-adapter.jar .
CMD ["java", "-jar", "obp-rabbit-cats-adapter.jar"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: obp-adapter
spec:
  replicas: 3
  # apps/v1 Deployments require a selector that matches the pod template labels
  selector:
    matchLabels:
      app: obp-adapter
  template:
    metadata:
      labels:
        app: obp-adapter
    spec:
      containers:
        - name: adapter
          image: obp-adapter:latest
          env:
            - name: RABBITMQ_HOST
              valueFrom:
                configMapKeyRef:
                  name: obp-config
                  key: rabbitmq.host
            - name: CBS_BASE_URL
              valueFrom:
                secretKeyRef:
                  name: cbs-credentials
                  key: base-url
| Variable | Description | Default |
|---|---|---|
| HTTP_HOST | HTTP server host | 0.0.0.0 |
| HTTP_PORT | HTTP server port | 8080 |
| HTTP_ENABLED | Enable HTTP server | true |
| Variable | Description | Default |
|---|---|---|
| RABBITMQ_HOST | RabbitMQ server host | localhost |
| RABBITMQ_PORT | RabbitMQ server port | 5672 |
| RABBITMQ_USERNAME | Username | guest |
| RABBITMQ_PASSWORD | Password | guest |
| RABBITMQ_VIRTUAL_HOST | Virtual host | / |
| RABBITMQ_REQUEST_QUEUE | Request queue name | obp.request |
| RABBITMQ_RESPONSE_QUEUE | Response queue name | obp.response |
| RABBITMQ_PREFETCH_COUNT | Messages to prefetch | 10 |
| Variable | Description | Default |
|---|---|---|
| REDIS_HOST | Redis host | localhost |
| REDIS_PORT | Redis port | 6379 |
| REDIS_ENABLED | Enable Redis | true |
| Variable | Description | Default |
|---|---|---|
| TELEMETRY_TYPE | Type (console/prometheus/noop) | console |
| ENABLE_METRICS | Enable metrics | true |
| LOG_LEVEL | Log level | INFO |
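The defaults in the tables above could be applied by a loader along the following lines. This is a sketch only: `RabbitConfig` and `load` are hypothetical names, and the adapter's own config/Config.scala may be structured differently. The variable names and default values are taken from the RabbitMQ table.

```scala
object ConfigSketch {
  import scala.util.Try

  // Hypothetical config shape covering the RabbitMQ variables.
  final case class RabbitConfig(
    host: String, port: Int, username: String, password: String,
    virtualHost: String, requestQueue: String, responseQueue: String, prefetchCount: Int
  )

  // Reads from a Map so the lookup is testable; pass sys.env in real use.
  def load(env: Map[String, String]): Either[String, RabbitConfig] = {
    // Parse an integer variable, falling back to the default when it is unset.
    def int(key: String, default: Int): Either[String, Int] =
      env.get(key) match {
        case None      => Right(default)
        case Some(raw) => Try(raw.trim.toInt).toOption.toRight(key + " must be an integer, got: " + raw)
      }

    for {
      port     <- int("RABBITMQ_PORT", 5672)
      prefetch <- int("RABBITMQ_PREFETCH_COUNT", 10)
    } yield RabbitConfig(
      host = env.getOrElse("RABBITMQ_HOST", "localhost"),
      port = port,
      username = env.getOrElse("RABBITMQ_USERNAME", "guest"),
      password = env.getOrElse("RABBITMQ_PASSWORD", "guest"),
      virtualHost = env.getOrElse("RABBITMQ_VIRTUAL_HOST", "/"),
      requestQueue = env.getOrElse("RABBITMQ_REQUEST_QUEUE", "obp.request"),
      responseQueue = env.getOrElse("RABBITMQ_RESPONSE_QUEUE", "obp.response"),
      prefetchCount = prefetch
    )
  }
}
```

Calling `ConfigSketch.load(sys.env)` at startup returns either a complete configuration or a message naming the malformed variable, so a bad value fails fast instead of surfacing later as a connection error.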
Q: Do I need to implement all 30+ methods in LocalAdapter?
A: No. Start with the operations you need. Return LocalAdapterResult.Error("NOT_IMPLEMENTED", ...) for others.
Q: Can I use SOAP instead of REST?
A: Yes. Implement LocalAdapter to call SOAP endpoints. The interface is transport-agnostic.
Q: How do I handle authentication to my CBS?
A: Handle it in your LocalAdapter implementation. The adapter provides config for common auth types.
Q: Can I run multiple adapters for different banks?
A: Yes. Each adapter instance can have a different LocalAdapter implementation.
Q: What if my CBS API doesn't map directly to OBP models?
A: That's expected. Your LocalAdapter implementation does the mapping. This is bank-specific logic.
Q: How do I debug message processing?
A: Use ConsoleTelemetry in development. Every message includes a correlation ID for tracing.
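The answer above about CBS data that does not map directly to OBP models can be illustrated with a small translation function. Everything here is an assumption for illustration: `CbsInstitution` and its field names are an invented CBS payload, and `Bank` is a stand-in that only mirrors the fields shown in the mock adapter example, not the real `BankCommons` class.

```scala
object MappingSketch {
  // Hypothetical CBS payload; these field names are invented for illustration.
  final case class CbsInstitution(code: String, displayName: String,
                                  legalName: Option[String], homepage: Option[String])

  // Stand-in with the same fields as the BankCommons value in the mock adapter example.
  final case class Bank(bankId: String, shortName: String, fullName: String,
                        logoUrl: String, websiteUrl: String)

  // Translate one CBS record, rejecting records that cannot be identified.
  def toBank(cbs: CbsInstitution): Either[String, Bank] = {
    val id = cbs.code.trim
    if (id.isEmpty) Left("CBS institution has an empty code")
    else Right(Bank(
      bankId = id,
      shortName = cbs.displayName,
      fullName = cbs.legalName.getOrElse(cbs.displayName), // fall back to the display name
      logoUrl = "",                                         // this hypothetical CBS has no logo field
      websiteUrl = cbs.homepage.getOrElse("")
    ))
  }
}
```

Keeping the mapping in a pure function like this makes the bank-specific decisions (fallbacks, missing fields, rejected records) easy to unit test without RabbitMQ or a live CBS.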
Contributions welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
Apache License 2.0
- How Banks Use This: HOW-BANKS-USE-THIS.md ⭐
- Architecture: ARCHITECTURE.md
- Separation of Concerns: SEPARATION-OF-CONCERNS.md
- Issues: GitHub Issues
- OBP Wiki: https://github.com/OpenBankProject/OBP-API/wiki
- Community: Join OBP Rocket Chat
Built with:
- Scala - Programming language
- Cats Effect - Functional effects
- fs2 - Functional streams
- fs2-rabbit - RabbitMQ client
- http4s - HTTP client
- Circe - JSON library