diff --git a/src/parser/Parser.cpp b/src/parser/Parser.cpp index 8298acfe..e999c777 100644 --- a/src/parser/Parser.cpp +++ b/src/parser/Parser.cpp @@ -1398,6 +1398,12 @@ namespace OpenLogReplicator { blockOffset = 16U; // New LWN block if (currentBlock == lwnEndBlock) { + // Online redo retry: if LWN validation fails, re-read from disk and + // revalidate. Oracle may be mid-write — the Reader's cached copy can + // be stale. Max 300 retries * 100ms = 30s timeout. + static constexpr int MAX_REREAD_ATTEMPTS = 300; + int lwnRereadAttempts = 0; + lwnBlockRetry: const uint8_t vld = redoBlock[blockOffset + 4U]; if (likely((vld & 0x04) != 0)) { @@ -1416,13 +1422,35 @@ namespace OpenLogReplicator { lwnCheckpointBlock = currentBlock; lwnNumMax = ctx->read16(redoBlock + blockOffset + 26U); // Verify LWN header start - if (unlikely(lwnScn < reader->getFirstScn() || (lwnScn > reader->getNextScn() && reader->getNextScn() != Scn::none()))) + if (unlikely(lwnScn < reader->getFirstScn() || (lwnScn > reader->getNextScn() && reader->getNextScn() != Scn::none()))) { + if (group != 0 && lwnRereadAttempts < MAX_REREAD_ATTEMPTS) { + ++lwnRereadAttempts; + ctx->usleepInt(100000); + const auto ret = reader->reReadAndValidate( + reader->redoBufferList[redoBufferNum] + redoBufferPos, + static_cast<uint64_t>(currentBlock) * reader->getBlockSize(), currentBlock); + if (ret == Reader::REDO_CODE::OVERWRITTEN) { reader->setRet(Reader::REDO_CODE::OVERWRITTEN); break; } + if (ret == Reader::REDO_CODE::OK) goto lwnBlockRetry; + goto lwnBlockRetry; // EMPTY/ERROR_CRC/ERROR_READ — keep trying + } throw RedoLogException(50049, "invalid lwn scn: " + lwnScn.toString()); + } } else { const typeLwn lwnNumCur = ctx->read16(redoBlock + blockOffset + 26U); - if (unlikely(lwnNumCur != lwnNumMax)) + if (unlikely(lwnNumCur != lwnNumMax)) { + if (group != 0 && lwnRereadAttempts < MAX_REREAD_ATTEMPTS) { + ++lwnRereadAttempts; + ctx->usleepInt(100000); + const auto ret = reader->reReadAndValidate( +
reader->redoBufferList[redoBufferNum] + redoBufferPos, + static_cast<uint64_t>(currentBlock) * reader->getBlockSize(), currentBlock); + if (ret == Reader::REDO_CODE::OVERWRITTEN) { reader->setRet(Reader::REDO_CODE::OVERWRITTEN); break; } + if (ret == Reader::REDO_CODE::OK) goto lwnBlockRetry; + goto lwnBlockRetry; + } throw RedoLogException(50050, "invalid lwn max: " + std::to_string(lwnNum) + "/" + std::to_string(lwnNumCur) + "/" + std::to_string(lwnNumMax)); + } } ++lwnNumCnt; @@ -1431,8 +1459,19 @@ namespace OpenLogReplicator { ctx->logTrace(Ctx::TRACE::LWN, "at: " + std::to_string(lwnStartBlock) + " size: " + std::to_string(lwnSize) + " chk: " + std::to_string(lwnNum) + " max: " + std::to_string(lwnNumMax)); } - } else + } else { + if (group != 0 && lwnRereadAttempts < MAX_REREAD_ATTEMPTS) { + ++lwnRereadAttempts; + ctx->usleepInt(100000); + const auto ret = reader->reReadAndValidate( + reader->redoBufferList[redoBufferNum] + redoBufferPos, + static_cast<uint64_t>(currentBlock) * reader->getBlockSize(), currentBlock); + if (ret == Reader::REDO_CODE::OVERWRITTEN) { reader->setRet(Reader::REDO_CODE::OVERWRITTEN); break; } + if (ret == Reader::REDO_CODE::OK) goto lwnBlockRetry; + goto lwnBlockRetry; + } throw RedoLogException(50051, "did not find lwn at offset: " + confirmedBufferStart.toString()); + } } while (blockOffset < reader->getBlockSize()) { @@ -1456,8 +1495,33 @@ namespace OpenLogReplicator { *recordSize = sizeof(uint64_t); } - if (unlikely(((*recordSize + sizeof(LwnMember) + recordSize4 + 7) & 0xFFFFFFF8) > Ctx::MEMORY_CHUNK_SIZE_MB * 1024 * 1024)) - throw RedoLogException(50053, "too big redo log record, size: " + std::to_string(recordSize4)); + if (unlikely(((*recordSize + sizeof(LwnMember) + recordSize4 + 7) & 0xFFFFFFF8) > Ctx::MEMORY_CHUNK_SIZE_MB * 1024 * 1024)) { + if (group != 0) { + // Online redo: record size may be mid-write. Re-read and validate.
+ static constexpr int MAX_REREAD_ATTEMPTS_REC = 300; + bool resolved = false; + for (int attempt = 0; attempt < MAX_REREAD_ATTEMPTS_REC; ++attempt) { + ctx->usleepInt(100000); + const auto ret = reader->reReadAndValidate( + reader->redoBufferList[redoBufferNum] + redoBufferPos, + static_cast<uint64_t>(currentBlock) * reader->getBlockSize(), currentBlock); + if (ret == Reader::REDO_CODE::OVERWRITTEN) { reader->setRet(Reader::REDO_CODE::OVERWRITTEN); break; } + if (ret != Reader::REDO_CODE::OK) continue; // keep waiting + recordSize4 = (static_cast<uint32_t>(ctx->read32(redoBlock + blockOffset)) + 3U) & 0xFFFFFFFC; + if (recordSize4 > 0 && + ((*recordSize + sizeof(LwnMember) + recordSize4 + 7) & 0xFFFFFFF8) <= Ctx::MEMORY_CHUNK_SIZE_MB * 1024 * 1024) { + resolved = true; + break; + } + } + if (reader->getRet() == Reader::REDO_CODE::OVERWRITTEN) break; + if (!resolved) + throw RedoLogException(50053, "timeout waiting for valid redo record size at offset: " + + confirmedBufferStart.toString() + ", size: " + std::to_string(recordSize4) + + " (re-read from disk over 30s)"); + } else + throw RedoLogException(50053, "too big redo log record, size: " + std::to_string(recordSize4)); + } lwnMember = reinterpret_cast<LwnMember*>(lwnChunks[lwnAllocated - 1] + *recordSize); *recordSize += (sizeof(LwnMember) + recordSize4 + 7) & 0xFFFFFFF8; diff --git a/src/reader/Reader.cpp b/src/reader/Reader.cpp index 39dc00bf..7ff06716 100644 --- a/src/reader/Reader.cpp +++ b/src/reader/Reader.cpp @@ -760,6 +760,13 @@ namespace OpenLogReplicator { } } + Reader::REDO_CODE Reader::reReadAndValidate(uint8_t* buffer, uint64_t offset, typeBlk blockNumber) { + const int bytesRead = redoRead(buffer, offset, blockSize); + if (bytesRead != static_cast<int>(blockSize)) + return REDO_CODE::ERROR_READ; + return checkBlockHeader(buffer, blockNumber, false); + } + typeSum Reader::calcChSum(uint8_t* buffer, uint size) const { const typeSum oldChSum = ctx->read16(buffer + 14); uint64_t sum = 0; diff --git a/src/reader/Reader.h
b/src/reader/Reader.h index dc6273e0..11bc4ba5 100644 --- a/src/reader/Reader.h +++ b/src/reader/Reader.h @@ -137,6 +137,7 @@ namespace OpenLogReplicator { void bufferFree(Thread* t, uint num); bool bufferIsFree(); typeSum calcChSum(uint8_t* buffer, uint size) const; + REDO_CODE reReadAndValidate(uint8_t* buffer, uint64_t offset, typeBlk blockNumber); void printHeaderInfo(std::ostringstream& ss, const std::string& path) const; [[nodiscard]] uint getBlockSize() const; [[nodiscard]] FileOffset getBufferStart() const; diff --git a/tests/debezium/PERF-TEST-PLAN.md b/tests/debezium/PERF-TEST-PLAN.md deleted file mode 100644 index 1c5aec3e..00000000 --- a/tests/debezium/PERF-TEST-PLAN.md +++ /dev/null @@ -1,90 +0,0 @@ -# Debezium Performance Test: OLR vs LogMiner - -## Goal - -Compare Debezium CDC throughput and latency when using the **OLR adapter** vs the -**LogMiner adapter**, both running against the same Oracle RAC instance under -sustained DML pressure. - -## What We're Measuring - -| Metric | Definition | How | -|--------|-----------|-----| -| **Throughput (events/sec)** | Events delivered to HTTP receiver per second | Receiver timestamps each event; compute rate over time windows | -| **End-to-end latency** | Oracle commit → event arrival at receiver | `source.ts_ms` (commit time) vs receiver arrival time | -| **Catch-up time** | Time from connector start to "caught up" (lag < 1s) | Monitor lag over time after cold start with backlog | -| **Sustained lag** | Steady-state lag under continuous pressure | Average latency once caught up | - -## Architecture - -``` -Oracle RAC (2 nodes) - └── PL/SQL DML generator (DBMS_SCHEDULER jobs on both nodes) - └── continuous INSERT/UPDATE/DELETE on BENCH table - -OLR (on RAC VM) - └── reads redo logs → TCP → Debezium OLR adapter → HTTP sink → receiver - -LogMiner adapter - └── queries redo via SQL → HTTP sink → receiver - -Receiver (Python) - └── timestamps each event, computes throughput/latency, exposes /metrics -``` - -## 
DML Generator - -PL/SQL job running inside Oracle on both RAC nodes simultaneously: - -- Table: `OLR_TEST.BENCH` (id NUMBER, val VARCHAR2(200), node_id NUMBER, created TIMESTAMP) -- Operations: 70% INSERT, 20% UPDATE, 10% DELETE (realistic CDC mix) -- Target rate: configurable, start with ~500 rows/sec per node (1000 total) -- Commit frequency: every 10-50 rows (variable batch size) -- Duration: configurable (default 5 minutes) - -No external tools needed — pure PL/SQL with `DBMS_SCHEDULER`. - -## Test Scenarios - -### 1. Sustained throughput (primary) -- Start DML generator at steady rate -- Let both adapters run for 5 minutes -- Compare: events/sec, average latency, p95 latency - -### 2. Burst + catch-up -- Generate 100K rows with both adapters stopped -- Start both adapters simultaneously -- Measure time to process full backlog - -### 3. Scaling test -- Increase DML rate in steps: 500, 1000, 2000, 5000 rows/sec -- Find the throughput ceiling for each adapter - -## Receiver Enhancements - -Current `debezium-receiver.py` needs: -- Per-event timestamp recording (arrival time) -- Extract `source.ts_ms` from Debezium events for latency calculation -- `/metrics` endpoint returning: event count, events/sec (last 10s window), - avg latency, p50/p95/p99 latency, per-adapter breakdown -- `/metrics/reset` to clear stats between test runs - -## Deliverables - -1. `tests/debezium/perf-test.sh` — orchestrates the full benchmark -2. Enhanced `debezium-receiver.py` — adds latency/throughput metrics -3. PL/SQL generator scripts (run inside Oracle, no external tools) -4. Results output: JSON summary + human-readable table - -## Prerequisites - -- RAC VM running with Oracle operational -- OLR image loaded on VM -- Debezium services (docker-compose) configured -- Both adapters configured to consume from same Oracle PDB - -## Open Questions - -- Should we test single-instance (Oracle XE) as well, or RAC only? 
-- Do we need Kafka in the path, or is HTTP sink sufficient for comparison? -- Should the generator run for a fixed duration or fixed row count? diff --git a/tests/design/REFACTOR-PLAN.md b/tests/design/REFACTOR-PLAN.md new file mode 100644 index 00000000..328af60f --- /dev/null +++ b/tests/design/REFACTOR-PLAN.md @@ -0,0 +1,170 @@ +# Test Framework Refactor Plan + +## Current Problems + +1. **Scattered layout** — Debezium tests split across `tests/debezium/` and + `tests/sql/environments/rac/debezium/` with duplicated configs and scripts +2. **Multiple docker-compose files** for the same services with slightly different + configs (twin-test vs perf vs checkpoint-restart) +3. **OLR started manually via SSH** in every RAC test script — duplicated + `podman run` commands with subtle differences (DNS, network, ports) +4. **No shared entry point** — each test type has its own startup/cleanup + conventions, easy to miss steps (e.g., Prometheus not started) +5. **Environment configs mixed with test logic** — OLR configs, Debezium + properties, and Oracle init scripts scattered across test directories +6. **Pytest files at wrong level** — `test_e2e.py` and `test_fixtures.py` + at `tests/` root but logically belong to `sql/` and `fixtures/` + +## Target Structure + +``` +tests/ + environments/ # Shared Oracle environments + free-23/ + docker-compose.yaml + oracle-init/ + .env + xe-21/ + docker-compose.yaml + oracle-init/ + .env + xe-21-official/ + ... + enterprise-19/ + ... 
+ rac/ + vm-env.sh # Auto-detect VM IP, validate configs + up.sh # Verify VM + Oracle reachable + down.sh + .env + olr.sh # Shared OLR start/stop on VM (single source) + + sql/ # SQL e2e fixture generation + inputs/ # *.sql and *.rac.sql scenarios + scripts/ + generate.sh # 7-stage pipeline + compare.py # Content-based comparison + logminer2json.py + drivers/ + base.sh + docker.sh + local.sh + rac.sh + generated/ # gitignored output + test_e2e.py # pytest entry: SQL e2e + conftest.py # SQL-specific pytest config + + fixtures/ # Redo log regression (batch replay) + *.tar.gz # Pre-captured archives + test_fixtures.py # pytest entry: redo regression + conftest.py # Fixtures-specific pytest config + + dbz-twin/ # Debezium twin-test (LogMiner vs OLR) + debezium-receiver.py # HTTP receiver (shared by twin + perf) + compare-debezium.py # Event comparison + docker-compose.yaml # Single-instance: receiver + adapters + config/ + olr-config.json # Single-instance OLR config + application-logminer.properties + application-olr.properties + run.sh # Run twin-test per scenario + checkpoint-restart-test.sh # Single-instance fault tolerance + rac/ # RAC extensions + docker-compose.yaml # RAC: receiver + adapters (host network) + config/ + olr-config.json # RAC OLR config (SCAN connection) + application-logminer.properties + application-olr.properties + checkpoint-restart-test.sh # RAC fault tolerance + soak-test.sh # 2-hour soak wrapper + perf/ # Performance + durability (extends dbz-twin) + docker-compose.yaml # Adds: swingbench, validator, prometheus + validator.py # Real-time event matching + Dockerfile.swingbench + run.sh # Manual perf orchestrator + config/ + olr-config.json # Perf OLR config (SOE schema) + application-logminer.properties + application-olr.properties + prometheus.yml + + conftest.py # Root pytest config (shared markers, CLI args) + pytest.ini # Pytest configuration + README.md # Test framework documentation +``` + +## Key Changes + +### 1. 
Move environments up +- `tests/sql/environments/*` → `tests/environments/*` +- All test types reference `tests/environments/` + +### 2. Consolidate Debezium tests +- `tests/debezium/` → `tests/dbz-twin/` (single-instance) +- `tests/sql/environments/rac/debezium/` → `tests/dbz-twin/rac/` +- `tests/sql/environments/rac/debezium/perf/` → `tests/dbz-twin/perf/` +- `debezium-receiver.py` moves to `tests/dbz-twin/` (shared by twin + perf) + +### 3. Shared OLR helper for RAC +- New `tests/environments/rac/olr.sh` with functions: + - `olr_start ` — podman run with DNS, network, volumes + - `olr_stop` — podman rm + - `olr_wait_ready` — poll logs for "processing redo log" + - `olr_logs` — podman logs +- All RAC test scripts source this instead of duplicating podman commands + +### 4. Move pytest files +- `tests/test_e2e.py` → `tests/sql/test_e2e.py` +- `tests/test_fixtures.py` → `tests/fixtures/test_fixtures.py` +- `tests/conftest.py` splits: shared part stays at root, specific parts + move to `sql/conftest.py` and `fixtures/conftest.py` +- Update `pytest.ini` testpaths accordingly + +### 5. Single docker-compose per context +- `dbz-twin/docker-compose.yaml` — single-instance twin-test +- `dbz-twin/rac/docker-compose.yaml` — RAC twin-test +- `dbz-twin/perf/docker-compose.yaml` — perf (extends RAC compose) +- Each compose is self-contained with all needed services + +## Migration Steps + +1. Create target directories +2. Move files with `git mv` +3. Update all path references in scripts (`source`, volume mounts, etc.) +4. Update `pytest.ini` testpaths +5. Create `environments/rac/olr.sh` shared helper +6. Update RAC test scripts to source `olr.sh` +7. Run full test suite to verify: + - `make test-redo` (if fixtures available) + - SQL e2e: free-23, xe-21, rac + - Checkpoint-restart soak test + - Perf validation test +8. Update `README.md` +9. 
Update `CLAUDE.md` / `AGENTS.md` if they reference old paths + +## Risks + +- **Many path references** — shell scripts use relative paths extensively. + Need to update every `source`, `cd`, volume mount, and `$SCRIPT_DIR` + reference. +- **CI workflows** — `.github/workflows/*.yaml` reference test paths. + Must update simultaneously. +- **Large diff** — many files moved. Hard to review. Consider doing it in + stages (environments first, then dbz-twin, then pytest). + +## Staged Approach + +**Phase 1**: Move environments +- `tests/sql/environments/` → `tests/environments/` +- Update all references + +**Phase 2**: Consolidate dbz-twin +- Merge `tests/debezium/` + `tests/sql/environments/rac/debezium/` → `tests/dbz-twin/` +- Create shared OLR helper + +**Phase 3**: Move pytest files +- Move `test_e2e.py`, `test_fixtures.py`, split `conftest.py` + +**Phase 4**: Validate +- Run all test suites +- Update documentation diff --git a/tests/sql/environments/rac/debezium/perf/VALIDATION-PLAN.md b/tests/design/VALIDATION-PLAN.md similarity index 100% rename from tests/sql/environments/rac/debezium/perf/VALIDATION-PLAN.md rename to tests/design/VALIDATION-PLAN.md diff --git a/tests/sql/environments/rac/debezium/perf/validator.py b/tests/sql/environments/rac/debezium/perf/validator.py index 7c705358..459005a3 100644 --- a/tests/sql/environments/rac/debezium/perf/validator.py +++ b/tests/sql/environments/rac/debezium/perf/validator.py @@ -34,8 +34,8 @@ def event_key(event): table = source.get('table', '') op = event.get('op', '') - if table == SENTINEL_TABLE: - return None # skip sentinel + if table in (SENTINEL_TABLE, 'BENCHMARK_RESULTS'): + return None # skip sentinel and Swingbench internal tables after = event.get('after') or {} before = event.get('before') or {}