From a0a5f02b7161ef885a8b0158a8ba8c1364a780cf Mon Sep 17 00:00:00 2001 From: Theo Satabin Date: Wed, 13 Sep 2023 16:57:33 +0200 Subject: [PATCH 1/2] Client S3 avec connexions ouvertes et cache LRU de lecture --- CHANGELOG.md | 13 ++++++ src/rok4/storage.py | 103 +++++++++++++++++++++++++++++++------------- 2 files changed, 87 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24eaed0..59c51ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +## 2.0.0 + +### [Fixed] + +* Pyramid + * quand on lit une tuile dans une pyramide PNG 1 canal, on retourne bien aussi un numpy.array à 3 dimensions (la dernière dimension sera bien un array à un élément) + +### [Changed] + +* Storage + * Le client S3 garde ouverte des connexions + * La fonction get_data_binary a un système de cache de type LRU, avec un temps de validité de 5 minutes + ## 1.7.1 ### [Added] diff --git a/src/rok4/storage.py b/src/rok4/storage.py index 8fda078..14c689e 100644 --- a/src/rok4/storage.py +++ b/src/rok4/storage.py @@ -18,6 +18,7 @@ - ROK4_S3_KEY - ROK4_S3_SECRETKEY - ROK4_S3_URL +- ROK4_SSL_NO_VERIFY (optional) with a non-empty value disables certificate check. 
Define PYTHONWARNINGS to "ignore:Unverified HTTPS request" to disable warnings logs To use several S3 clusters, each environment variable have to contain a list (comma-separated), with the same number of elements @@ -34,12 +35,15 @@ import os import re import tempfile +from functools import lru_cache from shutil import copyfile from typing import Dict, Tuple, Union import boto3 import botocore.exceptions + import requests +import time from osgeo import gdal # conditional import @@ -67,6 +71,10 @@ __S3_DEFAULT_CLIENT = None +def __get_ttl_hash(): + """Return the same value within 5 minutes time period""" + return round(time.time() / 300) + def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str, str]: """Get the S3 client @@ -85,7 +93,13 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s global __S3_CLIENTS, __S3_DEFAULT_CLIENT + if not __S3_CLIENTS: + + verify = True + if "ROK4_SSL_NO_VERIFY" in os.environ and os.environ["ROK4_SSL_NO_VERIFY"] != "": + verify = False + # C'est la première fois qu'on cherche à utiliser le stockage S3, chargeons les informations depuis les variables d'environnement try: keys = os.environ["ROK4_S3_KEY"].split(",") @@ -109,7 +123,12 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s "s3", aws_access_key_id=keys[i], aws_secret_access_key=secret_keys[i], + verify=verify, endpoint_url=urls[i], + config=botocore.config.Config( + tcp_keepalive = True, + max_pool_connections = 10 + ) ), "key": keys[i], "secret_key": secret_keys[i], @@ -271,6 +290,7 @@ def get_data_str(path: str) -> str: MissingEnvironmentError: Missing object storage informations StorageError: Storage read issue FileNotFoundError: File or object does not exist + NotImplementedError: Storage type not handled Returns: str: Data content @@ -279,17 +299,20 @@ def get_data_str(path: str) -> str: return get_data_binary(path).decode("utf-8") -def get_data_binary(path: str, range: 
Tuple[int, int] = None) -> str: - """Load data into a binary string +@lru_cache(maxsize=50) +def __get_cached_data_binary(path: str, ttl_hash: int, range: Tuple[int, int] = None) -> str: + """Load data into a binary string, using an LRU cache Args: path (str): path to data + ttl_hash (int): time hash, to invalidate cache range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None. Raises: MissingEnvironmentError: Missing object storage informations StorageError: Storage read issue FileNotFoundError: File or object does not exist + NotImplementedError: Storage type not handled Returns: str: Data binary content @@ -329,7 +352,7 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str: except Exception as e: raise StorageError("S3", e) - elif storage_type == StorageType.CEPH: + elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: ioctx = __get_ceph_ioctx(tray_name) try: @@ -372,13 +395,33 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str: except Exception as e: raise StorageError(storage_type.name, e) else: - raise NotImplementedError + raise NotImplementedError(f"Cannot get partial data for storage type HTTP(S)") else: - raise StorageError("UNKNOWN", "Unhandled storage type to read binary data") + raise NotImplementedError(f"Cannot get data for storage type {storage_type.name}") return data +def get_data_binary(path: str, range: Tuple[int, int] = None) -> str: + """Load data into a binary string + + This function uses a LRU cache, with a TTL of 5 minutes + + Args: + path (str): path to data + range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None. 
+ + Raises: + MissingEnvironmentError: Missing object storage informations + StorageError: Storage read issue + FileNotFoundError: File or object does not exist + NotImplementedError: Storage type not handled + + Returns: + str: Data binary content + """ + return __get_cached_data_binary(path, __get_ttl_hash(), range) + def put_data_str(data: str, path: str) -> None: """Store string data into a file or an object @@ -392,6 +435,7 @@ def put_data_str(data: str, path: str) -> None: Raises: MissingEnvironmentError: Missing object storage informations StorageError: Storage write issue + NotImplementedError: Storage type not handled """ storage_type, path, tray_name, base_name = get_infos_from_path(path) @@ -406,7 +450,7 @@ def put_data_str(data: str, path: str) -> None: except Exception as e: raise StorageError("S3", e) - elif storage_type == StorageType.CEPH: + elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: ioctx = __get_ceph_ioctx(tray_name) try: @@ -423,7 +467,7 @@ def put_data_str(data: str, path: str) -> None: raise StorageError("FILE", e) else: - raise StorageError("UNKNOWN", "Unhandled storage type to write string data") + raise NotImplementedError(f"Cannot write data for storage type {storage_type.name}") def get_size(path: str) -> int: @@ -435,6 +479,7 @@ def get_size(path: str) -> int: Raises: MissingEnvironmentError: Missing object storage informations StorageError: Storage read issue + NotImplementedError: Storage type not handled Returns: int: file/object size, in bytes @@ -453,7 +498,7 @@ def get_size(path: str) -> int: except Exception as e: raise StorageError("S3", e) - elif storage_type == StorageType.CEPH: + elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: ioctx = __get_ceph_ioctx(tray_name) try: @@ -478,7 +523,7 @@ def get_size(path: str) -> int: raise StorageError(storage_type.name, e) else: - raise StorageError("UNKNOWN", "Unhandled storage type to get size") + raise NotImplementedError(f"Cannot get size for storage 
type {storage_type.name}") def exists(path: str) -> bool: @@ -490,6 +535,7 @@ def exists(path: str) -> bool: Raises: MissingEnvironmentError: Missing object storage informations StorageError: Storage read issue + NotImplementedError: Storage type not handled Returns: bool: file/object existing status @@ -509,7 +555,7 @@ def exists(path: str) -> bool: else: raise StorageError("S3", e) - elif storage_type == StorageType.CEPH: + elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: ioctx = __get_ceph_ioctx(tray_name) try: @@ -534,7 +580,7 @@ def exists(path: str) -> bool: raise StorageError(storage_type.name, e) else: - raise StorageError("UNKNOWN", "Unhandled storage type to test if exists") + raise NotImplementedError(f"Cannot test existence for storage type {storage_type.name}") def remove(path: str) -> None: @@ -546,6 +592,7 @@ def remove(path: str) -> None: Raises: MissingEnvironmentError: Missing object storage informations StorageError: Storage removal issue + NotImplementedError: Storage type not handled """ storage_type, path, tray_name, base_name = get_infos_from_path(path) @@ -557,7 +604,7 @@ def remove(path: str) -> None: except Exception as e: raise StorageError("S3", e) - elif storage_type == StorageType.CEPH: + elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: ioctx = __get_ceph_ioctx(tray_name) try: @@ -576,7 +623,7 @@ def remove(path: str) -> None: raise StorageError("FILE", e) else: - raise StorageError("UNKNOWN", "Unhandled storage type to remove things") + raise NotImplementedError(f"Cannot remove data for storage type {storage_type.name}") def copy(from_path: str, to_path: str, from_md5: str = None) -> None: @@ -588,8 +635,9 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None: from_md5 (str, optional): MD5 sum, re-processed after copy and controlled. Defaults to None. 
Raises: - StorageError: Unhandled copy or copy issue + StorageError: Copy issue MissingEnvironmentError: Missing object storage informations + NotImplementedError: Storage type not handled """ from_type, from_path, from_tray, from_base_name = get_infos_from_path(from_path) @@ -687,7 +735,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None: except Exception as e: raise StorageError("S3", f"Cannot copy S3 object {from_path} to {to_path} : {e}") - elif from_type == StorageType.CEPH and to_type == StorageType.FILE: + elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.FILE: ioctx = __get_ceph_ioctx(from_tray) if from_md5 is not None: @@ -726,7 +774,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None: "CEPH and FILE", f"Cannot copy CEPH object {from_path} to file {to_path} : {e}" ) - elif from_type == StorageType.FILE and to_type == StorageType.CEPH: + elif from_type == StorageType.FILE and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: ioctx = __get_ceph_ioctx(to_tray) if from_md5 is not None: @@ -763,7 +811,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None: "FILE and CEPH", f"Cannot copy file {from_path} to CEPH object {to_path} : {e}" ) - elif from_type == StorageType.CEPH and to_type == StorageType.CEPH: + elif from_type == StorageType.CEPH and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: from_ioctx = __get_ceph_ioctx(from_tray) to_ioctx = __get_ceph_ioctx(to_tray) @@ -795,7 +843,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None: except Exception as e: raise StorageError("CEPH", f"Cannot copy CEPH object {from_path} to {to_path} : {e}") - elif from_type == StorageType.CEPH and to_type == StorageType.S3: + elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.S3: from_ioctx = __get_ceph_ioctx(from_tray) s3_client, to_bucket = __get_s3_client(to_tray) @@ -854,7 +902,7 @@ def copy(from_path: str, 
to_path: str, from_md5: str = None) -> None: elif ( from_type == StorageType.HTTP or from_type == StorageType.HTTPS - ) and to_type == StorageType.CEPH: + ) and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: to_ioctx = __get_ceph_ioctx(to_tray) try: @@ -896,10 +944,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None: ) else: - raise StorageError( - f"{from_type.name} and {to_type.name}", - f"Cannot copy from {from_type.name} to {to_type.name}", - ) + raise NotImplementedError(f"Cannot copy data from storage type {from_type.name} to storage type {to_type.name}") def link(target_path: str, link_path: str, hard: bool = False) -> None: @@ -911,8 +956,9 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None: hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False. Raises: - StorageError: Unhandled link or link issue + StorageError: link issue MissingEnvironmentError: Missing object storage informations + NotImplementedError: Storage type not handled """ target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path) @@ -947,7 +993,7 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None: except Exception as e: raise StorageError("S3", e) - elif target_type == StorageType.CEPH: + elif target_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: ioctx = __get_ceph_ioctx(link_tray) try: @@ -965,7 +1011,7 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None: raise StorageError("FILE", e) else: - raise StorageError("UNKNOWN", "Unhandled storage type to make link") + raise NotImplementedError(f"Cannot make link for storage type {target_type.name}") def get_osgeo_path(path: str) -> str: @@ -1013,6 +1059,7 @@ def size_path(path: str) -> int: Raises: StorageError: Unhandled link or link issue MissingEnvironmentError: Missing object storage informations + NotImplementedError: Storage type not handled Returns: int: size of the path 
@@ -1052,9 +1099,7 @@ def size_path(path: str) -> int: except Exception as e: raise StorageError("S3", e) - elif storage_type == StorageType.CEPH: - raise NotImplementedError else: - raise StorageError("UNKNOWN", "Unhandled storage type to calculate size") + raise NotImplementedError(f"Cannot get prefix path size for storage type {storage_type.name}") return total From 73b49d79a45c71d557829b66be1377c4af35f405 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 13 Sep 2023 15:00:40 +0000 Subject: [PATCH 2/2] =?UTF-8?q?[pre-commit.ci]=20Corrections=20automatique?= =?UTF-8?q?s=20appliqu=C3=A9es=20par=20les=20git=20hooks.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/rok4/storage.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/rok4/storage.py b/src/rok4/storage.py index 14c689e..fc7a775 100644 --- a/src/rok4/storage.py +++ b/src/rok4/storage.py @@ -35,15 +35,14 @@ import os import re import tempfile +import time from functools import lru_cache from shutil import copyfile from typing import Dict, Tuple, Union import boto3 import botocore.exceptions - import requests -import time from osgeo import gdal # conditional import @@ -75,6 +74,7 @@ def __get_ttl_hash(): """Return the same value within 5 minutes time period""" return round(time.time() / 300) + def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str, str]: """Get the S3 client @@ -93,9 +93,7 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s global __S3_CLIENTS, __S3_DEFAULT_CLIENT - if not __S3_CLIENTS: - verify = True if "ROK4_SSL_NO_VERIFY" in os.environ and os.environ["ROK4_SSL_NO_VERIFY"] != "": verify = False @@ -125,10 +123,7 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s aws_secret_access_key=secret_keys[i], verify=verify, 
endpoint_url=urls[i], - config=botocore.config.Config( - tcp_keepalive = True, - max_pool_connections = 10 - ) + config=botocore.config.Config(tcp_keepalive=True, max_pool_connections=10), ), "key": keys[i], "secret_key": secret_keys[i], @@ -395,13 +390,14 @@ def __get_cached_data_binary(path: str, ttl_hash: int, range: Tuple[int, int] = except Exception as e: raise StorageError(storage_type.name, e) else: - raise NotImplementedError(f"Cannot get partial data for storage type HTTP(S)") + raise NotImplementedError("Cannot get partial data for storage type HTTP(S)") else: raise NotImplementedError(f"Cannot get data for storage type {storage_type.name}") return data + def get_data_binary(path: str, range: Tuple[int, int] = None) -> str: """Load data into a binary string @@ -901,8 +897,10 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None: ) elif ( - from_type == StorageType.HTTP or from_type == StorageType.HTTPS - ) and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE: + (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) + and to_type == StorageType.CEPH + and CEPH_RADOS_AVAILABLE + ): to_ioctx = __get_ceph_ioctx(to_tray) try: @@ -944,7 +942,9 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None: ) else: - raise NotImplementedError(f"Cannot copy data from storage type {from_type.name} to storage type {to_type.name}") + raise NotImplementedError( + f"Cannot copy data from storage type {from_type.name} to storage type {to_type.name}" + ) def link(target_path: str, link_path: str, hard: bool = False) -> None: @@ -1100,6 +1100,8 @@ def size_path(path: str) -> int: raise StorageError("S3", e) else: - raise NotImplementedError(f"Cannot get prefix path size for storage type {storage_type.name}") + raise NotImplementedError( + f"Cannot get prefix path size for storage type {storage_type.name}" + ) return total