diff --git a/mise.toml b/mise.toml index a190abba..a439ec37 100644 --- a/mise.toml +++ b/mise.toml @@ -1,2 +1,3 @@ [tools] +java = "21" python = "3.12" diff --git a/oracle-rac/DEPLOY.md b/oracle-rac/DEPLOY.md index 048ec541..894970aa 100644 --- a/oracle-rac/DEPLOY.md +++ b/oracle-rac/DEPLOY.md @@ -25,6 +25,25 @@ fi This is ~14GB. The tar file can be reused across VM recreations. +### 1.2 Pull CMAN image and save to file + +Oracle Connection Manager (CMAN) acts as a proxy for external clients to connect +to RAC through SCAN with proper load balancing. Required because the RAC public +network (podman bridge) is not routable from the host. + +Note: You must accept the CMAN license agreement at https://container-registry.oracle.com/ +(navigate to Database → cman) before pulling. + +```bash +if [ ! -f oracle-rac/assets/cman-23.7.0.0.tar ]; then + docker pull container-registry.oracle.com/database/cman:23.7.0.0 + docker save container-registry.oracle.com/database/cman:23.7.0.0 \ + -o oracle-rac/assets/cman-23.7.0.0.tar +fi +``` + +CMAN 23c is backward compatible with RAC 19c+ databases. + ## Step 2: Create VM ### 2.1 Generate SSH keypair @@ -93,10 +112,11 @@ SSH="ssh -i oracle-rac/assets/vm-key -o StrictHostKeyChecking=no -o UserKnownHos $SSH "growpart /dev/vda 4 && pvresize /dev/vda4 && lvextend -l +100%FREE /dev/mapper/vg_main-lv_root && xfs_growfs /" ``` -### 2.6 Copy RAC image into VM +### 2.6 Copy RAC and CMAN images into VM ```bash scp -i oracle-rac/assets/vm-key oracle-rac/assets/rac-23.26.1.0.tar root@$VM_IP:/root/ +scp -i oracle-rac/assets/vm-key oracle-rac/assets/cman-23.7.0.0.tar root@$VM_IP:/root/ ``` ## Step 3: Configure VM @@ -135,7 +155,75 @@ sed -i 's/vm.nr_hugepages=16384/vm.nr_hugepages=2048/' /etc/sysctl.conf sysctl -w vm.nr_hugepages=2048 ``` -### 3.4 Add swap (if needed to reach 32GB total) +### 3.4 Disable firewall + +The VM runs on a host-only libvirt network. Firewall is unnecessary and blocks +Prometheus scraping and other host-to-VM connections. 
+ +```bash +systemctl stop firewalld +systemctl disable firewalld +``` + +### 3.5 Install node_exporter + +For monitoring RAC VM resources (CPU, memory, disk) during performance tests. + +```bash +cd /tmp +curl -sLO https://github.com/prometheus/node_exporter/releases/download/v1.9.0/node_exporter-1.9.0.linux-amd64.tar.gz +tar xzf node_exporter-1.9.0.linux-amd64.tar.gz +cp node_exporter-1.9.0.linux-amd64/node_exporter /usr/local/bin/ +rm -rf node_exporter-1.9.0.linux-amd64* + +cat > /etc/systemd/system/node_exporter.service <<'EOF' +[Unit] +Description=Prometheus Node Exporter +After=network.target + +[Service] +ExecStart=/usr/local/bin/node_exporter +Restart=always + +[Install] +WantedBy=multi-user.target +EOF + +systemctl daemon-reload +systemctl enable --now node_exporter +``` + +Verify: `curl -s http://localhost:9100/metrics | head -1` + +### 3.6 Install cAdvisor + +For per-container CPU/memory/network/disk metrics (Podman containers). + +```bash +podman run -d --name cadvisor \ + --restart always \ + --privileged \ + -p 9101:8080 \ + -v /:/rootfs:ro \ + -v /dev/disk/:/dev/disk:ro \ + -v /etc/machine-id:/etc/machine-id:ro \ + -v /sys:/sys:ro \ + -v /sys/fs/cgroup:/sys/fs/cgroup:ro \ + -v /var/lib/containers:/var/lib/containers:ro \ + -v /var/run:/var/run:rw \ + gcr.io/cadvisor/cadvisor:latest +``` + +Verify: `curl -s http://localhost:9101/metrics | grep container_cpu | grep -v '^#' | grep -v 'id="/"' | head -1` + +Note: `/var/run` must be mounted read-write for cAdvisor to discover Podman +containers via cgroups. Container names may appear empty in metrics — use the +cgroup `id` label (contains libpod container ID) to identify containers. + +The perf test script (`perf/run.sh`) generates a Prometheus config that scrapes +`VM_IP:9100` (node_exporter) and `VM_IP:9101` (cAdvisor) automatically. 
+ +### 3.7 Add swap (if needed to reach 32GB total) ```bash dd if=/dev/zero of=/swapfile bs=1G count=28 @@ -145,7 +233,7 @@ swapon /swapfile echo "/swapfile swap swap defaults 0 0" >> /etc/fstab ``` -### 3.5 Create ASM block devices +### 3.8 Create ASM block devices ```bash mkdir -p /oradata @@ -180,7 +268,7 @@ systemctl daemon-reload systemctl enable asm-loop-devices ``` -### 3.6 Create Podman networks +### 3.9 Create Podman networks ```bash podman network create --subnet 10.0.20.0/24 rac_pub1_nw @@ -188,11 +276,12 @@ podman network create --subnet 192.168.17.0/24 rac_priv1_nw podman network create --subnet 192.168.18.0/24 rac_priv2_nw ``` -### 3.7 Load RAC image and build DNS server image +### 3.10 Load images and build DNS server ```bash podman load -i /root/rac-23.26.1.0.tar -rm /root/rac-23.26.1.0.tar +podman load -i /root/cman-23.7.0.0.tar +rm /root/rac-23.26.1.0.tar /root/cman-23.7.0.0.tar ``` ```bash @@ -200,7 +289,7 @@ cd /root/docker-images/OracleDatabase/RAC/OracleDNSServer/containerfiles/latest podman build -t oracle/rac-dnsserver:latest . ``` -### 3.8 Create the initsh fix script +### 3.11 Create the initsh fix script Oracle's `/usr/bin/initsh` has a bug: it writes env vars to `/etc/rac_env_vars` without quoting values. When `CRS_NODES` contains semicolons (multi-node separator), @@ -365,6 +454,32 @@ podman network connect rac_priv2_nw --ip 192.168.18.171 racnodep2 podman start racnodep1 podman start racnodep2 echo "RAC nodes started — provisioning takes ~15 minutes" + +# --- CMAN (Connection Manager) --- +# Proxy for external clients to connect through SCAN with load balancing. +# Deploy after RAC nodes are started (CMAN will wait for service registration). 
+podman create -t -i \ + --hostname racnodepc1-cman \ + --dns-search "example.info" \ + --dns 10.0.20.25 \ + --network=rac_pub1_nw \ + --ip=10.0.20.166 \ + --cap-add=AUDIT_WRITE \ + --cap-add=NET_RAW \ + -e DOMAIN=example.info \ + -e PUBLIC_IP=10.0.20.166 \ + -e DNS_SERVER=10.0.20.25 \ + -e PUBLIC_HOSTNAME=racnodepc1-cman \ + -e SCAN_NAME=racnodepc1-scan \ + -e SCAN_IP=10.0.20.238 \ + --privileged=false \ + -p 1521:1521 \ + --name rac-cman \ + container-registry.oracle.com/database/cman:23.7.0.0 + +podman network disconnect podman rac-cman 2>/dev/null || true +podman start rac-cman +echo "CMAN started — binds VM port 1521 for external access" SCRIPT chmod +x /root/create-rac.sh ``` @@ -390,12 +505,46 @@ ORACLE RAC DATABASE IS READY TO USE ### Verify ```bash -podman ps -a # all 3 containers should show (healthy) +podman ps -a # all 4 containers should show (healthy) podman exec racnodep1 su - oracle -c "srvctl status database -d ORCLCDB" # Expected: Instance ORCLCDB1 is running on node racnodep1 # Instance ORCLCDB2 is running on node racnodep2 ``` +### Register RAC instances with CMAN + +After RAC provisioning completes, register both instances with CMAN so it can +route client connections with SCAN load balancing: + +```bash +for node_sid in "racnodep1:ORCLCDB1" "racnodep2:ORCLCDB2"; do + node=${node_sid%%:*} + sid=${node_sid#*:} + podman exec $node su - oracle -c " +export ORACLE_SID=$sid +sqlplus -S / as sysdba <<'SQL' +ALTER SYSTEM SET remote_listener='racnodepc1-scan:1521,racnodepc1-cman.example.info:1521' SCOPE=BOTH; +ALTER SYSTEM REGISTER; +SQL +" +done +``` + +Verify CMAN sees both instances: + +```bash +podman logs rac-cman 2>&1 | grep "READY TO USE" +``` + +Test external connectivity from the host: + +```bash +sqlplus soe/soe@//VM_IP:1521/ORCLPDB +``` + +All client connections through `VM_IP:1521` are now load-balanced across both +RAC nodes via CMAN + SCAN. 
+ ## Step 6: Migrate Redo Logs to Shared Filesystem After RAC provisioning completes, migrate online redo logs from ASM to the shared @@ -574,7 +723,7 @@ Note: `crsctl stop crs` must run as root (not grid): ```bash podman exec racnodep2 /u01/app/23ai/grid/bin/crsctl stop crs podman exec racnodep1 /u01/app/23ai/grid/bin/crsctl stop crs -podman stop racnodep2 racnodep1 rac-dnsserver +podman stop racnodep2 racnodep1 rac-cman rac-dnsserver ``` If CRS stop hangs, use `-f` to force: @@ -582,7 +731,7 @@ If CRS stop hangs, use `-f` to force: ```bash podman exec racnodep2 /u01/app/23ai/grid/bin/crsctl stop crs -f podman exec racnodep1 /u01/app/23ai/grid/bin/crsctl stop crs -f -podman stop racnodep2 racnodep1 rac-dnsserver +podman stop racnodep2 racnodep1 rac-cman rac-dnsserver ``` ### Shutdown the VM (from host) @@ -617,13 +766,14 @@ ls -la /dev/asm-disk1 /dev/asm-disk2 # ln -sf /dev/loop1 /dev/asm-disk2 ``` -Start containers in order — DNS first, then RAC nodes: +Start containers in order — DNS first, then RAC nodes, then CMAN: ```bash podman start rac-dnsserver sleep 5 podman start racnodep1 podman start racnodep2 +podman start rac-cman ``` Wait for CRS to come online (~2-5 minutes): @@ -695,6 +845,8 @@ rm oracle-rac/assets/OL9-vm.qcow2 | App service | orclpdb_app (connects to ORCLPDB) | | SCAN name | racnodepc1-scan | | Domain | example.info | +| CMAN IP | 10.0.20.166 (on rac_pub1_nw) | +| CMAN external port | VM_IP:1521 (load-balanced to both nodes via SCAN) | ## Files @@ -705,6 +857,7 @@ All dynamic assets live in `oracle-rac/assets/` (gitignored): | `assets/OL9U7_x86_64-kvm-b269.qcow2` | Original OL9 cloud image (~800MB) | Yes — base image, never modified | | `assets/OL9-vm.qcow2` | VM runtime disk (grows to ~60GB+) | No — destroyed on VM recreate | | `assets/rac-23.26.1.0.tar` | RAC container image (~14GB) | Yes — loaded into each new VM | +| `assets/cman-23.7.0.0.tar` | CMAN container image | Yes — loaded into each new VM | | `assets/vm-key` / `vm-key.pub` | SSH 
keypair | Yes | | `assets/cloud-init.yaml` | Cloud-init user-data | Yes | | `assets/cloud-init.iso` | Cloud-init ISO | Yes — regenerate if yaml changes | diff --git a/tests/debezium/PERF-TEST-PLAN.md b/tests/debezium/PERF-TEST-PLAN.md new file mode 100644 index 00000000..1c5aec3e --- /dev/null +++ b/tests/debezium/PERF-TEST-PLAN.md @@ -0,0 +1,90 @@ +# Debezium Performance Test: OLR vs LogMiner + +## Goal + +Compare Debezium CDC throughput and latency when using the **OLR adapter** vs the +**LogMiner adapter**, both running against the same Oracle RAC instance under +sustained DML pressure. + +## What We're Measuring + +| Metric | Definition | How | +|--------|-----------|-----| +| **Throughput (events/sec)** | Events delivered to HTTP receiver per second | Receiver timestamps each event; compute rate over time windows | +| **End-to-end latency** | Oracle commit → event arrival at receiver | `source.ts_ms` (commit time) vs receiver arrival time | +| **Catch-up time** | Time from connector start to "caught up" (lag < 1s) | Monitor lag over time after cold start with backlog | +| **Sustained lag** | Steady-state lag under continuous pressure | Average latency once caught up | + +## Architecture + +``` +Oracle RAC (2 nodes) + └── PL/SQL DML generator (DBMS_SCHEDULER jobs on both nodes) + └── continuous INSERT/UPDATE/DELETE on BENCH table + +OLR (on RAC VM) + └── reads redo logs → TCP → Debezium OLR adapter → HTTP sink → receiver + +LogMiner adapter + └── queries redo via SQL → HTTP sink → receiver + +Receiver (Python) + └── timestamps each event, computes throughput/latency, exposes /metrics +``` + +## DML Generator + +PL/SQL job running inside Oracle on both RAC nodes simultaneously: + +- Table: `OLR_TEST.BENCH` (id NUMBER, val VARCHAR2(200), node_id NUMBER, created TIMESTAMP) +- Operations: 70% INSERT, 20% UPDATE, 10% DELETE (realistic CDC mix) +- Target rate: configurable, start with ~500 rows/sec per node (1000 total) +- Commit frequency: every 10-50 rows (variable 
batch size) +- Duration: configurable (default 5 minutes) + +No external tools needed — pure PL/SQL with `DBMS_SCHEDULER`. + +## Test Scenarios + +### 1. Sustained throughput (primary) +- Start DML generator at steady rate +- Let both adapters run for 5 minutes +- Compare: events/sec, average latency, p95 latency + +### 2. Burst + catch-up +- Generate 100K rows with both adapters stopped +- Start both adapters simultaneously +- Measure time to process full backlog + +### 3. Scaling test +- Increase DML rate in steps: 500, 1000, 2000, 5000 rows/sec +- Find the throughput ceiling for each adapter + +## Receiver Enhancements + +Current `debezium-receiver.py` needs: +- Per-event timestamp recording (arrival time) +- Extract `source.ts_ms` from Debezium events for latency calculation +- `/metrics` endpoint returning: event count, events/sec (last 10s window), + avg latency, p50/p95/p99 latency, per-adapter breakdown +- `/metrics/reset` to clear stats between test runs + +## Deliverables + +1. `tests/debezium/perf-test.sh` — orchestrates the full benchmark +2. Enhanced `debezium-receiver.py` — adds latency/throughput metrics +3. PL/SQL generator scripts (run inside Oracle, no external tools) +4. Results output: JSON summary + human-readable table + +## Prerequisites + +- RAC VM running with Oracle operational +- OLR image loaded on VM +- Debezium services (docker-compose) configured +- Both adapters configured to consume from same Oracle PDB + +## Open Questions + +- Should we test single-instance (Oracle XE) as well, or RAC only? +- Do we need Kafka in the path, or is HTTP sink sufficient for comparison? +- Should the generator run for a fixed duration or fixed row count? 
diff --git a/tests/sql/environments/rac/debezium/config/olr-config.json b/tests/sql/environments/rac/debezium/config/olr-config.json index 31ae9a02..c664fa2e 100644 --- a/tests/sql/environments/rac/debezium/config/olr-config.json +++ b/tests/sql/environments/rac/debezium/config/olr-config.json @@ -20,7 +20,7 @@ "path-mapping": ["/shared/redo", "/shared/redo"], "user": "c##dbzuser", "password": "dbz", - "server": "//10.0.20.170:1521/ORCLPDB" + "server": "//racnodepc1-scan:1521/ORCLPDB" }, "format": { "type": "debezium", diff --git a/tests/sql/environments/rac/debezium/perf/Dockerfile.swingbench b/tests/sql/environments/rac/debezium/perf/Dockerfile.swingbench new file mode 100644 index 00000000..19409985 --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/Dockerfile.swingbench @@ -0,0 +1,16 @@ +FROM eclipse-temurin:21-jre + +ARG SWINGBENCH_URL=https://github.com/domgiles/swingbench-public/releases/download/production/swingbenchlatest.zip + +RUN apt-get update && apt-get install -y --no-install-recommends unzip curl && \ + curl -sL -o /tmp/swingbench.zip "$SWINGBENCH_URL" && \ + unzip -qo /tmp/swingbench.zip -d /opt && \ + rm /tmp/swingbench.zip && \ + chmod +x /opt/swingbench/bin/* && \ + apt-get remove -y unzip curl && apt-get autoremove -y && rm -rf /var/lib/apt/lists/* + +ENV PATH="/opt/swingbench/bin:${PATH}" + +WORKDIR /opt/swingbench + +ENTRYPOINT ["charbench"] diff --git a/tests/sql/environments/rac/debezium/perf/RESULTS.md b/tests/sql/environments/rac/debezium/perf/RESULTS.md new file mode 100644 index 00000000..455be33f --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/RESULTS.md @@ -0,0 +1,114 @@ +# Debezium Performance Test: OLR vs LogMiner on Oracle RAC + +## Test Environment + +| Component | Spec | +|-----------|------| +| Oracle RAC | 2-node 23.26.1.0, Podman containers in KVM VM | +| VM | 16 GB RAM, 8 vCPU, virtio disk (loop-device ASM) | +| Host | Intel NUC, Linux 6.8 | +| OLR | v1.9.0 Debug build (ASAN enabled) | +| Debezium | 
Server 3.5.0.Beta1, HTTP sink | +| Load generator | Swingbench 2.7 (SOE Order Entry, 0.1 scale) | +| Network | Host-only libvirt network (VM ↔ host) | + +## Methodology + +- Swingbench generates OLTP workload (INSERT/UPDATE/DELETE/SELECT mix) against + Oracle RAC PDB via JDBC from host machine +- Both Debezium adapters (LogMiner and OLR) consume CDC events from the same + Oracle instance simultaneously (tested separately to avoid interference) +- Events delivered to HTTP receiver on host, which timestamps each event +- Latency = `receiver_arrival_time - source.ts_ms` (Oracle commit time) +- Each test: 5 minutes of sustained load, then wait for adapter to catch up + +## Results: LogMiner Adapter + +| Users | ~TPS | Events | Latency p50 | Latency p95 | Status | +|-------|------|--------|-------------|-------------|--------| +| 1 | 300 | 299,732 | 6.6s (stable) | 7.7s | **Keeping up** | +| 2 | 600 | 428,680 | 83s (growing) | 127s | **Falling behind** | + +At 1 user (~300 TPS), LogMiner maintains stable ~6.6s latency — this is the +inherent LogMiner mining cycle delay. At 2 users (~600 TPS), latency grows +linearly from 10s to 83s over 5 minutes, indicating the adapter cannot keep +up with the redo generation rate. + +**LogMiner ceiling: ~300 TPS sustained** on this hardware. + +## Results: OLR Adapter + +| Users | ~TPS | Events | Latency p50 | Latency p95 | Status | +|-------|------|--------|-------------|-------------|--------| +| 1 | 300 | 296,556 | 7.9s (stable) | 9.5s | **Keeping up** | +| 2 | 600 | 432,266 | 8.5s (stable) | 9.7s | **Keeping up** | +| 4 | 1200 | 540,401 | 6.4s (stable) | 7.2s | **Keeping up** | +| 8 | 1700 | 564,362 | 6.5s (stable) | 8.5s | **Keeping up** | + +OLR maintains stable latency across all tested load levels. At 8 users +(~1,700 TPS), latency stays flat at p50=6.5s. + +**OLR did not reach its ceiling** — the Oracle RAC VM became the bottleneck +before OLR saturated. 
+ +## VM Resource Usage at 8 Users + +| Resource | Value | +|----------|-------| +| CPU (us+sy) | ~68% | +| Memory used | 10.5-11 GB / 16 GB | +| Swap used | 8.4 GB | +| Free memory | 160-280 MB | +| Disk I/O (bi/bo) | 15-18K / 18-30K blocks/sec | +| I/O wait | 5-6% | + +The VM is saturated on CPU, memory, and disk. The ~1,400 events/sec throughput +ceiling is Oracle's redo generation limit on this hardware, not OLR's +processing limit. + +## Comparison Summary + +| Metric | LogMiner | OLR | +|--------|----------|-----| +| Max sustained TPS (stable latency) | ~300 | **>1,700** (VM-limited) | +| Latency at 300 TPS | 6.6s | 7.9s | +| Latency at 600 TPS | 83s (diverging) | 8.5s (stable) | +| Catch-up throughput | ~1,000 eps | ~1,400 eps | + +## Key Findings + +1. **OLR sustains 5x+ higher TPS** than LogMiner before latency diverges +2. **LogMiner's ~6s baseline latency** is inherent to its SQL-based mining + cycle — it cannot go lower regardless of load +3. **OLR's ~7s baseline latency** is dominated by Oracle redo flush interval, + not OLR processing time +4. Under sustained load, **LogMiner latency grows linearly** while + **OLR latency stays flat** +5. 
The **RAC VM is the bottleneck** at higher TPS — OLR's true ceiling was + not reached in these tests + +## Limitations + +- Debug build with ASAN adds overhead — Release build would be faster +- 16 GB VM with loop-device ASM is not representative of production hardware +- Single PDB, small SOE schema (0.1 scale factor) +- HTTP sink adds overhead vs direct Kafka +- LogMiner and OLR tested separately (not simultaneously) + +## Reproducing + +```bash +# Prerequisites: RAC VM running, Swingbench installed, SOE schema created +cd tests/sql/environments/rac/debezium/perf + +# Automated test (starts services, runs Swingbench, collects metrics) +./run.sh 300 8 # 5 min, 8 users + +# Manual testing (as done for these results) +docker compose up -d receiver dbz-logminer # or dbz-olr +# Start OLR on VM if testing OLR adapter +charbench -cs //VM_IP:1521/ORCLPDB -u soe -p soe \ + -c $SWINGBENCH_HOME/configs/SOE_Server_Side_V2.xml \ + -uc 8 -rt 00:05.00 -v users,tps,dml,resp +curl http://localhost:8080/metrics # throughput + latency +``` diff --git a/tests/sql/environments/rac/debezium/perf/VALIDATION-PLAN.md b/tests/sql/environments/rac/debezium/perf/VALIDATION-PLAN.md new file mode 100644 index 00000000..2d5c67ff --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/VALIDATION-PLAN.md @@ -0,0 +1,93 @@ +# Continuous Data Validation Framework + +## Goal + +Run OLR and LogMiner Debezium adapters simultaneously under sustained load, +continuously validate both produce identical events, and stop immediately on +any mismatch — preserving redo logs and event history for replay. 
+ +## Architecture + +``` +Oracle RAC (VM) + └── OLR container (reads redo → TCP:5000) + +Host (docker-compose): + swingbench → continuous OLTP load via CMAN (port 1521) + dbz-logminer → LogMiner adapter → POST /logminer → receiver + dbz-olr → OLR adapter → POST /olr → receiver + receiver → writes logminer.jsonl + olr.jsonl + validator → tails both files, matches events, stops on mismatch +``` + +## Components + +### receiver (existing, no changes needed) +- Writes events to `logminer.jsonl` and `olr.jsonl` +- Provides `/metrics` for throughput/latency monitoring + +### swingbench (new container) +- `Dockerfile.swingbench` — eclipse-temurin:21 + Swingbench +- Connects to Oracle via CMAN (`VM_IP:1521`) +- Configurable users and runtime via env vars / command args +- Stopped by validator on mismatch + +### validator (new) +- Python script that tails both JSONL files +- Extracts match key: `(table, op, sorted(after_columns))` +- Maintains two multisets (one per adapter) +- Match window: events from one adapter are held for N seconds waiting + for the matching event from the other adapter +- On timeout (event in one adapter but not the other): MISMATCH → stop +- On content diff (same key but different values): MISMATCH → stop +- On match: remove from both sets, increment match counter +- Logs progress every 10s: matched count, pending LM, pending OLR + +### On mismatch: +1. Validator sends `docker stop swingbench` (DML stops) +2. Logs the mismatched events with full detail +3. Redo logs on VM are preserved (no log switch) +4. JSONL files preserved for offline replay +5. 
Exit with non-zero code + +## Docker Compose + +```yaml +services: + receiver: # existing + dbz-logminer: # existing + dbz-olr: # existing + swingbench: + image: swingbench:latest + network_mode: host + command: ["-cs", "//VM_IP:1521/ORCLPDB", "-u", "soe", "-p", "soe", + "-c", "/opt/swingbench/configs/SOE_Server_Side_V2.xml", + "-uc", "4", "-rt", "99:00.00", "-nc", "-nr", "-s"] + validator: + image: python:3.12-slim + network_mode: host + volumes: + - ./output:/app/output:ro + - /var/run/docker.sock:/var/run/docker.sock + command: ["python3", "/app/validator.py", + "--logminer", "/app/output/logminer.jsonl", + "--olr", "/app/output/olr.jsonl", + "--match-window", "60"] +``` + +## Match Key Design + +For INSERT: `(table, "c", hash(sorted(after_columns)))` +For UPDATE: `(table, "u", hash(sorted(before_columns)), hash(sorted(after_columns)))` +For DELETE: `(table, "d", hash(sorted(before_columns)))` + +Using hash of column values (not full content) keeps memory bounded for +long-running tests. Store full content only for recent unmatched events +(within match window) for mismatch reporting. + +## Open Questions + +- Match window duration: 60s? 120s? Depends on max lag between adapters. +- Should validator also check event count periodically? +- Memory management for very long runs (hours/days)? +- Should we also validate ordering within the same table/key? 
diff --git a/tests/sql/environments/rac/debezium/perf/config/application-logminer.properties b/tests/sql/environments/rac/debezium/perf/config/application-logminer.properties new file mode 100644 index 00000000..ca05634d --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/config/application-logminer.properties @@ -0,0 +1,26 @@ +quarkus.http.port=8081 +debezium.sink.type=http +debezium.sink.http.url=http://localhost:8080/logminer + +debezium.format.value=json +debezium.format.value.schemas.enable=false +debezium.format.key=json +debezium.format.key.schemas.enable=false + +debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector +debezium.source.database.connection.adapter=logminer +debezium.source.database.hostname=192.168.122.130 +debezium.source.database.port=1521 +debezium.source.database.user=c##dbzuser +debezium.source.database.password=dbz +debezium.source.database.dbname=ORCLCDB +debezium.source.database.pdb.name=ORCLPDB +debezium.source.topic.prefix=logminer +debezium.source.schema.include.list=SOE +debezium.source.snapshot.mode=no_data + +debezium.source.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore +debezium.source.offset.storage.file.filename=/debezium/data/offsets.dat +debezium.source.offset.flush.interval.ms=0 +debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory +debezium.source.schema.history.internal.file.filename=/debezium/data/schema-history.dat diff --git a/tests/sql/environments/rac/debezium/perf/config/application-olr.properties b/tests/sql/environments/rac/debezium/perf/config/application-olr.properties new file mode 100644 index 00000000..a7014c7e --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/config/application-olr.properties @@ -0,0 +1,29 @@ +quarkus.http.port=8082 +debezium.sink.type=http +debezium.sink.http.url=http://localhost:8080/olr + +debezium.format.value=json +debezium.format.value.schemas.enable=false +debezium.format.key=json 
+debezium.format.key.schemas.enable=false + +debezium.source.connector.class=io.debezium.connector.oracle.OracleConnector +debezium.source.database.connection.adapter=olr +debezium.source.openlogreplicator.source=ORCLCDB +debezium.source.openlogreplicator.host=192.168.122.130 +debezium.source.openlogreplicator.port=5000 +debezium.source.database.hostname=192.168.122.130 +debezium.source.database.port=1521 +debezium.source.database.user=c##dbzuser +debezium.source.database.password=dbz +debezium.source.database.dbname=ORCLCDB +debezium.source.database.pdb.name=ORCLPDB +debezium.source.topic.prefix=olr +debezium.source.schema.include.list=SOE +debezium.source.snapshot.mode=no_data + +debezium.source.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore +debezium.source.offset.storage.file.filename=/debezium/data/offsets.dat +debezium.source.offset.flush.interval.ms=0 +debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory +debezium.source.schema.history.internal.file.filename=/debezium/data/schema-history.dat diff --git a/tests/sql/environments/rac/debezium/perf/config/prometheus.yml b/tests/sql/environments/rac/debezium/perf/config/prometheus.yml new file mode 100644 index 00000000..55f81270 --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/config/prometheus.yml @@ -0,0 +1,10 @@ +global: + scrape_interval: 5s + +scrape_configs: + - job_name: 'node-exporter' + static_configs: + - targets: ['192.168.122.130:9100'] + - job_name: 'cadvisor' + static_configs: + - targets: ['192.168.122.130:9101'] diff --git a/tests/sql/environments/rac/debezium/perf/dml-generator.sql b/tests/sql/environments/rac/debezium/perf/dml-generator.sql new file mode 100644 index 00000000..b1fcfd99 --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/dml-generator.sql @@ -0,0 +1,76 @@ +-- DML generator for performance testing. +-- Runs continuous INSERT/UPDATE/DELETE on OLR_TEST.PERF_BENCH. 
+-- +-- Parameters (substitution variables): +-- &1 = batch size per commit (default 50) +-- &2 = number of batches (default 1000) +-- &3 = node_id (1 or 2) +-- &4 = start ID offset (default 1) +-- +-- Each batch: ~70% INSERT, ~20% UPDATE, ~10% DELETE +-- Commits after each batch for realistic CDC workload. + +SET FEEDBACK OFF +SET SERVEROUTPUT ON + +DECLARE + v_batch_size PLS_INTEGER := &1; + v_batches PLS_INTEGER := &2; + v_node_id PLS_INTEGER := &3; + v_start_id PLS_INTEGER := &4; + v_next_id PLS_INTEGER := v_start_id; + v_insert_cnt PLS_INTEGER := 0; + v_update_cnt PLS_INTEGER := 0; + v_delete_cnt PLS_INTEGER := 0; + v_total PLS_INTEGER := 0; + v_start_ts TIMESTAMP := SYSTIMESTAMP; + v_rand NUMBER; + v_target_id PLS_INTEGER; +BEGIN + FOR batch IN 1..v_batches LOOP + FOR i IN 1..v_batch_size LOOP + v_rand := DBMS_RANDOM.VALUE(0, 1); + + IF v_rand < 0.7 OR v_next_id = v_start_id THEN + -- INSERT (70% or first row) + INSERT INTO olr_test.PERF_BENCH (id, val, node_id, batch_num, created) + VALUES (v_next_id, + DBMS_RANDOM.STRING('x', 100), + v_node_id, + batch, + SYSTIMESTAMP); + v_next_id := v_next_id + 1; + v_insert_cnt := v_insert_cnt + 1; + + ELSIF v_rand < 0.9 THEN + -- UPDATE (20%) — target a recent row + v_target_id := v_start_id + MOD(ABS(DBMS_RANDOM.RANDOM), GREATEST(v_next_id - v_start_id, 1)); + UPDATE olr_test.PERF_BENCH + SET val = DBMS_RANDOM.STRING('x', 100), + batch_num = batch + WHERE id = v_target_id AND node_id = v_node_id; + v_update_cnt := v_update_cnt + 1; + + ELSE + -- DELETE (10%) — target an old row + v_target_id := v_start_id + MOD(ABS(DBMS_RANDOM.RANDOM), GREATEST(v_next_id - v_start_id, 1)); + DELETE FROM olr_test.PERF_BENCH + WHERE id = v_target_id AND node_id = v_node_id; + v_delete_cnt := v_delete_cnt + 1; + END IF; + + v_total := v_total + 1; + END LOOP; + COMMIT; + END LOOP; + + DBMS_OUTPUT.PUT_LINE('PERF_DML_DONE: node=' || v_node_id || + ' inserts=' || v_insert_cnt || + ' updates=' || v_update_cnt || + ' deletes=' || v_delete_cnt 
|| + ' total=' || v_total || + ' batches=' || v_batches || + ' elapsed_ms=' || EXTRACT(SECOND FROM (SYSTIMESTAMP - v_start_ts)) * 1000); +END; +/ +EXIT diff --git a/tests/sql/environments/rac/debezium/perf/docker-compose.yaml b/tests/sql/environments/rac/debezium/perf/docker-compose.yaml new file mode 100644 index 00000000..d460b534 --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/docker-compose.yaml @@ -0,0 +1,98 @@ +services: + receiver: + image: python:3.12-slim + container_name: dbz-receiver + network_mode: host + command: ["python3", "/app/debezium-receiver.py"] + environment: + OUTPUT_DIR: /app/output + volumes: + - ../../../../scripts/debezium-receiver.py:/app/debezium-receiver.py:ro + - ./output:/app/output + + dbz-logminer: + image: quay.io/debezium/server:3.5.0.Beta1 + container_name: dbz-logminer + network_mode: host + depends_on: + receiver: + condition: service_started + volumes: + - ./config/application-logminer.properties:/debezium/config/application.properties:ro + - ../../../../../debezium/lib/ojdbc8.jar:/debezium/lib/ojdbc8.jar:ro + - dbz-logminer-data:/debezium/data + + dbz-olr: + image: quay.io/debezium/server:3.5.0.Beta1 + container_name: dbz-olr-adapter + network_mode: host + restart: unless-stopped + depends_on: + receiver: + condition: service_started + volumes: + - ./config/application-olr.properties:/debezium/config/application.properties:ro + - ../../../../../debezium/lib/ojdbc8.jar:/debezium/lib/ojdbc8.jar:ro + - ../../../../../debezium/lib/debezium-connector-oracle-3.5.0.Beta1.jar:/debezium/lib/debezium-connector-oracle-3.5.0.Beta1.jar:ro + - dbz-olr-data:/debezium/data + + swingbench: + image: swingbench:latest + container_name: perf-swingbench + network_mode: host + # Default: 4 users, run for 99 hours (effectively forever until stopped) + # Override with: docker compose run swingbench -uc 8 -rt 01:00.00 + command: + - "-cs" + - "//192.168.122.130:1521/ORCLPDB" + - "-u" + - "soe" + - "-p" + - "soe" + - "-c" + - 
"/opt/swingbench/configs/SOE_Server_Side_V2.xml" + - "-uc" + - "${SWINGBENCH_USERS:-4}" + - "-rt" + - "${SWINGBENCH_RUNTIME:-99:00.00}" + - "-nc" + - "-nr" + - "-s" + + validator: + image: python:3.12-slim + container_name: perf-validator + network_mode: host + depends_on: + receiver: + condition: service_started + volumes: + - ./validator.py:/app/validator.py:ro + - ./output:/app/output:ro + - /var/run/docker.sock:/var/run/docker.sock + command: + - "python3" + - "/app/validator.py" + - "--logminer" + - "/app/output/logminer.jsonl" + - "--olr" + - "/app/output/olr.jsonl" + - "--match-window" + - "${MATCH_WINDOW:-120}" + + prometheus: + image: prom/prometheus:latest + container_name: perf-prometheus + network_mode: host + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--web.listen-address=:9090' + - '--storage.tsdb.retention.time=1d' + volumes: + - ./config/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prometheus-data:/prometheus + +volumes: + dbz-logminer-data: + dbz-olr-data: + prometheus-data: diff --git a/tests/sql/environments/rac/debezium/perf/olr-config.json b/tests/sql/environments/rac/debezium/perf/olr-config.json new file mode 100644 index 00000000..a7d7a873 --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/olr-config.json @@ -0,0 +1,49 @@ +{ + "version": "1.9.0", + "log-level": 3, + "state": { + "type": "disk", + "path": "/olr-data/checkpoint", + "interval-s": 30, + "interval-mb": 100 + }, + "memory": { + "min-mb": 128, + "max-mb": 2048 + }, + "source": [ + { + "alias": "SOURCE", + "name": "ORCLCDB", + "reader": { + "type": "online", + "path-mapping": ["/shared/redo", "/shared/redo"], + "user": "c##dbzuser", + "password": "dbz", + "server": "//racnodepc1-scan:1521/ORCLPDB" + }, + "format": { + "type": "debezium", + "scn-type": 1, + "timestamp-type": 1, + "user-type": 0, + "redo-thread": 0 + }, + "filter": { + "table": [ + {"owner": "SOE", "table": ".*"} + ] + } + } + ], + "target": [ + { + "alias": "DEBEZIUM", + 
"source": "SOURCE", + "writer": { + "type": "network", + "uri": "0.0.0.0:5000" + } + } + ] +} diff --git a/tests/sql/environments/rac/debezium/perf/run.sh b/tests/sql/environments/rac/debezium/perf/run.sh new file mode 100755 index 00000000..52a133fb --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/run.sh @@ -0,0 +1,230 @@ +#!/usr/bin/env bash +# Performance test: Debezium OLR adapter vs LogMiner adapter. +# +# Uses Swingbench (charbench) to generate sustained OLTP load on Oracle RAC +# while both Debezium adapters consume events. Measures throughput and latency. +# +# Usage: ./run.sh [duration_seconds] [swingbench_users] +# duration_seconds Swingbench run time (default: 300) +# swingbench_users Concurrent Swingbench users (default: 8) + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +DEBEZIUM_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" +RAC_ENV_DIR="$(cd "$DEBEZIUM_DIR/.." && pwd)" + +source "$RAC_ENV_DIR/vm-env.sh" + +DURATION="${1:-300}" +SB_USERS="${2:-8}" + +SWINGBENCH_HOME="${SWINGBENCH_HOME:-$HOME/tools/swingbench}" +CHARBENCH="$SWINGBENCH_HOME/bin/charbench" +SB_CONFIG="$SWINGBENCH_HOME/configs/SOE_Server_Side_V2.xml" + +OLR_IMAGE="${OLR_IMAGE:-docker.io/library/olr-dev:latest}" +OLR_CONTAINER="olr-debezium" +RECEIVER_URL="${RECEIVER_URL:-http://localhost:8080}" + +SB_RT=$(printf "%02d:%02d.%02d" $(( DURATION / 3600 )) $(( (DURATION % 3600) / 60 )) $(( DURATION % 60 ))) + +if [[ ! 
-x "$CHARBENCH" ]]; then
+  echo "ERROR: Swingbench not found at $SWINGBENCH_HOME" >&2
+  exit 1
+fi
+
+_poll_metrics() {
+  curl -sf "$RECEIVER_URL/metrics" 2>/dev/null || echo '{}'
+}
+
+_print_metrics() {
+  local label="$1"
+  echo "$label"
+  _poll_metrics | python3 -c "
+import json, sys
+d = json.load(sys.stdin)
+for ch in ('logminer', 'olr'):
+    m = d.get(ch, {})
+    print(f'  {ch:10s}: {m.get(\"count\",0):>8d} events | '
+          f'{m.get(\"throughput_total_eps\",0):>7.1f} total eps | '
+          f'{m.get(\"throughput_10s_eps\",0):>7.1f} 10s eps | '
+          f'p50={m.get(\"latency_p50_ms\",0):>7.0f} p95={m.get(\"latency_p95_ms\",0):>7.0f} ms')
+"
+}
+
+echo "=== Debezium Performance Test: OLR vs LogMiner ==="
+echo "  Swingbench: ${SB_USERS} users, ${DURATION}s"
+echo "  RAC VM:     ${VM_HOST}"
+echo ""
+
+# ---- Stage 1: Setup ----
+echo "--- Stage 1: Setup ---"
+
+# Push perf OLR config to VM and clean state
+scp $_SSH_OPTS "$SCRIPT_DIR/olr-config.json" \
+    "${VM_USER}@${VM_HOST}:/root/olr-debezium/config/olr-config.json" > /dev/null
+ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \
+    "podman rm -f $OLR_CONTAINER 2>/dev/null; rm -rf /root/olr-debezium/checkpoint/*; true" > /dev/null
+
+# Generate prometheus config with current VM_HOST
+# (node_exporter + cAdvisor run on VM, see DEPLOY.md steps 3.5-3.6)
+# NOTE(review): the heredoc body below was lost in extraction and is
+# reconstructed from the comment above — verify scrape targets/ports
+# against DEPLOY.md steps 3.5-3.6 before relying on it.
+cat > "$SCRIPT_DIR/config/prometheus.yml" <<EOF
+global:
+  scrape_interval: 5s
+scrape_configs:
+  - job_name: 'node'
+    static_configs:
+      - targets: ['${VM_HOST}:9100']
+  - job_name: 'cadvisor'
+    static_configs:
+      - targets: ['${VM_HOST}:8080']
+EOF
+
+cd "$SCRIPT_DIR"
+docker compose down -v 2>/dev/null || true
+docker compose up -d receiver prometheus 2>/dev/null
+sleep 2
+curl -sf -X POST "$RECEIVER_URL/reset" > /dev/null
+echo "  Prometheus: http://localhost:9090"
+docker compose up -d dbz-logminer 2>/dev/null
+
+echo "  Waiting for LogMiner streaming..."
+for i in $(seq 1 90); do + if docker logs dbz-logminer 2>&1 | tail -20 | grep -q "Starting streaming"; then + echo " LogMiner: streaming" + break + fi + if [[ $i -eq 90 ]]; then echo "ERROR: LogMiner timeout" >&2; exit 1; fi + sleep 2 +done + +# Start OLR on VM, then adapter (adapter must connect before OLR processes) +ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman run -d --name $OLR_CONTAINER \ + --user 1000:54335 \ + -p 5000:5000 \ + -v /root/olr-debezium/config:/config:ro,Z \ + -v /root/olr-debezium/checkpoint:/olr-data/checkpoint:Z \ + -v /shared/redo:/shared/redo:ro \ + $OLR_IMAGE \ + -r -f /config/olr-config.json" > /dev/null +sleep 2 +docker compose up -d dbz-olr 2>/dev/null + +echo " Waiting for OLR..." +for i in $(seq 1 90); do + if ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman logs $OLR_CONTAINER 2>&1 | grep -q 'processing redo log'" 2>/dev/null; then + echo " OLR + adapter: ready" + break + fi + if [[ $i -eq 90 ]]; then echo "ERROR: OLR timeout" >&2; exit 1; fi + sleep 2 +done +sleep 5 +cd - > /dev/null +echo "" + +# ---- Stage 2: Run Swingbench ---- +echo "--- Stage 2: Swingbench (${SB_USERS} users, ${DURATION}s) ---" + +"$CHARBENCH" \ + -cs //${VM_HOST}:1521/ORCLPDB \ + -u soe -p soe \ + -c "$SB_CONFIG" \ + -uc "$SB_USERS" \ + -rt "$SB_RT" \ + -nc -nr \ + -v users,tps,dml,resp 2>&1 & +SB_PID=$! 
+ +POLL_START=$(date +%s) +while kill -0 $SB_PID 2>/dev/null; do + ELAPSED=$(( $(date +%s) - POLL_START )) + _print_metrics " [${ELAPSED}s]" + sleep 10 +done +wait $SB_PID || true + +echo "" +echo " Swingbench completed in $(( $(date +%s) - POLL_START ))s" +echo "" + +# ---- Stage 3: Wait for adapters to catch up ---- +echo "--- Stage 3: Wait for adapters to catch up (120s max) ---" +WAIT_START=$(date +%s) +LAST_LM=0 +LAST_OLR=0 +STALL_COUNT=0 + +while true; do + ELAPSED=$(( $(date +%s) - WAIT_START )) + if [[ $ELAPSED -ge 120 ]]; then + echo " Time limit reached" + break + fi + + DATA=$(_poll_metrics) + CUR_LM=$(echo "$DATA" | python3 -c "import json,sys; print(json.load(sys.stdin).get('logminer',{}).get('count',0))" 2>/dev/null || echo 0) + CUR_OLR=$(echo "$DATA" | python3 -c "import json,sys; print(json.load(sys.stdin).get('olr',{}).get('count',0))" 2>/dev/null || echo 0) + + if [[ "$CUR_LM" == "$LAST_LM" && "$CUR_OLR" == "$LAST_OLR" ]]; then + STALL_COUNT=$(( STALL_COUNT + 1 )) + else + STALL_COUNT=0 + fi + LAST_LM="$CUR_LM" + LAST_OLR="$CUR_OLR" + + if [[ $STALL_COUNT -ge 3 && "$CUR_LM" -gt 0 && "$CUR_OLR" -gt 0 ]]; then + echo " Both adapters caught up" + break + fi + + _print_metrics " [${ELAPSED}s]" + sleep 10 +done +echo "" + +# ---- Stage 4: Results ---- +echo "--- Stage 4: Results ---" +FINAL=$(_poll_metrics) +echo "$FINAL" | python3 -c " +import json, sys + +d = json.load(sys.stdin) + +print('=' * 80) +print(f'{\"Metric\":30s} {\"LogMiner\":>20s} {\"OLR\":>20s}') +print('=' * 80) + +lm = d.get('logminer', {}) +olr = d.get('olr', {}) + +rows = [ + ('Events captured', f'{lm.get(\"count\",0):,}', f'{olr.get(\"count\",0):,}'), + ('Throughput (total eps)', f'{lm.get(\"throughput_total_eps\",0):.1f}', f'{olr.get(\"throughput_total_eps\",0):.1f}'), + ('Throughput (10s eps)', f'{lm.get(\"throughput_10s_eps\",0):.1f}', f'{olr.get(\"throughput_10s_eps\",0):.1f}'), + ('Latency avg (ms)', f'{lm.get(\"latency_avg_ms\",0):.1f}', 
f'{olr.get(\"latency_avg_ms\",0):.1f}'), + ('Latency p50 (ms)', f'{lm.get(\"latency_p50_ms\",0):.1f}', f'{olr.get(\"latency_p50_ms\",0):.1f}'), + ('Latency p95 (ms)', f'{lm.get(\"latency_p95_ms\",0):.1f}', f'{olr.get(\"latency_p95_ms\",0):.1f}'), + ('Latency p99 (ms)', f'{lm.get(\"latency_p99_ms\",0):.1f}', f'{olr.get(\"latency_p99_ms\",0):.1f}'), + ('Latency min (ms)', f'{lm.get(\"latency_min_ms\",0):.1f}', f'{olr.get(\"latency_min_ms\",0):.1f}'), + ('Latency max (ms)', f'{lm.get(\"latency_max_ms\",0):.1f}', f'{olr.get(\"latency_max_ms\",0):.1f}'), +] + +for label, lm_val, olr_val in rows: + print(f'{label:30s} {lm_val:>20s} {olr_val:>20s}') + +print('=' * 80) +" + +# Cleanup +cd "$SCRIPT_DIR" +docker compose down -v 2>/dev/null || true +cd - > /dev/null +ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman rm -f $OLR_CONTAINER 2>/dev/null" > /dev/null || true + +echo "" +echo "=== Performance test complete ===" diff --git a/tests/sql/environments/rac/debezium/perf/setup.sql b/tests/sql/environments/rac/debezium/perf/setup.sql new file mode 100644 index 00000000..7eed2783 --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/setup.sql @@ -0,0 +1,28 @@ +-- Setup for performance testing. +-- Creates the benchmark table and supplemental logging. 
+ +SET FEEDBACK OFF +SET SERVEROUTPUT ON + +BEGIN EXECUTE IMMEDIATE 'DROP TABLE olr_test.PERF_BENCH PURGE'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -942 THEN RAISE; END IF; END; +/ + +CREATE TABLE olr_test.PERF_BENCH ( + id NUMBER, + val VARCHAR2(200), + node_id NUMBER(1), + batch_num NUMBER, + created TIMESTAMP DEFAULT SYSTIMESTAMP, + CONSTRAINT perf_bench_pk PRIMARY KEY (id, node_id) +); +ALTER TABLE olr_test.PERF_BENCH ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +DECLARE + v_scn NUMBER; +BEGIN + v_scn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER; + DBMS_OUTPUT.PUT_LINE('PERF_SCN_START: ' || v_scn); +END; +/ + +EXIT diff --git a/tests/sql/environments/rac/debezium/perf/validator.py b/tests/sql/environments/rac/debezium/perf/validator.py new file mode 100644 index 00000000..7c705358 --- /dev/null +++ b/tests/sql/environments/rac/debezium/perf/validator.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +"""Real-time validator: tails LogMiner and OLR JSONL files, matches events. + +Stops the swingbench container on mismatch. Designed for long-running +continuous validation of OLR vs LogMiner data correctness. 
+ +Usage: + python3 validator.py --logminer output/logminer.jsonl --olr output/olr.jsonl +""" + +import argparse +import json +import os +import subprocess +import sys +import time +from collections import defaultdict + +SENTINEL_TABLE = 'DEBEZIUM_SENTINEL' +POLL_INTERVAL = 1.0 # seconds between file polls +REPORT_INTERVAL = 10.0 # seconds between progress reports +DEFAULT_MATCH_WINDOW = 120 # seconds to wait for matching event + + +def normalize_value(v): + if v is None: + return None + return str(v) + + +def event_key(event): + """Extract a content-based match key from a Debezium event.""" + source = event.get('source', {}) + table = source.get('table', '') + op = event.get('op', '') + + if table == SENTINEL_TABLE: + return None # skip sentinel + + after = event.get('after') or {} + before = event.get('before') or {} + + after_norm = tuple(sorted((k, normalize_value(v)) for k, v in after.items())) + before_norm = tuple(sorted((k, normalize_value(v)) for k, v in before.items())) + + if op == 'c': + return (table, op, after_norm) + elif op == 'u': + return (table, op, before_norm, after_norm) + elif op == 'd': + return (table, op, before_norm) + else: + return None # skip unknown ops (heartbeats, etc.) + + +def tail_file(path, position): + """Read new lines from file starting at position. 
Returns (lines, new_position).""" + try: + size = os.path.getsize(path) + except OSError: + return [], position + + if size <= position: + return [], position + + lines = [] + with open(path, 'r') as f: + f.seek(position) + for line in f: + line = line.strip() + if line: + lines.append(line) + new_position = f.tell() + return lines, new_position + + +def stop_swingbench(): + """Stop the swingbench container via Docker socket.""" + import socket + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect('/var/run/docker.sock') + request = ( + 'POST /v1.40/containers/perf-swingbench/stop HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Content-Length: 0\r\n' + '\r\n' + ) + sock.sendall(request.encode()) + response = sock.recv(4096).decode() + sock.close() + if '204' in response or '304' in response: + print(' Swingbench stopped', flush=True) + else: + print(f' WARNING: Unexpected response: {response[:100]}', flush=True) + except Exception as e: + print(f' WARNING: Failed to stop swingbench: {e}', flush=True) + + +def main(): + parser = argparse.ArgumentParser(description='Real-time OLR vs LogMiner validator') + parser.add_argument('--logminer', required=True, help='Path to logminer.jsonl') + parser.add_argument('--olr', required=True, help='Path to olr.jsonl') + parser.add_argument('--match-window', type=int, default=DEFAULT_MATCH_WINDOW, + help=f'Seconds to wait for matching event (default: {DEFAULT_MATCH_WINDOW})') + parser.add_argument('--no-stop-on-fail', dest='stop_on_fail', action='store_false', default=True, + help='Do not stop swingbench on mismatch') + args = parser.parse_args() + + print(f'Validator starting', flush=True) + print(f' LogMiner: {args.logminer}', flush=True) + print(f' OLR: {args.olr}', flush=True) + print(f' Match window: {args.match_window}s', flush=True) + print(flush=True) + + # Pending events: key -> [(timestamp, event_json), ...] + # When both sides produce the same key, they cancel out (match). 
+ # Using lists to handle duplicate events (same key can appear multiple times). + lm_pending = {} # key -> [(timestamp, event_json), ...] + olr_pending = {} # key -> [(timestamp, event_json), ...] + + lm_pos = 0 + olr_pos = 0 + matched = 0 + lm_total = 0 + olr_total = 0 + skipped = 0 + last_report = time.time() + + while True: + now = time.time() + + # Tail both files + lm_lines, lm_pos = tail_file(args.logminer, lm_pos) + olr_lines, olr_pos = tail_file(args.olr, olr_pos) + + # Process LogMiner events + for line in lm_lines: + try: + event = json.loads(line) + except json.JSONDecodeError: + continue + key = event_key(event) + if key is None: + skipped += 1 + continue + lm_total += 1 + + if key in olr_pending and olr_pending[key]: + # Match found — OLR already has this event + olr_pending[key].pop(0) + if not olr_pending[key]: + del olr_pending[key] + matched += 1 + else: + lm_pending.setdefault(key, []).append((now, line)) + + # Process OLR events + for line in olr_lines: + try: + event = json.loads(line) + except json.JSONDecodeError: + continue + key = event_key(event) + if key is None: + skipped += 1 + continue + olr_total += 1 + + if key in lm_pending and lm_pending[key]: + # Match found — LogMiner already has this event + lm_pending[key].pop(0) + if not lm_pending[key]: + del lm_pending[key] + matched += 1 + else: + olr_pending.setdefault(key, []).append((now, line)) + + # Check for expired events (exceeded match window) + expired_lm = [(k, ts, line) for k, entries in lm_pending.items() + for ts, line in entries if now - ts > args.match_window] + expired_olr = [(k, ts, line) for k, entries in olr_pending.items() + for ts, line in entries if now - ts > args.match_window] + + if expired_lm or expired_olr: + print(flush=True) + print('!!! 
MISMATCH DETECTED !!!', flush=True) + print(f' Matched so far: {matched}', flush=True) + print(f' LogMiner total: {lm_total}, OLR total: {olr_total}', flush=True) + print(f' LogMiner pending: {sum(len(v) for v in lm_pending.values())}, OLR pending: {sum(len(v) for v in olr_pending.values())}', flush=True) + print(flush=True) + + if expired_lm: + print(f' Events in LogMiner but NOT in OLR ({len(expired_lm)} expired):', flush=True) + for key, ts, line in expired_lm[:5]: + age = now - ts + print(f' [{age:.0f}s old] table={key[0]} op={key[1]}', flush=True) + print(f' {line[:200]}', flush=True) + + if expired_olr: + print(f' Events in OLR but NOT in LogMiner ({len(expired_olr)} expired):', flush=True) + for key, ts, line in expired_olr[:5]: + age = now - ts + print(f' [{age:.0f}s old] table={key[0]} op={key[1]}', flush=True) + print(f' {line[:200]}', flush=True) + + if args.stop_on_fail: + stop_swingbench() + + print(flush=True) + print('VALIDATION FAILED', flush=True) + sys.exit(1) + + # Progress report + if now - last_report >= REPORT_INTERVAL: + print(f'[{time.strftime("%H:%M:%S")}] ' + f'matched={matched:,} ' + f'lm={lm_total:,} olr={olr_total:,} ' + f'pending: lm={sum(len(v) for v in lm_pending.values()):,} olr={sum(len(v) for v in olr_pending.values()):,} ' + f'skipped={skipped:,}', + flush=True) + last_report = now + + time.sleep(POLL_INTERVAL) + + +if __name__ == '__main__': + main() diff --git a/tests/sql/scripts/debezium-receiver.py b/tests/sql/scripts/debezium-receiver.py index b41274fc..5f29c660 100755 --- a/tests/sql/scripts/debezium-receiver.py +++ b/tests/sql/scripts/debezium-receiver.py @@ -9,12 +9,14 @@ POST /logminer — append event(s) to logminer.jsonl POST /olr — append event(s) to olr.jsonl GET /status — return event counts and sentinel detection status + GET /metrics — return throughput and latency statistics per adapter POST /reset — clear all state for next scenario """ import json import os import sys +import time import threading from 
http.server import HTTPServer, BaseHTTPRequestHandler @@ -32,6 +34,20 @@ logminer_file = None olr_file = None +# Per-adapter metrics for throughput and latency +metrics = { + 'logminer': { + 'latencies_ms': [], # list of (arrival_ms - source_ts_ms) values + 'timestamps': [], # arrival timestamps for throughput calc + 'first_event_time': None, # wall clock of first event + }, + 'olr': { + 'latencies_ms': [], + 'timestamps': [], + 'first_event_time': None, + }, +} + def open_files(): global logminer_file, olr_file @@ -48,6 +64,11 @@ def reset_state(): state['logminer_sentinel'] = False state['olr_sentinel'] = False + for ch in ('logminer', 'olr'): + metrics[ch]['latencies_ms'] = [] + metrics[ch]['timestamps'] = [] + metrics[ch]['first_event_time'] = None + if logminer_file: logminer_file.close() if olr_file: @@ -66,7 +87,6 @@ def is_sentinel(event): """Check if event is a sentinel table insert.""" if not isinstance(event, dict): return False - # Debezium envelope: source.table source = event.get('source', {}) table = source.get('table', '') op = event.get('op', '') @@ -75,7 +95,8 @@ def is_sentinel(event): def process_events(body, channel): """Parse and store events from HTTP POST body.""" - # Debezium HTTP sink may send single event or array + arrival_ms = time.time() * 1000 + try: data = json.loads(body) except json.JSONDecodeError: @@ -87,6 +108,7 @@ def process_events(body, channel): f = logminer_file if channel == 'logminer' else olr_file count_key = f'{channel}_count' sentinel_key = f'{channel}_sentinel' + m = metrics[channel] for event in events: if not isinstance(event, dict): @@ -95,12 +117,74 @@ def process_events(body, channel): f.flush() state[count_key] += 1 + # Track arrival time for throughput + m['timestamps'].append(arrival_ms) + if m['first_event_time'] is None: + m['first_event_time'] = arrival_ms + + # Track latency: source.ts_ms is Oracle commit time (epoch ms) + source_ts = event.get('source', {}).get('ts_ms') + if source_ts is not None: + try: 
+ latency = arrival_ms - float(source_ts) + if latency >= 0: + m['latencies_ms'].append(latency) + except (TypeError, ValueError): + pass + if is_sentinel(event): state[sentinel_key] = True return len(events) +def compute_metrics(channel): + """Compute throughput and latency stats for a channel. Caller holds lock.""" + m = metrics[channel] + count = state[f'{channel}_count'] + now_ms = time.time() * 1000 + + result = { + 'count': count, + 'throughput_total_eps': 0.0, + 'throughput_10s_eps': 0.0, + 'latency_avg_ms': 0.0, + 'latency_p50_ms': 0.0, + 'latency_p95_ms': 0.0, + 'latency_p99_ms': 0.0, + 'latency_min_ms': 0.0, + 'latency_max_ms': 0.0, + } + + # Overall throughput + if m['first_event_time'] is not None and count > 0: + elapsed_s = (now_ms - m['first_event_time']) / 1000.0 + if elapsed_s > 0: + result['throughput_total_eps'] = round(count / elapsed_s, 1) + + # 10-second window throughput + cutoff = now_ms - 10000 + recent = [t for t in m['timestamps'] if t >= cutoff] + if len(recent) >= 1: + window_s = (now_ms - cutoff) / 1000.0 + if window_s > 0: + result['throughput_10s_eps'] = round(len(recent) / window_s, 1) + + # Latency percentiles + lats = m['latencies_ms'] + if lats: + sorted_lats = sorted(lats) + n = len(sorted_lats) + result['latency_avg_ms'] = round(sum(sorted_lats) / n, 1) + result['latency_min_ms'] = round(sorted_lats[0], 1) + result['latency_max_ms'] = round(sorted_lats[-1], 1) + result['latency_p50_ms'] = round(sorted_lats[int(n * 0.50)], 1) + result['latency_p95_ms'] = round(sorted_lats[int(min(n * 0.95, n - 1))], 1) + result['latency_p99_ms'] = round(sorted_lats[int(min(n * 0.99, n - 1))], 1) + + return result + + class Handler(BaseHTTPRequestHandler): def do_POST(self): content_length = int(self.headers.get('Content-Length', 0)) @@ -137,6 +221,17 @@ def do_GET(self): self.end_headers() self.wfile.write(body.encode()) + elif self.path == '/metrics': + with lock: + body = json.dumps({ + 'logminer': compute_metrics('logminer'), + 'olr': 
compute_metrics('olr'), + }) + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.end_headers() + self.wfile.write(body.encode()) + elif self.path == '/health': self.send_response(200) self.end_headers() @@ -147,7 +242,6 @@ def do_GET(self): self.end_headers() def log_message(self, format, *args): - # Log requests for debugging sys.stderr.write("%s - - [%s] %s\n" % (self.client_address[0], self.log_date_time_string(),