From 098597259e9f918a942965523c803040d2abe868 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 2 Feb 2025 16:15:01 -0500 Subject: [PATCH 01/17] feat: remove old metadata files and add setting --- pyiceberg/catalog/__init__.py | 24 +++++++++++++++++++++++- pyiceberg/table/__init__.py | 3 +++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index c3ea23c1d2..d1c2d19505 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -55,6 +55,7 @@ CreateTableTransaction, StagedTable, Table, + TableProperties, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -72,6 +73,7 @@ from pyiceberg.utils.config import Config, merge_config from pyiceberg.utils.deprecated import deprecated as deprecated from pyiceberg.utils.deprecated import deprecation_message +from pyiceberg.utils.properties import property_as_bool if TYPE_CHECKING: import pyarrow as pa @@ -859,6 +861,12 @@ def _update_and_stage_table( metadata_location=current_table.metadata_location if current_table else None, ) + # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527 + # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true + io = self._load_file_io(properties=updated_metadata.properties, location=updated_metadata.location) + if current_table is not None: + self._delete_old_metadata(io, current_table.metadata, updated_metadata) + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) @@ -866,7 +874,7 @@ def _update_and_stage_table( identifier=table_identifier, metadata=updated_metadata, metadata_location=new_metadata_location, - io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), + io=io, catalog=self, ) @@ -913,6 +921,20 @@ def _get_default_warehouse_location(self, database_name: str, table_name: str) - raise ValueError("No default path is set, please specify a location when creating a table") + def _delete_old_metadata(self, io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None: + """Delete oldest metadata if config is set to true.""" + delete_after_commit: bool = property_as_bool( + metadata.properties, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT, + ) + + if delete_after_commit: + removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log} + current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log} + removed_previous_metadata_files.difference_update(current_metadata_files) + delete_files(io, removed_previous_metadata_files, METADATA) + @staticmethod def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None: ToOutputFile.table_metadata(metadata, io.new_output(metadata_path)) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f857fb8cc0..eb4ee0155a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -217,6 +217,9 @@ class TableProperties: METADATA_PREVIOUS_VERSIONS_MAX = "write.metadata.previous-versions-max" METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT = 100 + METADATA_DELETE_AFTER_COMMIT_ENABLED = "write.metadata.delete-after-commit.enabled" + METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT = False + MAX_SNAPSHOT_AGE_MS = "history.expire.max-snapshot-age-ms" MAX_SNAPSHOT_AGE_MS_DEFAULT = 5 * 24 * 60 * 60 * 1000 # 5 days From d244ffd145b60ad7e8c3576c6675cdea0ccf70b5 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 4 Feb 2025 01:27:18 -0500 Subject: [PATCH 02/17] add test for delete after commit --- tests/catalog/test_sql.py | 59 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index cffc14d9d7..31326fb8cb 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -60,6 +60,7 @@ from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import Identifier from pyiceberg.types import IntegerType, strtobool +from pyiceberg.table import TableProperties CATALOG_TABLES = [c.__tablename__ for c in SqlCatalogBaseTable.__subclasses__()] @@ -1613,3 +1614,61 @@ def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with tbl.append(arrow_table_with_null) assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null) + + +@pytest.mark.parametrize( + "catalog", + [ + lazy_fixture("catalog_memory"), + lazy_fixture("catalog_sqlite"), + lazy_fixture("catalog_sqlite_without_rowcount"), + ], +) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + ], +) +def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + namespace = Catalog.namespace_from(table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_nested) + + original_metadata_location = table.metadata_location + + for i in range(5): + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path=f"new_column_{i}", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + assert len(table.metadata.metadata_log) == 5 + assert os.path.exists(original_metadata_location[len("file://") :]) + + # Set the max versions property to 2, and delete after commit + new_property = { + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2", + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true" + } + + transaction = table.transaction() + update = transaction.set_properties(new_property) + update.commit_transaction() + + # Verify that only the most recent metadata files are kept + assert len(table.metadata.metadata_log) == 2 + updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log + + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path=f"new_column_x", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + assert len(table.metadata.metadata_log) == 2 + assert not os.path.exists(original_metadata_location[len("file://") :]) + assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :]) + assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :]) \ No newline at end of file From 48070406ba0a08c607059d952db02f46f81d3fe2 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 4 Feb 2025 01:27:46 -0500 Subject: [PATCH 03/17] feat: delete old metadata after commit table --- tests/catalog/test_base.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 22ac65c0ff..9a540a6ab1 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -141,6 +141,14 @@ def commit_table( requirement.validate(base_metadata) updated_metadata = update_table_metadata(base_metadata, updates) + + # delete old metadata + self._delete_old_metadata( + current_table.io, + base_metadata, + updated_metadata + ) + if updated_metadata == base_metadata: # no changes, do nothing return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) From 352b97274924d887dffe212089d0f97a90e71d9c Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 4 Feb 2025 09:15:57 -0500 Subject: [PATCH 04/17] reformatting --- tests/catalog/test_base.py | 6 +----- tests/catalog/test_sql.py | 17 ++++++++--------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 9a540a6ab1..6224abaa0f 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -143,11 +143,7 @@ def commit_table( updated_metadata = update_table_metadata(base_metadata, updates) # delete old metadata - self._delete_old_metadata( - current_table.io, - base_metadata, - updated_metadata - ) + self._delete_old_metadata(current_table.io, base_metadata, updated_metadata) if updated_metadata == base_metadata: # no changes, do nothing diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 31326fb8cb..91bfd5948b 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -50,6 +50,7 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema +from pyiceberg.table import TableProperties from pyiceberg.table.snapshots import Operation from pyiceberg.table.sorting import ( NullOrder, @@ -60,7 +61,6 @@ from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import Identifier from pyiceberg.types import IntegerType, strtobool -from pyiceberg.table import TableProperties CATALOG_TABLES = [c.__tablename__ for c in SqlCatalogBaseTable.__subclasses__()] @@ -1635,28 +1635,27 @@ def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Sche namespace = Catalog.namespace_from(table_identifier) catalog.create_namespace(namespace) table = catalog.create_table(table_identifier, table_schema_nested) - + original_metadata_location = table.metadata_location - + for i in range(5): transaction = table.transaction() update = transaction.update_schema() update.add_column(path=f"new_column_{i}", field_type=IntegerType()) update.commit() transaction.commit_transaction() - + assert len(table.metadata.metadata_log) == 5 assert os.path.exists(original_metadata_location[len("file://") :]) # Set the max versions property to 2, and delete after commit new_property = { TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2", - TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true" + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true", } transaction = table.transaction() - update = transaction.set_properties(new_property) - update.commit_transaction() + transaction.set_properties(properties=new_property).commit_transaction() # Verify that only the most recent metadata files are kept assert len(table.metadata.metadata_log) == 2 @@ -1664,11 +1663,11 @@ def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Sche transaction = table.transaction() update = transaction.update_schema() - update.add_column(path=f"new_column_x", field_type=IntegerType()) + update.add_column(path="new_column_x", field_type=IntegerType()) update.commit() transaction.commit_transaction() assert len(table.metadata.metadata_log) == 2 assert not os.path.exists(original_metadata_location[len("file://") :]) assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :]) - assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :]) \ No newline at end of file + assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :]) From a375e1c680ab74eca4fe01d1c1119838d391d6f8 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Fri, 7 Feb 2025 16:24:27 -0500 Subject: [PATCH 05/17] use context managers --- tests/catalog/test_sql.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 91bfd5948b..01504bb33a 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1639,11 +1639,9 @@ def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Sche original_metadata_location = table.metadata_location for i in range(5): - transaction = table.transaction() - update = transaction.update_schema() - update.add_column(path=f"new_column_{i}", field_type=IntegerType()) - update.commit() - transaction.commit_transaction() + with table.transaction() as transaction: + with transaction.update_schema() as update: + update.add_column(path=f"new_column_{i}", field_type=IntegerType()) assert len(table.metadata.metadata_log) == 5 assert os.path.exists(original_metadata_location[len("file://") :]) @@ -1654,18 +1652,16 @@ def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Sche TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true", } - transaction = table.transaction() - transaction.set_properties(properties=new_property).commit_transaction() + with table.transaction() as transaction: + transaction.set_properties(properties=new_property) # Verify that only the most recent metadata files are kept assert len(table.metadata.metadata_log) == 2 updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log - transaction = table.transaction() - update = transaction.update_schema() - update.add_column(path="new_column_x", field_type=IntegerType()) - update.commit() - transaction.commit_transaction() + with table.transaction() as transaction: + with transaction.update_schema() as update: + update.add_column(path="new_column_x", field_type=IntegerType()) assert len(table.metadata.metadata_log) == 2 assert not os.path.exists(original_metadata_location[len("file://") :]) From 1f631c251d28a8b1d119fab90cdc431881605c39 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Fri, 7 Feb 2025 16:31:33 -0500 Subject: [PATCH 06/17] add docs to configuration --- mkdocs/docs/configuration.md | 1 + 1 file changed, 1 insertion(+) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 3705be5d35..9388a36b4f 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -63,6 +63,7 @@ Iceberg tables support table properties to configure table behavior. | `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk | | `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group | | `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. | +| `write.metadata.delete-after-commit.enabled` | Boolean | False | Whether to automatically delete old *tracked* metadata files after each table commit. It will retain a number of the most recent metadata files, which can be set using property `write.metadata.previous-versions-max`. | | `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation | | `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled | | `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation | From 7e4c4b32e022dbf345169d6b877e2a5d90a97b77 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Fri, 7 Feb 2025 16:34:38 -0500 Subject: [PATCH 07/17] reorder comment --- pyiceberg/catalog/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index d1c2d19505..6f393db299 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -860,10 +860,10 @@ def _update_and_stage_table( enforce_validation=current_table is None, metadata_location=current_table.metadata_location if current_table else None, ) + io = self._load_file_io(properties=updated_metadata.properties, location=updated_metadata.location) # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527 # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true - io = self._load_file_io(properties=updated_metadata.properties, location=updated_metadata.location) if current_table is not None: self._delete_old_metadata(io, current_table.metadata, updated_metadata) From f440ac146bfa61cef41b989172f37795b042a54e Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 9 Feb 2025 11:07:00 -0500 Subject: [PATCH 08/17] remove call from catalog; convert to static method --- pyiceberg/catalog/__init__.py | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 6f393db299..c71301b50a 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -747,6 +747,21 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: except ModuleNotFoundError: pass raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema") + + @staticmethod + def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None: + """Delete oldest metadata if config is set to true.""" + delete_after_commit: bool = property_as_bool( + metadata.properties, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT, + ) + + if delete_after_commit: + removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log} + current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log} + removed_previous_metadata_files.difference_update(current_metadata_files) + delete_files(io, removed_previous_metadata_files, METADATA) def __repr__(self) -> str: """Return the string representation of the Catalog class.""" @@ -862,11 +877,6 @@ def _update_and_stage_table( ) io = self._load_file_io(properties=updated_metadata.properties, location=updated_metadata.location) - # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527 - # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true - if current_table is not None: - self._delete_old_metadata(io, current_table.metadata, updated_metadata) - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) @@ -921,20 +931,6 @@ def _get_default_warehouse_location(self, database_name: str, table_name: str) - raise ValueError("No default path is set, please specify a location when creating a table") - def _delete_old_metadata(self, io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None: - """Delete oldest metadata if config is set to true.""" - delete_after_commit: bool = property_as_bool( - metadata.properties, - TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, - TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT, - ) - - if delete_after_commit: - removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log} - current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log} - removed_previous_metadata_files.difference_update(current_metadata_files) - delete_files(io, removed_previous_metadata_files, METADATA) - @staticmethod def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None: ToOutputFile.table_metadata(metadata, io.new_output(metadata_path)) From 4b36bfca417899171f40d4ba1c333588b201199c Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 9 Feb 2025 11:07:19 -0500 Subject: [PATCH 09/17] call after commit --- pyiceberg/table/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c6346ea33e..c05a1abf06 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1182,6 +1182,11 @@ def refs(self) -> Dict[str, SnapshotRef]: def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: response = self.catalog.commit_table(self, requirements, updates) + + # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527 + # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true + self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata) + self.metadata = response.metadata self.metadata_location = response.metadata_location From f7b7c03e2de8763f6f36bf13d6deaf4e6f216231 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 9 Feb 2025 11:07:26 -0500 Subject: [PATCH 10/17] remove call from test --- tests/catalog/test_base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 6224abaa0f..4b0aee9bb6 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -142,9 +142,6 @@ def commit_table( updated_metadata = update_table_metadata(base_metadata, updates) - # delete old metadata - self._delete_old_metadata(current_table.io, base_metadata, updated_metadata) - if updated_metadata == base_metadata: # no changes, do nothing return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) From d30bbaf6bd937d9dc7f6f79ecc4e4eca37735f02 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 9 Feb 2025 12:17:33 -0500 Subject: [PATCH 11/17] few lint fixes --- pyiceberg/catalog/__init__.py | 2 +- pyiceberg/table/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index c71301b50a..625f54fd00 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -747,7 +747,7 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: except ModuleNotFoundError: pass raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema") - + @staticmethod def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None: """Delete oldest metadata if config is set to true.""" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c05a1abf06..38018f8db9 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1182,7 +1182,7 @@ def refs(self) -> Dict[str, SnapshotRef]: def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: response = self.catalog.commit_table(self, requirements, updates) - + # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527 # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata) From 01fb388ab54f47bbf41bdf824b8a81581af1b9a8 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 9 Feb 2025 14:56:20 -0500 Subject: [PATCH 12/17] add comment and remove parameterized test --- tests/catalog/test_sql.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 01504bb33a..4e705d2c62 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1624,13 +1624,6 @@ def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with lazy_fixture("catalog_sqlite_without_rowcount"), ], ) -@pytest.mark.parametrize( - "table_identifier", - [ - lazy_fixture("random_table_identifier"), - lazy_fixture("random_hierarchical_identifier"), - ], -) def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: namespace = Catalog.namespace_from(table_identifier) catalog.create_namespace(namespace) @@ -1659,6 +1652,7 @@ def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Sche assert len(table.metadata.metadata_log) == 2 updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log + # new metadata log was added, so earlier metadata logs are removed. with table.transaction() as transaction: with transaction.update_schema() as update: update.add_column(path="new_column_x", field_type=IntegerType()) From 462f3d4288468c50b05d26dacd6f8c7ee767d9f8 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 9 Feb 2025 14:56:37 -0500 Subject: [PATCH 13/17] reset io change in catalog --- pyiceberg/catalog/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 625f54fd00..1e2fa0ab78 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -875,7 +875,6 @@ def _update_and_stage_table( enforce_validation=current_table is None, metadata_location=current_table.metadata_location if current_table else None, ) - io = self._load_file_io(properties=updated_metadata.properties, location=updated_metadata.location) new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) @@ -884,7 +883,7 @@ def _update_and_stage_table( identifier=table_identifier, metadata=updated_metadata, metadata_location=new_metadata_location, - io=io, + io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), catalog=self, ) From 45516dc185626f85273dbe530604ca0694093265 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 9 Feb 2025 14:57:09 -0500 Subject: [PATCH 14/17] add comments, wrap around exception and warning --- pyiceberg/table/__init__.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 38018f8db9..73e1d09e7e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1184,8 +1184,13 @@ def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[Table response = self.catalog.commit_table(self, requirements, updates) # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527 - # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true - self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata) + # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true and uses + # TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many previous versions to keep - + # everything else will be removed. + try: + self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata) + except Exception as e: + warnings.warn(f"Failed to delete old metadata after commit: {e}") self.metadata = response.metadata self.metadata_location = response.metadata_location From e8277c84eb0bb757dd2b26cf7d046407d2d16735 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sun, 9 Feb 2025 14:58:48 -0500 Subject: [PATCH 15/17] remove additional new line --- tests/catalog/test_base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 4b0aee9bb6..22ac65c0ff 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -141,7 +141,6 @@ def commit_table( requirement.validate(base_metadata) updated_metadata = update_table_metadata(base_metadata, updates) - if updated_metadata == base_metadata: # no changes, do nothing return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) From ed7a705b91e130312ee55f9a1fa0d01c203bd1a8 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Mon, 10 Feb 2025 23:38:49 -0500 Subject: [PATCH 16/17] remove table identifier from test function --- tests/catalog/test_sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 4e705d2c62..221741e830 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1624,7 +1624,7 @@ def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with lazy_fixture("catalog_sqlite_without_rowcount"), ], ) -def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: +def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema) -> None: namespace = Catalog.namespace_from(table_identifier) catalog.create_namespace(namespace) table = catalog.create_table(table_identifier, table_schema_nested) From ec3de23b41a2cb290ce2185089dfce46426c2cff Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 11 Feb 2025 23:24:23 -0500 Subject: [PATCH 17/17] Apply suggestions from code review --- tests/catalog/test_sql.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 221741e830..d2800363a6 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -1624,10 +1624,10 @@ def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with lazy_fixture("catalog_sqlite_without_rowcount"), ], ) -def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema) -> None: - namespace = Catalog.namespace_from(table_identifier) +def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, random_table_identifier: str) -> None: + namespace = Catalog.namespace_from(random_table_identifier) catalog.create_namespace(namespace) - table = catalog.create_table(table_identifier, table_schema_nested) + table = catalog.create_table(random_table_identifier, table_schema_nested) original_metadata_location = table.metadata_location