Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
## 2.0.0

### [Fixed]

* Pyramid
* quand on lit une tuile dans une pyramide PNG 1 canal, on retourne bien aussi un numpy.array à 3 dimensions (la dernière dimension sera bien un array à un élément)

### [Changed]

* Storage
* Le client S3 garde des connexions ouvertes
* La fonction get_data_binary a un système de cache de type LRU, avec un temps de validité de 5 minutes

## 1.7.1

### [Added]
Expand Down
105 changes: 76 additions & 29 deletions src/rok4/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- ROK4_S3_KEY
- ROK4_S3_SECRETKEY
- ROK4_S3_URL
- ROK4_SSL_NO_VERIFY (optional): a non-empty value disables the certificate check. Set PYTHONWARNINGS to "ignore:Unverified HTTPS request" to silence the resulting warning logs

To use several S3 clusters, each environment variable has to contain a comma-separated list, with the same number of elements in every variable

Expand All @@ -34,6 +35,8 @@
import os
import re
import tempfile
import time
from functools import lru_cache
from shutil import copyfile
from typing import Dict, Tuple, Union

Expand Down Expand Up @@ -67,6 +70,11 @@
__S3_DEFAULT_CLIENT = None


def __get_ttl_hash():
    """Return a time-bucket number that stays the same within a 5-minute period.

    The current Unix timestamp is divided by 300 seconds and rounded to the
    nearest integer, so two calls made inside the same 300-second window
    return the same value.

    Returns:
        int: identifier of the current 5-minute window
    """
    period_seconds = 300
    now = time.time()
    return round(now / period_seconds)


def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", str]], str, str]:
"""Get the S3 client

Expand All @@ -86,6 +94,10 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s
global __S3_CLIENTS, __S3_DEFAULT_CLIENT

if not __S3_CLIENTS:
verify = True
if "ROK4_SSL_NO_VERIFY" in os.environ and os.environ["ROK4_SSL_NO_VERIFY"] != "":
verify = False

# C'est la première fois qu'on cherche à utiliser le stockage S3, chargeons les informations depuis les variables d'environnement
try:
keys = os.environ["ROK4_S3_KEY"].split(",")
Expand All @@ -109,7 +121,9 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s
"s3",
aws_access_key_id=keys[i],
aws_secret_access_key=secret_keys[i],
verify=verify,
endpoint_url=urls[i],
config=botocore.config.Config(tcp_keepalive=True, max_pool_connections=10),
),
"key": keys[i],
"secret_key": secret_keys[i],
Expand Down Expand Up @@ -271,6 +285,7 @@ def get_data_str(path: str) -> str:
MissingEnvironmentError: Missing object storage informations
StorageError: Storage read issue
FileNotFoundError: File or object does not exist
NotImplementedError: Storage type not handled

Returns:
str: Data content
Expand All @@ -279,17 +294,20 @@ def get_data_str(path: str) -> str:
return get_data_binary(path).decode("utf-8")


def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
"""Load data into a binary string
@lru_cache(maxsize=50)
def __get_cached_data_binary(path: str, ttl_hash: int, range: Tuple[int, int] = None) -> str:
"""Load data into a binary string, using a LRU cache

Args:
path (str): path to data
ttl_hash (int): time hash, used to invalidate the cache
range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None.

Raises:
MissingEnvironmentError: Missing object storage informations
StorageError: Storage read issue
FileNotFoundError: File or object does not exist
NotImplementedError: Storage type not handled

Returns:
str: Data binary content
Expand Down Expand Up @@ -329,7 +347,7 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
except Exception as e:
raise StorageError("S3", e)

elif storage_type == StorageType.CEPH:
elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
ioctx = __get_ceph_ioctx(tray_name)

try:
Expand Down Expand Up @@ -372,14 +390,35 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
except Exception as e:
raise StorageError(storage_type.name, e)
else:
raise NotImplementedError
raise NotImplementedError("Cannot get partial data for storage type HTTP(S)")

else:
raise StorageError("UNKNOWN", "Unhandled storage type to read binary data")
raise NotImplementedError(f"Cannot get data for storage type {storage_type.name}")

return data


def get_data_binary(path: str, range: Union[Tuple[int, int], None] = None) -> bytes:
    """Load data into a binary string

    This function uses a LRU cache, with a TTL of 5 minutes: the read is delegated to
    the cached reader, keyed on the path, the current 5-minute time hash and the range.

    Args:
        path (str): path to data
        range (Tuple[int, int], optional): offset and size, to make a partial read. Defaults to None.

    Raises:
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue
        FileNotFoundError: File or object does not exist
        NotImplementedError: Storage type not handled

    Returns:
        bytes: Data binary content
    """
    # The cached reader uses its arguments as the cache key, so they must be hashable.
    # A range supplied as a list would otherwise raise TypeError before any read happens.
    hashable_range = None if range is None else tuple(range)
    return __get_cached_data_binary(path, __get_ttl_hash(), hashable_range)


def put_data_str(data: str, path: str) -> None:
"""Store string data into a file or an object

Expand All @@ -392,6 +431,7 @@ def put_data_str(data: str, path: str) -> None:
Raises:
MissingEnvironmentError: Missing object storage informations
StorageError: Storage write issue
NotImplementedError: Storage type not handled
"""

storage_type, path, tray_name, base_name = get_infos_from_path(path)
Expand All @@ -406,7 +446,7 @@ def put_data_str(data: str, path: str) -> None:
except Exception as e:
raise StorageError("S3", e)

elif storage_type == StorageType.CEPH:
elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
ioctx = __get_ceph_ioctx(tray_name)

try:
Expand All @@ -423,7 +463,7 @@ def put_data_str(data: str, path: str) -> None:
raise StorageError("FILE", e)

else:
raise StorageError("UNKNOWN", "Unhandled storage type to write string data")
raise NotImplementedError(f"Cannot write data for storage type {storage_type.name}")


def get_size(path: str) -> int:
Expand All @@ -435,6 +475,7 @@ def get_size(path: str) -> int:
Raises:
MissingEnvironmentError: Missing object storage informations
StorageError: Storage read issue
NotImplementedError: Storage type not handled

Returns:
int: file/object size, in bytes
Expand All @@ -453,7 +494,7 @@ def get_size(path: str) -> int:
except Exception as e:
raise StorageError("S3", e)

elif storage_type == StorageType.CEPH:
elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
ioctx = __get_ceph_ioctx(tray_name)

try:
Expand All @@ -478,7 +519,7 @@ def get_size(path: str) -> int:
raise StorageError(storage_type.name, e)

else:
raise StorageError("UNKNOWN", "Unhandled storage type to get size")
raise NotImplementedError(f"Cannot get size for storage type {storage_type.name}")


def exists(path: str) -> bool:
Expand All @@ -490,6 +531,7 @@ def exists(path: str) -> bool:
Raises:
MissingEnvironmentError: Missing object storage informations
StorageError: Storage read issue
NotImplementedError: Storage type not handled

Returns:
bool: file/object existing status
Expand All @@ -509,7 +551,7 @@ def exists(path: str) -> bool:
else:
raise StorageError("S3", e)

elif storage_type == StorageType.CEPH:
elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
ioctx = __get_ceph_ioctx(tray_name)

try:
Expand All @@ -534,7 +576,7 @@ def exists(path: str) -> bool:
raise StorageError(storage_type.name, e)

else:
raise StorageError("UNKNOWN", "Unhandled storage type to test if exists")
raise NotImplementedError(f"Cannot test existence for storage type {storage_type.name}")


def remove(path: str) -> None:
Expand All @@ -546,6 +588,7 @@ def remove(path: str) -> None:
Raises:
MissingEnvironmentError: Missing object storage informations
StorageError: Storage removal issue
NotImplementedError: Storage type not handled
"""
storage_type, path, tray_name, base_name = get_infos_from_path(path)

Expand All @@ -557,7 +600,7 @@ def remove(path: str) -> None:
except Exception as e:
raise StorageError("S3", e)

elif storage_type == StorageType.CEPH:
elif storage_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
ioctx = __get_ceph_ioctx(tray_name)

try:
Expand All @@ -576,7 +619,7 @@ def remove(path: str) -> None:
raise StorageError("FILE", e)

else:
raise StorageError("UNKNOWN", "Unhandled storage type to remove things")
raise NotImplementedError(f"Cannot remove data for storage type {storage_type.name}")


def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
Expand All @@ -588,8 +631,9 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
from_md5 (str, optional): MD5 sum, re-processed after copy and controlled. Defaults to None.

Raises:
StorageError: Unhandled copy or copy issue
StorageError: Copy issue
MissingEnvironmentError: Missing object storage informations
NotImplementedError: Storage type not handled
"""

from_type, from_path, from_tray, from_base_name = get_infos_from_path(from_path)
Expand Down Expand Up @@ -687,7 +731,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
except Exception as e:
raise StorageError("S3", f"Cannot copy S3 object {from_path} to {to_path} : {e}")

elif from_type == StorageType.CEPH and to_type == StorageType.FILE:
elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.FILE:
ioctx = __get_ceph_ioctx(from_tray)

if from_md5 is not None:
Expand Down Expand Up @@ -726,7 +770,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
"CEPH and FILE", f"Cannot copy CEPH object {from_path} to file {to_path} : {e}"
)

elif from_type == StorageType.FILE and to_type == StorageType.CEPH:
elif from_type == StorageType.FILE and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
ioctx = __get_ceph_ioctx(to_tray)

if from_md5 is not None:
Expand Down Expand Up @@ -763,7 +807,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
"FILE and CEPH", f"Cannot copy file {from_path} to CEPH object {to_path} : {e}"
)

elif from_type == StorageType.CEPH and to_type == StorageType.CEPH:
elif from_type == StorageType.CEPH and to_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
from_ioctx = __get_ceph_ioctx(from_tray)
to_ioctx = __get_ceph_ioctx(to_tray)

Expand Down Expand Up @@ -795,7 +839,7 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
except Exception as e:
raise StorageError("CEPH", f"Cannot copy CEPH object {from_path} to {to_path} : {e}")

elif from_type == StorageType.CEPH and to_type == StorageType.S3:
elif from_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE and to_type == StorageType.S3:
from_ioctx = __get_ceph_ioctx(from_tray)

s3_client, to_bucket = __get_s3_client(to_tray)
Expand Down Expand Up @@ -853,8 +897,10 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
)

elif (
from_type == StorageType.HTTP or from_type == StorageType.HTTPS
) and to_type == StorageType.CEPH:
(from_type == StorageType.HTTP or from_type == StorageType.HTTPS)
and to_type == StorageType.CEPH
and CEPH_RADOS_AVAILABLE
):
to_ioctx = __get_ceph_ioctx(to_tray)

try:
Expand Down Expand Up @@ -896,9 +942,8 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
)

else:
raise StorageError(
f"{from_type.name} and {to_type.name}",
f"Cannot copy from {from_type.name} to {to_type.name}",
raise NotImplementedError(
f"Cannot copy data from storage type {from_type.name} to storage type {to_type.name}"
)


Expand All @@ -911,8 +956,9 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None:
hard (bool, optional): hard link rather than symbolic. Only for FILE storage. Defaults to False.

Raises:
StorageError: Unhandled link or link issue
StorageError: link issue
MissingEnvironmentError: Missing object storage informations
NotImplementedError: Storage type not handled
"""

target_type, target_path, target_tray, target_base_name = get_infos_from_path(target_path)
Expand Down Expand Up @@ -947,7 +993,7 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None:
except Exception as e:
raise StorageError("S3", e)

elif target_type == StorageType.CEPH:
elif target_type == StorageType.CEPH and CEPH_RADOS_AVAILABLE:
ioctx = __get_ceph_ioctx(link_tray)

try:
Expand All @@ -965,7 +1011,7 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None:
raise StorageError("FILE", e)

else:
raise StorageError("UNKNOWN", "Unhandled storage type to make link")
raise NotImplementedError(f"Cannot make link for storage type {target_type.name}")


def get_osgeo_path(path: str) -> str:
Expand Down Expand Up @@ -1013,6 +1059,7 @@ def size_path(path: str) -> int:
Raises:
StorageError: Unhandled link or link issue
MissingEnvironmentError: Missing object storage informations
NotImplementedError: Storage type not handled

Returns:
int: size of the path
Expand Down Expand Up @@ -1052,9 +1099,9 @@ def size_path(path: str) -> int:
except Exception as e:
raise StorageError("S3", e)

elif storage_type == StorageType.CEPH:
raise NotImplementedError
else:
raise StorageError("UNKNOWN", "Unhandled storage type to calculate size")
raise NotImplementedError(
f"Cannot get prefix path size for storage type {storage_type.name}"
)

return total