Code Review
This pull request introduces a flush parameter to the store method in BaseOperator, allowing for deferred storage flushing during batch processing. Additionally, it replaces print statements with proper logging in RocksDBKVStorage. A review comment suggests wrapping the generator processing loop in a try...finally block to ensure index_done_callback() is executed even if an exception occurs, preventing potential data inconsistency.
            [res], meta_update if is_first else {}, flush=False
        )
        is_first = False
    self.kv_storage.index_done_callback()
Calling index_done_callback() here is not robust. If an exception occurs within the for loop (lines 93-98), this line will not be executed. This would leave the storage in an inconsistent state because the data stored with flush=False would not be persisted.
To guarantee that index_done_callback() is always called after processing the generator, the loop should be wrapped in a try...finally block, with this call in the finally part.
Suggested change:
    try:
        for res in result:
            yield pd.DataFrame([res])
            self.store(
                [res], meta_update if is_first else {}, flush=False
            )
            is_first = False
    finally:
        self.kv_storage.index_done_callback()
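The effect of the suggested try...finally can be checked in isolation. The following is a minimal sketch, not the project's code: FakeKVStorage, its upsert method, stream_and_store, and failing_source are hypothetical stand-ins, and only the name index_done_callback is taken from the diff. It assumes that the callback is what persists deferred writes, as the comment above describes.

```python
from typing import Any, Iterable, Iterator, List


class FakeKVStorage:
    """Hypothetical in-memory stand-in for the real KV storage."""

    def __init__(self) -> None:
        self.pending: List[Any] = []    # written with a deferred flush
        self.persisted: List[Any] = []  # made durable by the callback
        self.flush_calls = 0

    def upsert(self, items: List[Any]) -> None:
        # Assumed behavior: buffer the write without persisting it.
        self.pending.extend(items)

    def index_done_callback(self) -> None:
        # Assumed behavior: persist everything buffered so far.
        self.persisted.extend(self.pending)
        self.pending.clear()
        self.flush_calls += 1


def stream_and_store(results: Iterable[Any], storage: FakeKVStorage) -> Iterator[Any]:
    """Yield each result, store it unflushed, and always flush at the end."""
    try:
        for res in results:
            yield res
            storage.upsert([res])
    finally:
        # Runs on normal exhaustion, on an exception, and on generator close.
        storage.index_done_callback()


def failing_source() -> Iterator[int]:
    """Hypothetical upstream generator that fails after two items."""
    yield 1
    yield 2
    raise RuntimeError("upstream failure")


def run_with_failure() -> FakeKVStorage:
    storage = FakeKVStorage()
    try:
        for _ in stream_and_store(failing_source(), storage):
            pass
    except RuntimeError:
        pass  # the error still propagates; the flush has already happened
    return storage


def run_with_early_close() -> FakeKVStorage:
    storage = FakeKVStorage()
    gen = stream_and_store([10, 20, 30], storage)
    next(gen)    # consumer takes one item
    gen.close()  # raises GeneratorExit at the yield, triggering finally
    return storage
```

In this sketch the flush runs exactly once in both scenarios. One detail to keep in mind: because the yield precedes the store call, an item handed to a consumer that then closes the generator is never written, so the finally block flushes only what was stored before that point.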