-
-
Notifications
You must be signed in to change notification settings - Fork 16
feat: add chain sync protocol for block history synchronization #84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -156,6 +156,50 @@ async def handler(data): | |
| else: | ||
| logger.warning("📥 Received Block #%s — rejected", block.index) | ||
|
|
||
| elif msg_type == "status": | ||
| import json as _json | ||
| peer_height = payload["height"] | ||
| my_height = chain.height | ||
|
|
||
| if peer_height > my_height: | ||
| writer = data.get("_writer") | ||
| if writer: | ||
| request = _json.dumps({ | ||
| "type": "get_blocks", | ||
| "data": { | ||
| "from_height": my_height + 1, | ||
| "to_height": peer_height | ||
| } | ||
| }) + "\n" | ||
| writer.write(request.encode()) | ||
| await writer.drain() | ||
| logger.info("📡 Requesting blocks %d~%d from %s", | ||
| my_height + 1, peer_height, peer_addr) | ||
| elif msg_type == "get_blocks": | ||
| import json as _json | ||
| from_h = payload["from_height"] | ||
| to_h = payload["to_height"] | ||
| blocks = chain.get_blocks_range(from_h, to_h) | ||
|
|
||
| writer = data.get("_writer") | ||
| if writer and blocks: | ||
| response = _json.dumps({ | ||
| "type": "blocks", | ||
| "data": {"blocks": blocks} | ||
| }) + "\n" | ||
| writer.write(response.encode()) | ||
| await writer.drain() | ||
| logger.info("📤 Sent %d blocks to %s", len(blocks), peer_addr) | ||
|
|
||
| elif msg_type == "blocks": | ||
| received = payload["blocks"] | ||
| success, count = chain.add_blocks_bulk(received) | ||
|
|
||
| if success: | ||
| logger.info("✅ Chain synced: added %d blocks", count) | ||
| else: | ||
| logger.warning("❌ Chain sync failed after %d blocks", count) | ||
|
|
||
| return handler | ||
|
|
||
|
|
||
|
|
@@ -324,7 +368,14 @@ async def on_peer_connected(writer): | |
| }) + "\n" | ||
| writer.write(sync_msg.encode()) | ||
| await writer.drain() | ||
| logger.info("🔄 Sent state sync to new peer") | ||
|
|
||
| status_msg = _json.dumps({ | ||
| "type": "status", | ||
| "data": {"height": chain.height} | ||
| }) + "\n" | ||
| writer.write(status_msg.encode()) | ||
| await writer.drain() | ||
| logger.info("🔄 Sent state sync and status to new peer") | ||
|
Comment on lines
+372
to
+378
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Capture This 🤖 Prompt for AI Agents |
||
|
|
||
| network.register_on_peer_connected(on_peer_connected) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,12 @@ def last_block(self): | |
| with self._lock: # Acquire lock for thread-safe access | ||
| return self.chain[-1] | ||
|
|
||
| @property | ||
| def height(self) -> int: | ||
| """Returns the current chain height (genesis = 0)""" | ||
| with self._lock: | ||
| return len(self.chain) - 1 | ||
|
|
||
| def add_block(self, block): | ||
| """ | ||
| Validates and adds a block to the chain if all transactions succeed. | ||
|
|
@@ -82,3 +88,26 @@ def add_block(self, block): | |
| self.state = temp_state | ||
| self.chain.append(block) | ||
| return True | ||
|
|
||
| def get_blocks_range(self, from_height: int, to_height: int) -> list: | ||
| """Return serialized blocks from from_height to to_height inclusive.""" | ||
| with self._lock: | ||
| to_height = min(to_height, len(self.chain) - 1) | ||
| if from_height > to_height or from_height < 0: | ||
| return [] | ||
| return [b.to_dict() for b in self.chain[from_height:to_height + 1]] | ||
|
|
||
| def add_blocks_bulk(self, block_dicts: list) -> tuple: | ||
| """Add blocks validating chain linkage only. State relies on sync message.""" | ||
| added = 0 | ||
| for block_dict in block_dicts: | ||
| block = Block.from_dict(block_dict) | ||
| with self._lock: | ||
| try: | ||
| validate_block_link_and_hash(self.last_block, block) | ||
| except ValueError as exc: | ||
| logger.warning("Block %s rejected: %s", block.index, exc) | ||
| return False, added | ||
| self.chain.append(block) | ||
| added += 1 | ||
| return True, added | ||
|
Comment on lines
+100
to
+113
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make A failure on a later element returns 🔧 Proposed fix def add_blocks_bulk(self, block_dicts: list) -> tuple:
"""Add blocks validating chain linkage only. State relies on sync message."""
- added = 0
- for block_dict in block_dicts:
- block = Block.from_dict(block_dict)
- with self._lock:
- try:
- validate_block_link_and_hash(self.last_block, block)
- except ValueError as exc:
- logger.warning("Block %s rejected: %s", block.index, exc)
- return False, added
- self.chain.append(block)
- added += 1
- return True, added
+ with self._lock:
+ tip = self.chain[-1]
+ new_blocks = []
+
+ for block_dict in block_dicts:
+ block = Block.from_dict(block_dict)
+ try:
+ validate_block_link_and_hash(tip, block)
+ except ValueError as exc:
+ logger.warning("Block %s rejected: %s", block.index, exc)
+ return False, 0
+ new_blocks.append(block)
+ tip = block
+
+ self.chain.extend(new_blocks)
+ return True, len(new_blocks)🤖 Prompt for AI Agents |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
| TOPIC = "minichain-global" | ||
| SUPPORTED_MESSAGE_TYPES = {"sync", "tx", "block"} | ||
| SUPPORTED_MESSAGE_TYPES = {"sync", "tx", "block", "status", "get_blocks", "blocks"} | ||
|
|
||
|
|
||
| class P2PNetwork: | ||
|
|
@@ -207,6 +207,37 @@ def _validate_block_payload(self, payload): | |
| for tx_payload in payload["transactions"] | ||
| ) | ||
|
|
||
| def _validate_status_payload(self, payload): | ||
| if not isinstance(payload, dict): | ||
| return False | ||
| if set(payload) != {"height"}: | ||
| return False | ||
| if not isinstance(payload["height"], int) or payload["height"] < 0: | ||
| return False | ||
| return True | ||
|
|
||
| def _validate_get_blocks_payload(self, payload): | ||
| if not isinstance(payload, dict): | ||
| return False | ||
| if set(payload) != {"from_height", "to_height"}: | ||
| return False | ||
| fh, th = payload.get("from_height"), payload.get("to_height") | ||
| if not isinstance(fh, int) or not isinstance(th, int): | ||
| return False | ||
| if fh < 0 or fh > th: | ||
| return False | ||
| return True | ||
|
|
||
| def _validate_blocks_payload(self, payload): | ||
| if not isinstance(payload, dict): | ||
| return False | ||
| if set(payload) != {"blocks"}: | ||
| return False | ||
| blocks = payload.get("blocks") | ||
| if not isinstance(blocks, list): | ||
| return False | ||
| return all(isinstance(b, dict) for b in blocks) | ||
|
Comment on lines
+231
to
+239
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Validate block entries inside This currently accepts any 🔧 Proposed fix def _validate_blocks_payload(self, payload):
if not isinstance(payload, dict):
return False
if set(payload) != {"blocks"}:
return False
blocks = payload.get("blocks")
if not isinstance(blocks, list):
return False
- return all(isinstance(b, dict) for b in blocks)
+ return all(self._validate_block_payload(block) for block in blocks)🧰 Tools🪛 Ruff (0.15.7)[warning] 231-231: Missing return type annotation for private function (ANN202) 🤖 Prompt for AI Agents |
||
|
|
||
| def _validate_message(self, message): | ||
| if not isinstance(message, dict): | ||
| return False | ||
|
|
@@ -226,6 +257,9 @@ def _validate_message(self, message): | |
| "sync": self._validate_sync_payload, | ||
| "tx": self._validate_transaction_payload, | ||
| "block": self._validate_block_payload, | ||
| "status": self._validate_status_payload, | ||
| "get_blocks": self._validate_get_blocks_payload, | ||
| "blocks": self._validate_blocks_payload, | ||
| } | ||
| return validators[msg_type](payload) | ||
|
|
||
|
|
@@ -283,6 +317,7 @@ async def _listen_to_peer( | |
| continue | ||
| self._mark_seen(msg_type, payload) | ||
| data["_peer_addr"] = addr | ||
| data["_writer"] = writer | ||
|
|
||
| if self._handler_callback: | ||
| try: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
History import assumes a state snapshot invariant that
syncdoes not guarantee.add_blocks_bulk()never replays transactions, so this path is only safe when the local state already matches the peer tip. But thesyncbranch above can reject the snapshot on Lines 120-122 and, when accepted, only merges missing accounts on Lines 130-136. The default--fundpath already creates local state before connect, so a node can append historical blocks and still keep non-canonical balances/nonces. Either make initial sync replace local state atomically, or make bulk history import rebuild state instead of relying onsync.🤖 Prompt for AI Agents