Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions graphgen/bases/base_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ def __call__(
is_first = True
for res in result:
yield pd.DataFrame([res])
self.store([res], meta_update if is_first else {})
self.store(
[res], meta_update if is_first else {}, flush=False
)
is_first = False
self.kv_storage.index_done_callback()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Calling `index_done_callback()` here is not robust. If an exception is raised inside the `for` loop (lines 93-98), or if the consumer stops iterating this generator before it is exhausted, this line is never reached. That would leave the storage in an inconsistent state, because the data stored with `flush=False` would not be persisted.

To guarantee that `index_done_callback()` is always called once iteration over the generator's results ends, whether normally or not, the loop should be wrapped in a `try...finally` block, with this call in the `finally` clause.

Suggested change:

                try:
                    for res in result:
                        yield pd.DataFrame([res])
                        self.store(
                            [res], meta_update if is_first else {}, flush=False
                        )
                        is_first = False
                finally:
                    self.kv_storage.index_done_callback()

else:
yield pd.DataFrame(result)
self.store(result, meta_update)
Expand Down Expand Up @@ -141,7 +144,7 @@ def split(self, batch: "pd.DataFrame") -> tuple["pd.DataFrame", "pd.DataFrame"]:
recovered_chunks = [c for c in recovered_chunks if c is not None]
return to_process, pd.DataFrame(recovered_chunks)

def store(self, results: list, meta_update: dict):
def store(self, results: list, meta_update: dict, flush: bool = True):
results = convert_to_serializable(results)
meta_update = convert_to_serializable(meta_update)

Expand All @@ -159,7 +162,8 @@ def store(self, results: list, meta_update: dict):
for v in v_list:
inverse_meta[v] = k
self.kv_storage.update({"_meta_inverse": inverse_meta})
self.kv_storage.index_done_callback()
if flush:
self.kv_storage.index_done_callback()

@abstractmethod
def process(self, batch: list) -> Tuple[Union[list, Iterable[dict]], dict]:
Expand Down
11 changes: 8 additions & 3 deletions graphgen/storage/kv/rocksdb_storage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Set
Expand All @@ -8,6 +9,8 @@

from graphgen.bases.base_storage import BaseKVStorage

logger = logging.getLogger(__name__)


@dataclass
class RocksDBKVStorage(BaseKVStorage):
Expand All @@ -17,8 +20,10 @@ class RocksDBKVStorage(BaseKVStorage):
def __post_init__(self):
self._db_path = os.path.join(self.working_dir, f"{self.namespace}.db")
self._db = Rdict(self._db_path)
print(
f"RocksDBKVStorage initialized for namespace '{self.namespace}' at '{self._db_path}'"
logger.debug(
"RocksDBKVStorage initialized for namespace '%s' at '%s'",
self.namespace,
self._db_path,
)

@property
Expand All @@ -30,7 +35,7 @@ def all_keys(self) -> List[str]:

def index_done_callback(self):
self._db.flush()
print(f"RocksDB flushed for {self.namespace}")
logger.debug("RocksDB flushed for %s", self.namespace)

def get_by_id(self, id: str) -> Any:
return self._db.get(id, None)
Expand Down
Loading