Skip to content

Fix duplicate flush bug#185

Merged
ChenZiHong-Gavin merged 1 commit intomainfrom
fix/fix-db-flush
Mar 30, 2026
Merged

Fix duplicate flush bug#185
ChenZiHong-Gavin merged 1 commit intomainfrom
fix/fix-db-flush

Conversation

@ChenZiHong-Gavin
Copy link
Copy Markdown
Collaborator

@ChenZiHong-Gavin ChenZiHong-Gavin commented Mar 30, 2026

This pull request introduces a flush parameter to the store method in BaseOperator, allowing for deferred storage flushing during batch processing. Additionally, it replaces print statements with proper logging in RocksDBKVStorage. A review comment suggests wrapping the generator processing loop in a try...finally block to ensure index_done_callback() is executed even if an exception occurs, preventing potential data inconsistency.

@github-actions github-actions bot added the core label Mar 30, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a flush parameter to the store method in BaseOperator, allowing for deferred storage flushing during batch processing. Additionally, it replaces print statements with proper logging in RocksDBKVStorage. A review comment suggests wrapping the generator processing loop in a try...finally block to ensure index_done_callback() is executed even if an exception occurs, preventing potential data inconsistency.

[res], meta_update if is_first else {}, flush=False
)
is_first = False
self.kv_storage.index_done_callback()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Calling index_done_callback() here is not robust. If an exception occurs within the for loop (lines 93-98), this line will not be executed. This would leave the storage in an inconsistent state because the data stored with flush=False would not be persisted.

To guarantee that index_done_callback() is always called after processing the generator, the loop should be wrapped in a try...finally block, with this call in the finally part.

Suggested change:

                try:
                    for res in result:
                        yield pd.DataFrame([res])
                        self.store(
                            [res], meta_update if is_first else {}, flush=False
                        )
                        is_first = False
                finally:
                    self.kv_storage.index_done_callback()

@ChenZiHong-Gavin ChenZiHong-Gavin merged commit 9e994a4 into main Mar 30, 2026
9 checks passed
@ChenZiHong-Gavin ChenZiHong-Gavin deleted the fix/fix-db-flush branch March 30, 2026 08:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant