Excalibase Watcher
Turn any database into a real-time event stream.
Excalibase Watcher captures INSERT, UPDATE, DELETE, DDL, and TRUNCATE events from your database and streams them to NATS JetStream — so any service, in any language, can react to data changes without polling or triggers.
Think of it as a self-hosted alternative to Firebase Realtime Database: your backend services subscribe to change streams the same way a frontend subscribes to Firebase, except it all runs on your infrastructure, against your existing PostgreSQL or MySQL, with no vendor lock-in.
Feature Matrix
| Feature | PostgreSQL | MySQL |
|---|---|---|
| INSERT / UPDATE / DELETE | Real column names | Real column names |
| DDL capture (CREATE, ALTER, DROP) | Opt-in via app.cdc.postgres.capture-ddl=true | Automatic via QUERY events |
| TRUNCATE | Via WAL | Via binlog |
| Column names | Real names (from RELATION message) | Real names |
| Before + after on UPDATE | Requires REPLICA IDENTITY FULL | Always included |
| Full row on DELETE | Requires REPLICA IDENTITY FULL | Always included |
| Table filtering | app.cdc.postgres.tables | app.cdc.mysql.tables |
| Snapshot on startup | app.cdc.postgres.snapshot-mode | app.cdc.mysql.snapshot-mode |
| Offset persistence | Replication slot (automatic) | NATS JetStream (automatic) |
| Health indicator | Spring Boot Actuator /health | Spring Boot Actuator /health |
| Metrics | Micrometer (events/sec, lag) | Micrometer (events/sec, lag) |
| Schema history | Tracks DDL changes | Tracks DDL changes |
Two Use Cases
Real-time event fan-out
Keep downstream systems in sync as your data changes:
[PostgreSQL / MySQL]
↓ WAL / binlog
excalibase-watcher
↓ NATS JetStream (subject: cdc.{schema}.{table})
↓
inventory-svc | search-indexer | audit-logger | cache-invalidator
Any number of consumers subscribe independently. Each gets every event (fan-out), or you can load-balance across pods with a queue group.
Data pipeline / migration
Stream changes from one database to another — a lighter alternative to Debezium for scenarios like:
- Postgres → data warehouse (Redshift, BigQuery, Snowflake via your own consumer)
- Postgres → Elasticsearch — keep your search index in sync without cron jobs
- Postgres → another Postgres — live replication without setting up a full replica
- Legacy DB migration — capture changes during cutover so nothing is lost
The consumer receives structured JSON events with the full row data — your pipeline reads the stream and writes to the target.
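As one illustration of such a pipeline consumer, the transform step can be a pure function that maps a CDC event onto the target system's write format. The sketch below targets the Elasticsearch bulk API; the index-naming scheme, the use of the row's "id" column as the document id, and the second JSON.parse of the stringified data field are all assumptions for illustration, not a fixed contract.

```javascript
// Sketch: map one CDC event to an Elasticsearch bulk-API action pair.
// Index name ({schema}_{table}), the _id choice (row "id" column), and the
// second JSON.parse of the stringified "data" field are assumptions.
function toBulkOps(event) {
  const row = typeof event.data === 'string' ? JSON.parse(event.data) : event.data
  const index = `${event.schema}_${event.table}`
  switch (event.type) {
    case 'INSERT':
    case 'UPDATE': {
      const doc = row.new ?? row // UPDATE events may nest the new image under "new"
      return [{ index: { _index: index, _id: doc.id } }, doc]
    }
    case 'DELETE':
      return [{ delete: { _index: index, _id: row.id } }]
    default:
      return [] // skip DDL / TRUNCATE when indexing rows
  }
}

const insert = {
  type: 'INSERT', schema: 'public', table: 'orders',
  data: '{"id":"1","product":"keyboard","qty":"2"}'
}
console.log(JSON.stringify(toBulkOps(insert)))
```

Your consumer would feed the returned pairs into the bulk endpoint; keeping the transform pure makes it trivial to unit-test without a live database or NATS.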
Modules
| Module | Description |
|---|---|
| excalibase-watcher-core | CDCService, CDCEvent, health indicator, schema history, metrics — pure Java |
| excalibase-watcher-postgres | PostgreSQL WAL listener (logical replication, pgoutput, RELATION-based column names) |
| excalibase-watcher-mysql | MySQL binlog listener (row-based replication, real column names) |
| excalibase-watcher-nats | NATS JetStream publisher |
| excalibase-watcher-api | Spring Boot CDC server with health checks, metrics, and E2E tests |
Quick Start
1. Start infrastructure
docker compose up -d
Starts PostgreSQL 16 (wal_level=logical), MySQL 8.0, and NATS 2.10 with JetStream enabled.
2. Run the CDC server
mvn spring-boot:run -pl excalibase-watcher-api
3. Make changes and watch events flow
INSERT INTO orders (product, qty) VALUES ('keyboard', 2);
UPDATE orders SET status = 'shipped' WHERE product = 'keyboard';
DELETE FROM orders WHERE product = 'keyboard';
Console output:
[CDC] type=INSERT schema=public table=orders data={"id":"1","product":"keyboard","qty":"2"}
[CDC] type=UPDATE schema=public table=orders data={"new":{"id":"1","product":"keyboard","status":"shipped"}}
[CDC] type=DELETE schema=public table=orders data={"id":"1","product":"keyboard","qty":"2"}
NATS Message Format
Every change is published as JSON to the subject cdc.{schema}.{table}:
{
  "type": "INSERT",
  "schema": "public",
  "table": "orders",
  "data": "{\"id\":\"1\", \"product\":\"keyboard\", \"qty\":\"2\"}",
  "lsn": "0/1989540",
  "timestamp": 1742056200000,
  "sourceTimestamp": 1742056199000
}
| Field | Description |
|---|---|
| type | INSERT, UPDATE, DELETE, DDL, TRUNCATE |
| schema | Database schema name |
| table | Table name |
| data | Row data as JSON string (real column names for both PostgreSQL and MySQL) |
| lsn | PostgreSQL LSN or MySQL binlog position |
| timestamp | Time the event was published to NATS (epoch ms) |
| sourceTimestamp | Time the change occurred in the database (epoch ms) |
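Note that the data field arrives as a JSON string inside the JSON envelope, so consumers parse twice. A minimal decoding sketch, using only the envelope fields documented above:

```javascript
// Sketch: decode a raw NATS payload (Uint8Array) into a usable CDC event.
// The nested "data" field is itself a JSON string, so it needs a second parse.
function decodeEvent(payload) {
  const envelope = JSON.parse(new TextDecoder().decode(payload))
  return { ...envelope, data: JSON.parse(envelope.data) }
}

// Simulate a received payload using the example message above.
const raw = new TextEncoder().encode(JSON.stringify({
  type: 'INSERT', schema: 'public', table: 'orders',
  data: '{"id":"1","product":"keyboard","qty":"2"}',
  lsn: '0/1989540', timestamp: 1742056200000, sourceTimestamp: 1742056199000
}))
const event = decodeEvent(raw)
console.log(event.data.product) // keyboard
```

The gap between timestamp and sourceTimestamp gives you per-event end-to-end lag, which is handy to export as a consumer-side metric.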
DDL Capture
Both databases support capturing schema changes (CREATE TABLE, ALTER TABLE, DROP TABLE, etc.):
PostgreSQL — enable with app.cdc.postgres.capture-ddl=true. DDL events are published as:
{
  "type": "DDL",
  "schema": "public",
  "table": "orders",
  "data": "{\"ddl\":\"ALTER TABLE orders ADD COLUMN notes TEXT\"}",
  "sourceTimestamp": 1742056200000
}
MySQL — DDL is captured automatically via QUERY binlog events. No extra config needed.
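A consumer that tracks schema history only needs to unwrap the ddl key shown in the event above. A small sketch, where the watched-table filter is an illustrative assumption:

```javascript
// Sketch: pull the raw DDL statement out of a DDL event.
// The "ddl" key inside data matches the example event above; filtering by a
// watched-table list is an illustrative assumption, not built-in behavior.
function extractDdl(event, watchedTables) {
  if (event.type !== 'DDL') return null
  const { ddl } = JSON.parse(event.data)
  return watchedTables.includes(event.table) ? ddl : null
}

const ddlEvent = {
  type: 'DDL', schema: 'public', table: 'orders',
  data: '{"ddl":"ALTER TABLE orders ADD COLUMN notes TEXT"}'
}
console.log(extractDdl(ddlEvent, ['orders']))
```

Appending each extracted statement to a log gives downstream teams an audit trail of schema changes alongside the row-level stream.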
Consuming Events
Node.js
const { connect, consumerOpts } = require('nats')
const nc = await connect({ servers: 'nats://localhost:4222' })
const js = nc.jetstream()
const opts = consumerOpts()
opts.durable('my-service')
opts.deliverAll()
opts.ackExplicit()
const sub = await js.subscribe('cdc.>', opts)
for await (const msg of sub) {
  // msg.data is a Uint8Array, so decode it to a string before parsing
  const event = JSON.parse(new TextDecoder().decode(msg.data))
  console.log(`[${event.type}] ${event.schema}.${event.table}`, event.data)
  msg.ack()
}
Java (jnats)
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nats.client.*;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

Connection nc = Nats.connect("nats://localhost:4222");
JetStream js = nc.jetStream();
ObjectMapper objectMapper = new ObjectMapper();
PushSubscribeOptions opts = PushSubscribeOptions.builder()
    .durable("order-svc")
    .build();
JetStreamSubscription sub = js.subscribe("cdc.public.orders", opts);
while (true) {
    Message msg = sub.nextMessage(Duration.ofSeconds(30));
    if (msg != null) {
        String json = new String(msg.getData(), StandardCharsets.UTF_8);
        CDCEvent event = objectMapper.readValue(json, CDCEvent.class);
        if ("INSERT".equals(event.getType())) {
            // handle new order
        }
        msg.ack();
    }
}
Scaling
Use a durable consumer to survive restarts — NATS replays missed messages when your service comes back up:
opts.durable('inventory-service') // survives restarts
opts.deliverAll() // replay from beginning on first connect
Use a queue group to load-balance across multiple pods of the same service:
opts.durable('inventory-service')
opts.queue('inventory-service') // each message delivered to one pod only
Without queue(), every pod gets every message (fan-out — useful for cache invalidation across all instances simultaneously).