Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## Summary

## Changelog

<!--
### [Added]

Expand Down
96 changes: 92 additions & 4 deletions src/rok4/Storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

Available storage types are :
- S3 (path are preffixed with `s3://`)
- CEPH (path are preffixed with `ceph://`)
- FILE (path are preffixed with `file://`, but it is the default paths' interpretation)
- CEPH (path are prefixed with `ceph://`)
- FILE (path are prefixed with `file://`, but it is the default paths' interpretation)
- HTTP (path are prefixed with `http://`)
- HTTPS (path are prefixed with `https://`)

According to functions, all storage types are not necessarily available.

Expand Down Expand Up @@ -35,6 +37,7 @@
import os
import rados
import hashlib
import requests
from typing import Dict, List, Tuple, Union
from enum import Enum
from shutil import copyfile
Expand All @@ -49,6 +52,8 @@ class StorageType(Enum):
FILE = "file://"
S3 = "s3://"
CEPH = "ceph://"
HTTP = "http://"
HTTPS = "https://"


__S3_CLIENTS = {}
Expand All @@ -70,6 +75,7 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s
Returns:
Tuple[Dict[str, Union['boto3.client',str]], str, str]: the S3 informations (client, host, key, secret) and the simple bucket name
"""

global __S3_CLIENTS, __S3_DEFAULT_CLIENT

if not __S3_CLIENTS:
Expand Down Expand Up @@ -181,7 +187,6 @@ def __get_ceph_ioctx(pool: str) -> "rados.Ioctx":

def disconnect_ceph_clients() -> None:
"""Clean CEPH clients"""

global __CEPH_CLIENT, __CEPH_IOCTXS
__CEPH_CLIENT = None
__CEPH_IOCTXS = {}
Expand Down Expand Up @@ -213,6 +218,10 @@ def get_infos_from_path(path: str) -> Tuple[StorageType, str, str, str]:
return StorageType.CEPH, path[7:], pool_name, object_name
elif path.startswith("file://"):
return StorageType.FILE, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
elif path.startswith("http://"):
return StorageType.HTTP, path[7:], os.path.dirname(path[7:]), os.path.basename(path[7:])
elif path.startswith("https://"):
return StorageType.HTTPS, path[8:], os.path.dirname(path[8:]), os.path.basename(path[8:])
else:
return StorageType.FILE, path, os.path.dirname(path), os.path.basename(path)

Expand Down Expand Up @@ -285,7 +294,6 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
Returns:
str: Data binary content
"""

storage_type, path, tray_name, base_name = get_infos_from_path(path)

if storage_type == StorageType.S3:
Expand Down Expand Up @@ -354,6 +362,19 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
except Exception as e:
raise StorageError("FILE", e)

elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:

if range is None :
try:
reponse = requests.get(f"{storage_type.value}{path}", stream=True)
data = reponse.content
if reponse.status_code == 404 :
raise FileNotFoundError(f"{storage_type.value}{path}")
except Exception as e:
raise StorageError(storage_type.name, e)
else :
raise NotImplementedError

else:
raise StorageError("UNKNOWN", "Unhandled storage type to read binary data")

Expand Down Expand Up @@ -449,6 +470,15 @@ def get_size(path: str) -> int:
except Exception as e:
raise StorageError("FILE", e)

elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:

try:
# Le stream=True permet de ne télécharger que le header initialement
reponse = requests.get(storage_type.value + path, stream=True).headers["content-length"]
return reponse
except Exception as e:
raise StorageError(storage_type.name, e)

else:
raise StorageError("UNKNOWN", "Unhandled storage type to get size")

Expand Down Expand Up @@ -495,6 +525,17 @@ def exists(path: str) -> bool:
elif storage_type == StorageType.FILE:
return os.path.exists(path)

elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:

try:
response = requests.get(storage_type.value + path, stream=True)
if response.status_code == 200 :
return True
else :
return False
except Exception as e:
raise StorageError(storage_type.name, e)

else:
raise StorageError("UNKNOWN", "Unhandled storage type to test if exists")

Expand Down Expand Up @@ -798,6 +839,53 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
f"CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}"
)

elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.FILE :

try:
response = requests.get(from_type.value + from_path, stream = True)
with open(to_path, "wb") as f:
for chunk in response.iter_content(chunk_size=65536) :
if chunk:
f.write(chunk)

except Exception as e:
raise StorageError(f"HTTP(S) and FILE", f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}")

elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.CEPH :

to_ioctx = __get_ceph_ioctx(to_tray)

try:
response = requests.get(from_type.value + from_path, stream = True)
offset = 0
for chunk in response.iter_content(chunk_size=65536) :
if chunk:
size = len(chunk)
to_ioctx.write(to_base_name, chunk, offset)
offset += size

except Exception as e:
raise StorageError(f"HTTP(S) and CEPH", f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}")

elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.S3 :

to_s3_client, to_bucket = __get_s3_client(to_tray)

try:
response = requests.get(from_type.value + from_path, stream = True)
with tempfile.NamedTemporaryFile("w+b",delete=False) as f:
name_fich = f.name
for chunk in response.iter_content(chunk_size=65536) :
if chunk:
f.write(chunk)

to_s3_client["client"].upload_file(name_fich, to_tray, to_base_name)

os.remove(name_fich)

except Exception as e:
raise StorageError(f"HTTP(S) and S3", f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}")

else:
raise StorageError(
f"{from_type.name} and {to_type.name}",
Expand Down
Loading