
Excalibase Watcher

Turn any database into a real-time event stream.

Excalibase Watcher captures INSERT, UPDATE, DELETE, DDL, and TRUNCATE events from your database and streams them to NATS JetStream — so any service, in any language, can react to data changes without polling or triggers.

Think of it as a self-hosted alternative to Firebase Realtime Database: your backend services subscribe to change streams the same way a frontend subscribes to Firebase, except that everything runs on your own infrastructure, against your existing PostgreSQL or MySQL, with no vendor lock-in.

Feature Matrix

Feature                           | PostgreSQL                         | MySQL
----------------------------------|------------------------------------|----------------------------
INSERT / UPDATE / DELETE          | Real column names                  | Real column names
DDL capture (CREATE, ALTER, DROP) | Opt-in via capture-ddl=true        | Automatic via QUERY events
TRUNCATE                          | Via WAL                            | Via binlog
Column names                      | Real names (from RELATION message) | Real names
Before + after on UPDATE          | Requires REPLICA IDENTITY FULL     | Always included
Full row on DELETE                | Requires REPLICA IDENTITY FULL     | Always included
Table filtering                   | app.cdc.postgres.tables            | app.cdc.mysql.tables
Snapshot on startup               | app.cdc.postgres.snapshot-mode     | app.cdc.mysql.snapshot-mode
Offset persistence                | Replication slot (automatic)       | NATS JetStream (automatic)
Health indicator                  | Spring Boot Actuator /health       | Spring Boot Actuator /health
Metrics                           | Micrometer (events/sec, lag)       | Micrometer (events/sec, lag)
Schema history                    | Tracks DDL changes                 | Tracks DDL changes
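
The app.cdc.* keys in the matrix are standard Spring Boot properties. A minimal sketch (the property names come from the matrix above; the example values, including the snapshot-mode value, are illustrative assumptions, not documented defaults):

```properties
# Limit capture to specific PostgreSQL tables (illustrative values)
app.cdc.postgres.tables=public.orders,public.customers
# Snapshot behaviour on startup (value is an assumption; check the project docs)
app.cdc.postgres.snapshot-mode=initial
# Opt in to DDL capture on PostgreSQL
app.cdc.postgres.capture-ddl=true

# MySQL equivalents
app.cdc.mysql.tables=shop.orders
app.cdc.mysql.snapshot-mode=initial
```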

Two Use Cases

Real-time event fan-out

Keep downstream systems in sync as your data changes:

[PostgreSQL / MySQL]
        ↓  WAL / binlog
  excalibase-watcher
        ↓  NATS JetStream  (subject: cdc.{schema}.{table})
        ↓
  inventory-svc  |  search-indexer  |  audit-logger  |  cache-invalidator

Any number of consumers subscribe independently. Each gets every event (fan-out), or you can load-balance across pods with a queue group.

Data pipeline / migration

Stream changes from one database to another — a lighter alternative to Debezium for scenarios like:

  • Postgres → data warehouse (Redshift, BigQuery, Snowflake via your own consumer)
  • Postgres → Elasticsearch — keep your search index in sync without cron jobs
  • Postgres → another Postgres — live replication without setting up a full replica
  • Legacy DB migration — capture changes during cutover so nothing is lost

The consumer receives structured JSON events with the full row data — your pipeline reads the stream and writes to the target.
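
That last step can be sketched as a pure routing function. A minimal sketch in Node.js (the `toTargetOp` helper and its output shape are illustrative, not part of the watcher API):

```javascript
// Map a CDC event envelope to an abstract write operation for the target store.
function toTargetOp(event) {
  const row = JSON.parse(event.data) // `data` is a JSON string inside the envelope
  switch (event.type) {
    case 'INSERT':
    case 'UPDATE':
      // Upsert by primary key into the target (warehouse table, search index, replica)
      return {
        op: 'upsert',
        target: `${event.schema}.${event.table}`,
        id: row.id ?? row.new?.id, // UPDATE payloads wrap the row in { new: ... }
        doc: row,
      }
    case 'DELETE':
      // Full-row DELETE events carry the key needed to remove the target record
      return { op: 'delete', target: `${event.schema}.${event.table}`, id: row.id }
    default:
      return null // DDL / TRUNCATE: handle or ignore as the pipeline requires
  }
}
```

A real pipeline would feed these operations into a batch writer for the target system; the routing itself stays independent of both NATS and the target client.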

Modules

Module                      | Description
----------------------------|--------------------------------------------------------------------
excalibase-watcher-core     | CDCService, CDCEvent, health indicator, schema history, metrics (pure Java)
excalibase-watcher-postgres | PostgreSQL WAL listener (logical replication, pgoutput, RELATION-based column names)
excalibase-watcher-mysql    | MySQL binlog listener (row-based replication, real column names)
excalibase-watcher-nats     | NATS JetStream publisher
excalibase-watcher-api      | Spring Boot CDC server with health checks, metrics, and E2E tests

Quick Start

1. Start infrastructure

docker compose up -d

Starts PostgreSQL 16 (wal_level=logical), MySQL 8.0, and NATS 2.10 with JetStream enabled.

2. Run the CDC server

mvn spring-boot:run -pl excalibase-watcher-api

3. Make changes and watch events flow

INSERT INTO orders (product, qty) VALUES ('keyboard', 2);
UPDATE orders SET status = 'shipped' WHERE product = 'keyboard';
DELETE FROM orders WHERE product = 'keyboard';

Console output:

[CDC] type=INSERT schema=public table=orders data={"id":"1","product":"keyboard","qty":"2"}
[CDC] type=UPDATE schema=public table=orders data={"new":{"id":"1","product":"keyboard","status":"shipped"}}
[CDC] type=DELETE schema=public table=orders data={"id":"1","product":"keyboard","qty":"2"}

NATS Message Format

Every change is published as JSON to the subject cdc.{schema}.{table}:

{
  "type": "INSERT",
  "schema": "public",
  "table": "orders",
  "data": "{\"id\":\"1\", \"product\":\"keyboard\", \"qty\":\"2\"}",
  "lsn": "0/1989540",
  "timestamp": 1742056200000,
  "sourceTimestamp": 1742056199000
}
Field           | Description
----------------|--------------------------------------------------------------
type            | INSERT, UPDATE, DELETE, DDL, or TRUNCATE
schema          | Database schema name
table           | Table name
data            | Row data as a JSON string (real column names for both PostgreSQL and MySQL)
lsn             | PostgreSQL LSN or MySQL binlog position
timestamp       | Time the event was published to NATS (epoch ms)
sourceTimestamp | Time the change occurred in the database (epoch ms)
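
Because data is itself a JSON string inside the JSON envelope, consumers parse twice. A minimal decoding sketch in Node.js (`decodeEvent` is an illustrative helper, not part of the watcher API):

```javascript
// Parse the outer envelope, then the nested row payload.
function decodeEvent(rawJson) {
  const envelope = JSON.parse(rawJson)  // { type, schema, table, data, lsn, ... }
  const row = JSON.parse(envelope.data) // `data` is a JSON *string*: parse it again
  return { ...envelope, row }
}
```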

DDL Capture

Both databases support capturing schema changes (CREATE TABLE, ALTER TABLE, DROP TABLE, etc.):

PostgreSQL — enable with app.cdc.postgres.capture-ddl=true. DDL events are published as:

{
  "type": "DDL",
  "schema": "public",
  "table": "orders",
  "data": "{\"ddl\":\"ALTER TABLE orders ADD COLUMN notes TEXT\"}",
  "sourceTimestamp": 1742056200000
}

MySQL — DDL is captured automatically via QUERY binlog events. No extra config needed.

Consuming Events

Node.js

const { connect, consumerOpts, StringCodec } = require('nats')

const nc = await connect({ servers: 'nats://localhost:4222' })
const js = nc.jetstream()
const sc = StringCodec()

const opts = consumerOpts()
opts.durable('my-service')
opts.deliverAll()
opts.ackExplicit()

const sub = await js.subscribe('cdc.>', opts)
for await (const msg of sub) {
  const event = JSON.parse(sc.decode(msg.data)) // msg.data is a Uint8Array
  console.log(`[${event.type}] ${event.schema}.${event.table}`, event.data)
  msg.ack()
}

Java (jnats)

import io.nats.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

Connection nc = Nats.connect("nats://localhost:4222");
JetStream js = nc.jetStream();
ObjectMapper objectMapper = new ObjectMapper(); // CDCEvent comes from excalibase-watcher-core

PushSubscribeOptions opts = PushSubscribeOptions.builder()
    .durable("order-svc")
    .build();

JetStreamSubscription sub = js.subscribe("cdc.public.orders", opts);

while (true) {
    Message msg = sub.nextMessage(Duration.ofSeconds(30));
    if (msg != null) {
        String json = new String(msg.getData(), StandardCharsets.UTF_8);
        CDCEvent event = objectMapper.readValue(json, CDCEvent.class);

        if ("INSERT".equals(event.getType())) {
            // handle new order
        }
        msg.ack();
    }
}

Scaling

Use a durable consumer to survive restarts — NATS replays missed messages when your service comes back up:

opts.durable('inventory-service')   // survives restarts
opts.deliverAll()                    // replay from beginning on first connect

Use a queue group to load-balance across multiple pods of the same service:

opts.durable('inventory-service')
opts.queue('inventory-service')     // each message delivered to one pod only

Without queue(), every pod gets every message (fan-out — useful for cache invalidation across all instances simultaneously).

GitHub

github.com/excalibase/excalibase-watcher