diff --git a/src/rok4/Raster.py b/src/rok4/Raster.py index 80c058e..1d7d6b5 100644 --- a/src/rok4/Raster.py +++ b/src/rok4/Raster.py @@ -1,18 +1,20 @@ -"""Provide functions to read information on raster data from file path or object path +"""Provide functions to read information on raster data The module contains the following class : - - 'Raster' - Structure describing raster data. - + - Raster - Structure describing raster data. + - RasterSet - Structure describing a set of raster data. """ +import copy +import json import re from enum import Enum +from typing import Tuple, Dict from osgeo import ogr, gdal -from typing import Tuple -from rok4.Storage import exists, get_osgeo_path +from rok4.Storage import exists, get_osgeo_path, put_data_str from rok4.Utils import ColorFormat, compute_bbox,compute_format # Enable GDAL/OGR exceptions @@ -23,12 +25,18 @@ class Raster: """A structure describing raster data Attributes : - path (str): path to the file/object (ex: file:///path/to/image.tif or s3://bucket/path/to/image.tif) - bbox (Tuple[float, float, float, float]): bounding rectange in the data projection + path (str): path to the file/object (ex: + file:///path/to/image.tif or s3://bucket/path/to/image.tif) + bbox (Tuple[float, float, float, float]): bounding rectange + in the data projection bands (int): number of color bands (or channels) - format (ColorFormat): numeric variable format for color values. Bit depth, as bits per channel, can be derived from it. - mask (str): path to the associated mask file or object, if any, or None (same path as the image, but with a ".msk" extension and TIFF format. ex: file:///path/to/image.msk or s3://bucket/path/to/image.msk) - dimensions (Tuple[int, int]): image width and height expressed in pixels + format (ColorFormat): numeric variable format for color values. + Bit depth, as bits per channel, can be derived from it. + mask (str): path to the associated mask file or object, if any, + or None (same path as the image, but with a ".msk" extension + and TIFF format. ex: + file:///path/to/image.msk or s3://bucket/path/to/image.msk) + dimensions (Tuple[int, int]): image width and height, in pixels """ def __init__(self) -> None: @@ -40,7 +48,7 @@ def __init__(self) -> None: self.path = None @classmethod - def from_file(cls, path: str) -> 'Raster': + def from_file(cls, path: str) -> "Raster": """Creates a Raster object from an image Args: @@ -53,7 +61,9 @@ def from_file(cls, path: str) -> 'Raster': from rok4.Raster import Raster try: - raster = Raster.from_file("file:///data/images/SC1000_TIFF_LAMB93_FXX/SC1000_0040_6150_L93.tif") + raster = Raster.from_file( + "file:///data/SC1000/0040_6150_L93.tif" + ) except Exception as e: print(f"Cannot load information from image : {e}") @@ -65,7 +75,6 @@ def from_file(cls, path: str) -> 'Raster': Returns: Raster: a Raster instance """ - if not exists(path): raise Exception(f"No file or object found at path '{path}'.") @@ -76,15 +85,16 @@ def from_file(cls, path: str) -> 'Raster': image_datasource = gdal.Open(work_image_path) self.path = path - path_pattern = re.compile('(/[^/]+?)[.][a-zA-Z0-9_-]+$') - mask_path = path_pattern.sub('\\1.msk', path) + path_pattern = re.compile("(/[^/]+?)[.][a-zA-Z0-9_-]+$") + mask_path = path_pattern.sub("\\1.msk", path) if exists(mask_path): work_mask_path = get_osgeo_path(mask_path) mask_driver = gdal.IdentifyDriver(work_mask_path).ShortName if 'GTiff' != mask_driver: - raise Exception(f"Mask file '{mask_path}' is not a TIFF image. (GDAL driver : '{mask_driver}'") - + message = (f"Mask file '{mask_path}' is not a TIFF image." + + f" (GDAL driver : '{mask_driver}'") + raise Exception(message) self.mask = mask_path else: self.mask = None @@ -97,28 +107,48 @@ def from_file(cls, path: str) -> 'Raster': return self @classmethod - def from_parameters(cls, path: str, bands: int, bbox: Tuple[float, float, float, float], dimensions: Tuple[int, int], format: ColorFormat, mask: str = None) -> 'Raster': + def from_parameters( + cls, path: str, bands: int, bbox: Tuple[float, float, float, float], + dimensions: Tuple[int, int], format: ColorFormat, mask: str = None) -> "Raster": """Creates a Raster object from parameters Args: - path (str): path to the file/object (ex: file:///path/to/image.tif or s3://bucket/path/to/image.tif) + path (str): path to the file/object (ex: + file:///path/to/image.tif or s3://bucket/image.tif) bands (int): number of color bands (or channels) - bbox (Tuple[float, float, float, float]): bounding rectange in the data projection - dimensions (Tuple[int, int]): image width and height expressed in pixels - format (ColorFormat): numeric variable format for color values. Bit depth, as bits per channel, can be derived from it. - mask (str, optionnal): path to the associated mask file or object, if any, or None (same path as the image, but with a ".msk" extension and TIFF format. ex: file:///path/to/image.msk or s3://bucket/path/to/image.msk) + bbox (Tuple[float, float, float, float]): bounding rectange + in the data projection + dimensions (Tuple[int, int]): image width and height + expressed in pixels + format (ColorFormat): numeric format for color values. + Bit depth, as bits per channel, can be derived from it. + mask (str, optionnal): path to the associated mask, if any, + or None (same path as the image, but with a + ".msk" extension and TIFF format. ex: + file:///path/to/image.msk or s3://bucket/image.msk) Examples: - Loading informations from parameters, related to a TIFF main image coupled to a TIFF mask image + Loading informations from parameters, related to + a TIFF main image coupled to a TIFF mask image from rok4.Raster import Raster try: - raster = Raster.from_parameters(path="file:///data/images/SC1000_TIFF_LAMB93_FXX/SC1000_0040_6150_L93.tif", mask="file:///data/images/SC1000_TIFF_LAMB93_FXX/SC1000_0040_6150_L93.msk", bands=3, format=ColorFormat.UINT8, dimensions=(2000, 2000), bbox=(40000.000, 5950000.000, 240000.000, 6150000.000)) + raster = Raster.from_parameters( + path="file:///data/SC1000/_0040_6150_L93.tif", + mask="file:///data/SC1000/0040_6150_L93.msk", + bands=3, + format=ColorFormat.UINT8, + dimensions=(2000, 2000), + bbox=(40000.000, 5950000.000, + 240000.000, 6150000.000) + ) except Exception as e: - print(f"Cannot load information from parameters : {e}") + print( + f"Cannot load information from parameters : {e}" + ) Raises: KeyError: a mandatory argument is missing @@ -126,7 +156,6 @@ def from_parameters(cls, path: str, bands: int, bbox: Tuple[float, float, float, Returns: Raster: a Raster instance """ - self = cls() self.path = path @@ -135,5 +164,192 @@ def from_parameters(cls, path: str, bands: int, bbox: Tuple[float, float, float, self.dimensions = dimensions self.format = format self.mask = mask + return self + + +class RasterSet: + """A structure describing a set of raster data + + Attributes : + raster_list (List[Raster]): List of Raster instances in the set + colors (List[Dict]): List of color properties for each raster + instance. Contains only one element if + the set is homogenous. + Element properties: + bands (int): number of color bands (or channels) + format (ColorFormat): numeric variable format for + color values. Bit depth, as bits per channel, + can be derived from it. + srs (str): Name of the set's spatial reference system + bbox (Tuple[float, float, float, float]): bounding rectange + in the data projection, enclosing the whole set + """ + + def __init__(self) -> None: + self.bbox = (None, None, None, None) + self.colors = [] + self.raster_list = [] + self.srs = None + + @classmethod + def from_list(cls, path: str, srs: str) -> "RasterSet": + """Instanciate a RasterSet from an images list path and a srs + + Args: + path (str): path to the images list file or object + (each line in this list contains the path to + an image file or object in the set) + + Examples: + + Loading informations from a file stored list + + from rok4.Raster import RasterSet + + try: + raster_set = RasterSet.from_list( + path="file:///data/SC1000.list", + srs="EPSG:3857" + ) + + except Exception as e: + print( + f"Cannot load information from list file : {e}" + ) + + Raises: + RuntimeError: raised by OGR/GDAL if anything goes wrong + NotImplementedError: Storage type not handled + + Returns: + RasterSet: a RasterSet instance + """ + self = cls() + self.srs = srs + + local_list_path = get_osgeo_path(path) + image_list = [] + with open(file=local_list_path, mode="r") as list_file: + for line in list_file: + image_path = line.strip(" \t\n\r") + image_list.append(image_path) + + temp_bbox = [None, None, None, None] + for image_path in image_list: + raster = Raster.from_file(image_path) + self.raster_list.append(raster) + if temp_bbox == [None, None, None, None]: + for i in range(0, 4, 1): + temp_bbox[i] = raster.bbox[i] + else: + if temp_bbox[0] > raster.bbox[0]: + temp_bbox[0] = raster.bbox[0] + if temp_bbox[1] > raster.bbox[1]: + temp_bbox[1] = raster.bbox[1] + if temp_bbox[2] < raster.bbox[2]: + temp_bbox[2] = raster.bbox[2] + if temp_bbox[3] < raster.bbox[3]: + temp_bbox[3] = raster.bbox[3] + color_dict = {"bands": raster.bands, "format": raster.format} + if color_dict not in self.colors: + self.colors.append(color_dict) + self.bbox = tuple(temp_bbox) + return self + + @classmethod + def from_descriptor(cls, path: str) -> "RasterSet": + """Creates a RasterSet object from a descriptor file or object + + Args: + path (str): path to the descriptor file or object + + Examples: + + Loading informations from a file stored descriptor + + from rok4.Raster import RasterSet + + try: + raster_set = RasterSet.from_descriptor( + "file:///data/images/descriptor.json" + ) + + except Exception as e: + message = ("Cannot load information from " + + f"descriptor file : {e}") + print(message) + Raises: + RuntimeError: raised by OGR/GDAL if anything goes wrong + NotImplementedError: Storage type not handled + + Returns: + RasterSet: a RasterSet instance + """ + self = cls() + descriptor_path = get_osgeo_path(path) + with open(file=descriptor_path, mode="r") as file_handle: + raw_content = file_handle.read() + serialization = json.loads(raw_content) + self.srs = serialization["srs"] + self.raster_list = [] + for raster_dict in serialization["raster_list"]: + parameters = copy.deepcopy(raster_dict) + parameters["bbox"] = tuple(raster_dict["bbox"]) + parameters["dimensions"] = tuple(raster_dict["dimensions"]) + parameters["format"] = ColorFormat[ raster_dict["format"] ] + self.raster_list.append(Raster.from_parameters(**parameters)) + self.bbox = tuple(serialization["bbox"]) + self.colors = [] + for color_dict in serialization["colors"]: + color_item = copy.deepcopy(color_dict) + color_item["format"] = ColorFormat[ color_dict["format"] ] + self.colors.append(color_item) return self + + @property + def serializable(self) -> Dict: + """Get the dict version of the raster set, descriptor compliant + + Returns: + Dict: descriptor structured object description + """ + serialization = { + "bbox": list(self.bbox), + "srs": self.srs, + "colors": [], + "raster_list" : [] + } + for color in self.colors: + color_serial = { + "bands": color["bands"], + "format": color["format"].name + } + serialization["colors"].append(color_serial) + for raster in self.raster_list: + raster_dict = { + "path": raster.path, + "dimensions": list(raster.dimensions), + "bbox": list(raster.bbox), + "bands": raster.bands, + "format": raster.format.name + } + if raster.mask is not None: + raster_dict["mask"] = raster.mask + serialization["raster_list"].append(raster_dict) + return serialization + + def write_descriptor(self, path: str = None) -> None: + """Print raster set's descriptor as JSON + + Args: + path (str, optional): Complete path (file or object) + where to print the raster set's JSON. Defaults to None, + JSON is printed to standard output. + """ + content = json.dumps(self.serializable, sort_keys=True) + if path is None: + print(content) + else: + put_data_str(content, path) + diff --git a/src/rok4/Utils.py b/src/rok4/Utils.py index 0e4c480..f31f872 100644 --- a/src/rok4/Utils.py +++ b/src/rok4/Utils.py @@ -15,7 +15,9 @@ class ColorFormat(Enum): """A color format enumeration. - Except from "BIT", the member's name matches a common variable format name. The member's value is the allocated bit size associated to this format. + Except from "BIT", the member's name matches + a common variable format name. The member's value is + the allocated bit size associated to this format. """ BIT = 1 UINT8 = 8 @@ -171,11 +173,12 @@ def reproject_point(point: Tuple[float, float], sr_src: 'osgeo.osr.SpatialRefere return (x_dst, y_dst) -def compute_bbox(source_dataset: gdal.Dataset) -> tuple: +def compute_bbox(source_dataset: gdal.Dataset) -> Tuple: """Image boundingbox computing method Args: - source_dataset (gdal.Dataset): Dataset object created from the raster image + source_dataset (gdal.Dataset): Dataset instanciated + from the raster image Limitations: Image's axis must be parallel to SRS' axis @@ -184,64 +187,59 @@ def compute_bbox(source_dataset: gdal.Dataset) -> tuple: AttributeError: source_dataset is not a gdal.Dataset instance. Exception: The dataset does not contain transform data. """ - bbox = None transform_vector = source_dataset.GetGeoTransform() - if transform_vector is None: - raise Exception(f"No transform vector found in the dataset created from the following file : {source_dataset.GetFileList()[0]}") - + raise Exception( + f"No transform vector found in the dataset created from " + + f"the following file : {source_dataset.GetFileList()[0]}" + ) width = source_dataset.RasterXSize height = source_dataset.RasterYSize - x_range = ( transform_vector[0], transform_vector[0] + width * transform_vector[1] + height * transform_vector[2] ) - y_range = ( transform_vector[3], transform_vector[3] + width * transform_vector[4] + height * transform_vector[5] ) - spatial_ref = source_dataset.GetSpatialRef() if spatial_ref is not None and spatial_ref.GetDataAxisToSRSAxisMapping() == [2, 1]: - # Coordonnées terrain de type (latitude, longitude) => on permute les coordonnées terrain par rapport à l'image + # Coordonnées terrain de type (latitude, longitude) + # => on permute les coordonnées terrain par rapport à l'image bbox = ( min(y_range), min(x_range), max(y_range), max(x_range) ) - elif spatial_ref is None or spatial_ref.GetDataAxisToSRSAxisMapping() == [1, 2]: - # Coordonnées terrain de type (longitude, latitude) ou pas de SRS => les coordonnées terrain sont dans le même ordre que celle de l'image + else: + # Coordonnées terrain de type (longitude, latitude) ou pas de SRS + # => les coordonnées terrain sont dans le même ordre que celle de l'image bbox = ( min(x_range), min(y_range), max(x_range), max(y_range) ) - return bbox -def compute_format(dataset: gdal.Dataset, path: str = None) -> 'ColorFormat': +def compute_format(dataset: gdal.Dataset, path: str = None) -> ColorFormat: """Image color format computing method Args: - dataset (gdal.Dataset): Dataset object created from the raster image + dataset (gdal.Dataset): Dataset instanciated from the image path (str, optionnal): path to the original file/object Raises: AttributeError: source_dataset is not a gdal.Dataset instance. - Exception: Image has no color band or its color format is unsupported. + Exception: No color band found or unsupported color format. """ - - format = None - + color_format = None if path is None: path = dataset.GetFileList()[0] - if dataset.RasterCount < 1: raise Exception(f"Image {path} contains no color band.") @@ -252,16 +250,17 @@ def compute_format(dataset: gdal.Dataset, path: str = None) -> 'ColorFormat': color_name = None if color_interpretation is not None: color_name = gdal.GetColorInterpretationName(color_interpretation) - compression_regex_match = re.search(r'COMPRESSION\s*=\s*PACKBITS', gdal.Info(dataset)) - + compression_regex_match = re.search(r"COMPRESSION\s*=\s*PACKBITS", gdal.Info(dataset)) - if data_type_name == "Byte" and data_type_size == 8 and color_name == "Palette" and compression_regex_match: - format = ColorFormat.BIT + if (data_type_name == "Byte" and data_type_size == 8 + and color_name == "Palette" and compression_regex_match): + # Compris par libTIFF comme du noir et blanc sur 1 bit + color_format = ColorFormat.BIT elif data_type_name == "Byte" and data_type_size == 8: - format = ColorFormat.UINT8 + color_format = ColorFormat.UINT8 elif data_type_name == "Float32" and data_type_size == 32: - format = ColorFormat.FLOAT32 + color_format = ColorFormat.FLOAT32 else: - raise Exception(f"Unsupported color format for image {path} : '{data_type_name}' ({data_type_size} bits)") - - return format + raise Exception(f"Unsupported color format for image {path} : " + + f"'{data_type_name}' ({data_type_size} bits)") + return color_format diff --git a/tests/test_Raster.py b/tests/test_Raster.py index 6cc526e..608c3e2 100644 --- a/tests/test_Raster.py +++ b/tests/test_Raster.py @@ -1,17 +1,44 @@ -"""Describes unit tests for the rok4.Raster module.""" +"""Describes unit tests for the rok4.Raster module. -from rok4.Raster import Raster -from rok4.Utils import ColorFormat +There is one test class for each tested functionnality. +See internal docstrings for more information. +Each variable prefixed by "m_" is a mock, or part of it. +""" +import copy import math - +import json +import random import pytest from unittest import mock, TestCase -from unittest.mock import * +from unittest.mock import call, MagicMock, Mock, mock_open, patch + +from rok4.Raster import Raster, RasterSet +from rok4.Utils import ColorFormat + + +# rok4.Raster.Raster class tests + + +class TestRasterInit(TestCase): + """rok4.Raster.Raster default constructor.""" + def test_default(self): + """Default property values.""" + raster = Raster() -class TestFromFile(TestCase): - """Test class for the rok4.Raster.Raster.from_file(path) class constructor.""" + assert raster.bands is None + assert (isinstance(raster.bbox, tuple) and len(raster.bbox) == 4 + and all(coordinate is None for coordinate in raster.bbox)) + assert (isinstance(raster.dimensions, tuple) and len(raster.dimensions) == 2 + and all(dimension is None for dimension in raster.dimensions)) + assert raster.format is None + assert raster.mask is None + assert raster.path is None + + +class TestRasterFromFile(TestCase): + """rok4.Raster.Raster.from_file(path) class constructor.""" def setUp(self): self.source_image_path = "file:///home/user/image.tif" @@ -23,157 +50,470 @@ def setUp(self): return super().setUp() def test_empty(self): - """Test case : Constructor called without the expected path argument.""" - + """Constructor called without the expected path argument.""" with pytest.raises(TypeError): Raster.from_file() - @mock.patch('rok4.Raster.exists', return_value=False) - def test_image_not_found(self, mocked_exists): - """Test case : Constructor called on a path matching no file or object.""" - + @mock.patch("rok4.Raster.exists", return_value=False) + def test_image_not_found(self, m_exists): + """Constructor called on a path matching no file or object.""" with pytest.raises(Exception): Raster.from_file(self.source_image_path) - - mocked_exists.assert_called_once_with(self.source_image_path) - - @mock.patch('rok4.Raster.get_osgeo_path') - @mock.patch('rok4.Raster.compute_format', return_value=ColorFormat.UINT8) - @mock.patch('rok4.Raster.gdal.Open') - @mock.patch('rok4.Raster.compute_bbox') - @mock.patch('rok4.Raster.exists', side_effect=[True, False]) - def test_image(self, mocked_exists, mocked_compute_bbox, mocked_gdal_open, mocked_compute_format, mocked_get_osgeo_path): - """Test case : Constructor called nominally on an image without mask.""" - - mocked_compute_bbox.return_value = self.bbox - mocked_gdal_open.return_value = type('', (object,), {'RasterCount': 3, 'RasterXSize': self.image_size[0], 'RasterYSize': self.image_size[1]}) - mocked_get_osgeo_path.return_value = self.osgeo_image_path - - raster_object = Raster.from_file( self.source_image_path ) - - mocked_exists.assert_has_calls([ call(self.source_image_path), call(self.source_mask_path) ]) - mocked_get_osgeo_path.assert_called_once_with(self.source_image_path) - mocked_gdal_open.assert_called_once_with( self.osgeo_image_path ) - assert raster_object.path == self.source_image_path - assert raster_object.mask is None - - mocked_compute_bbox.assert_called_once() - assert math.isclose(raster_object.bbox[0], self.bbox[0], rel_tol=1e-5) - assert math.isclose(raster_object.bbox[1], self.bbox[1], rel_tol=1e-5) - assert math.isclose(raster_object.bbox[2], self.bbox[2], rel_tol=1e-5) - assert math.isclose(raster_object.bbox[3], self.bbox[3], rel_tol=1e-5) - assert raster_object.bands == 3 - mocked_compute_format.assert_called_once() - assert raster_object.format == ColorFormat.UINT8 - assert raster_object.dimensions == self.image_size - - @mock.patch('rok4.Raster.get_osgeo_path') - @mock.patch('rok4.Raster.compute_format', return_value=ColorFormat.UINT8) - @mock.patch('rok4.Raster.gdal.IdentifyDriver') - @mock.patch('rok4.Raster.gdal.Open') - @mock.patch('rok4.Raster.compute_bbox') - @mock.patch('rok4.Raster.exists', side_effect=[True, True]) - def test_image_and_mask(self, mocked_exists, mocked_compute_bbox, mocked_gdal_open, mocked_identifydriver, mocked_compute_format, mocked_get_osgeo_path): - """Test case : Constructor called nominally on an image with mask.""" - - mocked_compute_bbox.return_value = self.bbox - mocked_gdal_open.return_value = type('', (object,), {'RasterCount': 3, 'RasterXSize': self.image_size[0], 'RasterYSize': self.image_size[1]}) - mocked_get_osgeo_path.side_effect=[self.osgeo_image_path, self.osgeo_mask_path] - # This next line emulates the return of gdal.IdentifyDriver() - mocked_identifydriver.return_value = type('', (object,), {'ShortName': 'GTiff'}) - - raster_object = Raster.from_file(self.source_image_path) - - mocked_exists.assert_has_calls([ call(self.source_image_path), call(self.source_mask_path) ]) - mocked_get_osgeo_path.assert_has_calls([ call(self.source_image_path), call(self.source_mask_path) ]) - mocked_identifydriver.assert_called_once_with(self.osgeo_mask_path) - mocked_gdal_open.assert_called_once_with(self.osgeo_image_path) - assert raster_object.path == self.source_image_path - assert raster_object.mask == self.source_mask_path - - mocked_compute_bbox.assert_called_once() - assert math.isclose(raster_object.bbox[0], self.bbox[0], rel_tol=1e-5) - assert math.isclose(raster_object.bbox[1], self.bbox[1], rel_tol=1e-5) - assert math.isclose(raster_object.bbox[2], self.bbox[2], rel_tol=1e-5) - assert math.isclose(raster_object.bbox[3], self.bbox[3], rel_tol=1e-5) - assert raster_object.bands == 3 - mocked_compute_format.assert_called_once() - assert raster_object.format == ColorFormat.UINT8 - assert raster_object.dimensions == self.image_size - - @mock.patch('rok4.Raster.get_osgeo_path') - @mock.patch('rok4.Raster.gdal.Open', side_effect=RuntimeError) - @mock.patch('rok4.Raster.exists', side_effect=[True, False]) - def test_unsupported_image_format(self, mocked_exists, mocked_gdal_open, mocked_get_osgeo_path): - """Test case : Constructor called on an unsupported 'image' file or object.""" - - mocked_get_osgeo_path.return_value = self.osgeo_image_path + m_exists.assert_called_once_with(self.source_image_path) + + @mock.patch("rok4.Raster.get_osgeo_path") + @mock.patch("rok4.Raster.compute_format", return_value=ColorFormat.UINT8) + @mock.patch("rok4.Raster.gdal.Open") + @mock.patch("rok4.Raster.compute_bbox") + @mock.patch("rok4.Raster.exists", side_effect=[True, False]) + def test_image(self, m_exists, m_compute_bbox, m_gdal_open, m_compute_format, + m_get_osgeo_path): + """Constructor called nominally on an image without mask.""" + m_compute_bbox.return_value = self.bbox + m_dataset_properties = {"RasterCount": 3, "RasterXSize": self.image_size[0], + "RasterYSize": self.image_size[1]} + m_gdal_open.return_value = type("", (object,), m_dataset_properties) + m_get_osgeo_path.return_value = self.osgeo_image_path + + raster = Raster.from_file(self.source_image_path) + + m_exists.assert_has_calls([call(self.source_image_path), call(self.source_mask_path)]) + m_get_osgeo_path.assert_called_once_with(self.source_image_path) + m_gdal_open.assert_called_once_with(self.osgeo_image_path) + assert raster.path == self.source_image_path + assert raster.mask is None + m_compute_bbox.assert_called_once() + assert (isinstance(raster.bbox, tuple) and len(raster.bbox) == 4 + and math.isclose(raster.bbox[0], self.bbox[0], rel_tol=1e-5) + and math.isclose(raster.bbox[1], self.bbox[1], rel_tol=1e-5) + and math.isclose(raster.bbox[2], self.bbox[2], rel_tol=1e-5) + and math.isclose(raster.bbox[3], self.bbox[3], rel_tol=1e-5)) + assert raster.bands == 3 + m_compute_format.assert_called_once() + assert raster.format == ColorFormat.UINT8 + assert raster.dimensions == self.image_size + + @mock.patch("rok4.Raster.get_osgeo_path") + @mock.patch("rok4.Raster.compute_format", return_value=ColorFormat.UINT8) + @mock.patch("rok4.Raster.gdal.IdentifyDriver") + @mock.patch("rok4.Raster.gdal.Open") + @mock.patch("rok4.Raster.compute_bbox") + @mock.patch("rok4.Raster.exists", side_effect=[True, True]) + def test_image_and_mask(self, m_exists, m_compute_bbox, m_gdal_open, m_identifydriver, + m_compute_format, m_get_osgeo_path): + """Constructor called nominally on an image with mask.""" + m_compute_bbox.return_value = self.bbox + m_dataset_properties = {"RasterCount": 3, "RasterXSize": self.image_size[0], + "RasterYSize": self.image_size[1]} + m_gdal_open.return_value = type("", (object,), m_dataset_properties) + m_get_osgeo_path.side_effect=[self.osgeo_image_path, self.osgeo_mask_path] + m_identifydriver.return_value = type("", (object,), {"ShortName": "GTiff"}) + + raster = Raster.from_file(self.source_image_path) + + m_exists.assert_has_calls([call(self.source_image_path), call(self.source_mask_path)]) + m_get_osgeo_path.assert_has_calls([call(self.source_image_path), + call(self.source_mask_path)]) + m_identifydriver.assert_called_once_with(self.osgeo_mask_path) + m_gdal_open.assert_called_once_with(self.osgeo_image_path) + assert raster.path == self.source_image_path + assert raster.mask == self.source_mask_path + m_compute_bbox.assert_called_once() + assert (isinstance(raster.bbox, tuple) and len(raster.bbox) == 4 + and math.isclose(raster.bbox[0], self.bbox[0], rel_tol=1e-5) + and math.isclose(raster.bbox[1], self.bbox[1], rel_tol=1e-5) + and math.isclose(raster.bbox[2], self.bbox[2], rel_tol=1e-5) + and math.isclose(raster.bbox[3], self.bbox[3], rel_tol=1e-5)) + assert raster.bands == 3 + m_compute_format.assert_called_once() + assert raster.format == ColorFormat.UINT8 + assert raster.dimensions == self.image_size + + @mock.patch("rok4.Raster.get_osgeo_path") + @mock.patch("rok4.Raster.gdal.Open", side_effect=RuntimeError) + @mock.patch("rok4.Raster.exists", side_effect=[True, False]) + def test_unsupported_image_format(self, m_exists, m_gdal_open, m_get_osgeo_path): + """Test case : Constructor called on an unsupported image file or object.""" + m_get_osgeo_path.return_value = self.osgeo_image_path with pytest.raises(RuntimeError): Raster.from_file(self.source_image_path) - mocked_exists.assert_called_once_with(self.source_image_path) - mocked_get_osgeo_path.assert_called_once_with(self.source_image_path) - mocked_gdal_open.assert_called_once_with(self.osgeo_image_path) + m_exists.assert_called_once_with(self.source_image_path) + m_get_osgeo_path.assert_called_once_with(self.source_image_path) + m_gdal_open.assert_called_once_with(self.osgeo_image_path) - @mock.patch('rok4.Raster.get_osgeo_path') - @mock.patch('rok4.Raster.gdal.IdentifyDriver') - @mock.patch('rok4.Raster.gdal.Open', side_effect=None) - @mock.patch('rok4.Raster.exists', side_effect=[True, True]) - def test_unsupported_mask_format(self, mocked_exists, mocked_gdal_open, mocked_identifydriver, mocked_get_osgeo_path): + @mock.patch("rok4.Raster.get_osgeo_path") + @mock.patch("rok4.Raster.gdal.IdentifyDriver") + @mock.patch("rok4.Raster.gdal.Open", side_effect=None) + @mock.patch("rok4.Raster.exists", side_effect=[True, True]) + def test_unsupported_mask_format(self, m_exists, m_gdal_open, m_identifydriver, + m_get_osgeo_path): """Test case : Constructor called on an unsupported mask file or object.""" - - mocked_get_osgeo_path.side_effect=[self.osgeo_image_path, self.osgeo_mask_path] - # This next line emulates the return of gdal.IdentifyDriver() - mocked_identifydriver.return_value = type('', (object,), {'ShortName': 'JPG'}) + m_get_osgeo_path.side_effect=[self.osgeo_image_path, self.osgeo_mask_path] + m_identifydriver.return_value = type("", (object,), {"ShortName": "JPG"}) with pytest.raises(Exception): Raster.from_file(self.source_image_path) - mocked_exists.assert_has_calls([ call(self.source_image_path), call(self.source_mask_path) ]) - mocked_get_osgeo_path.assert_has_calls([ call(self.source_image_path), call(self.source_mask_path) ]) - mocked_identifydriver.assert_called_once_with(self.osgeo_mask_path) - mocked_gdal_open.assert_called_once_with(self.osgeo_image_path) + m_exists.assert_has_calls([call(self.source_image_path), call(self.source_mask_path)]) + m_get_osgeo_path.assert_has_calls([call(self.source_image_path), + call(self.source_mask_path)]) + m_identifydriver.assert_called_once_with(self.osgeo_mask_path) + m_gdal_open.assert_called_once_with(self.osgeo_image_path) -class TestFromParameters(TestCase): - """Test class for the rok4.Raster.Raster.from_parameters(**kwargs) class constructor.""" +class TestRasterFromParameters(TestCase): + """rok4.Raster.Raster.from_parameters(**kwargs) class constructor.""" def test_image(self): - i_path = "file:///path/to/image.tif" - i_bbox = (-5.4, 41.3, 9.8, 51.3) - i_bands = 4 - i_format = ColorFormat.UINT8 - i_dimensions = (1920, 1080) - - result = Raster.from_parameters(path=i_path, bbox=i_bbox, bands=i_bands, format=i_format, dimensions=i_dimensions) - - assert result.path == i_path - assert math.isclose(result.bbox[0], i_bbox[0], rel_tol=1e-5) - assert math.isclose(result.bbox[1], i_bbox[1], rel_tol=1e-5) - assert math.isclose(result.bbox[2], i_bbox[2], rel_tol=1e-5) - assert math.isclose(result.bbox[3], i_bbox[3], rel_tol=1e-5) - assert result.bands == i_bands - assert result.format == i_format - assert result.dimensions == i_dimensions - assert result.mask is None + """Parameters describing an image without mask""" + parameters = { + "bands": 4, + "bbox": (-5.4, 41.3, 9.8, 51.3), + "dimensions": (1920, 1080), + "format": ColorFormat.UINT8, + "path": "file:///path/to/image.tif" + } + + raster = Raster.from_parameters(**parameters) + + assert raster.path == parameters["path"] + assert (isinstance(raster.bbox, tuple) and len(raster.bbox) == 4 + and math.isclose(raster.bbox[0], parameters["bbox"][0], rel_tol=1e-5) + and math.isclose(raster.bbox[1], parameters["bbox"][1], rel_tol=1e-5) + and math.isclose(raster.bbox[2], parameters["bbox"][2], rel_tol=1e-5) + and math.isclose(raster.bbox[3], parameters["bbox"][3], rel_tol=1e-5)) + assert raster.bands == parameters["bands"] + assert raster.format == parameters["format"] + assert raster.dimensions == parameters["dimensions"] + assert raster.mask is None def test_image_and_mask(self): - i_path = "file:///path/to/image.tif" - i_mask = "file:///path/to/image.msk" - i_bbox = (-5.4, 41.3, 9.8, 51.3) - i_bands = 4 - i_format = ColorFormat.UINT8 - i_dimensions = (1920, 1080) - - result = Raster.from_parameters(path=i_path, bbox=i_bbox, bands=i_bands, format=i_format, dimensions=i_dimensions, mask=i_mask) - - assert result.path == i_path - assert math.isclose(result.bbox[0], i_bbox[0], rel_tol=1e-5) - assert math.isclose(result.bbox[1], i_bbox[1], rel_tol=1e-5) - assert math.isclose(result.bbox[2], i_bbox[2], rel_tol=1e-5) - assert math.isclose(result.bbox[3], i_bbox[3], rel_tol=1e-5) - assert result.bands == i_bands - assert result.format == i_format - assert result.dimensions == i_dimensions - assert result.mask == i_mask + + """Parameters describing an image with mask""" + parameters = { + "bands": 4, + "bbox": (-5.4, 41.3, 9.8, 51.3), + "dimensions": (1920, 1080), + "format": ColorFormat.UINT8, + "mask": "file:///path/to/image.msk", + "path": "file:///path/to/image.tif" + } + + raster = Raster.from_parameters(**parameters) + + assert raster.path == parameters["path"] + assert (isinstance(raster.bbox, tuple) and len(raster.bbox) == 4 + and math.isclose(raster.bbox[0], parameters["bbox"][0], rel_tol=1e-5) + and math.isclose(raster.bbox[1], parameters["bbox"][1], rel_tol=1e-5) + and math.isclose(raster.bbox[2], parameters["bbox"][2], rel_tol=1e-5) + and math.isclose(raster.bbox[3], parameters["bbox"][3], rel_tol=1e-5)) + assert raster.bands == parameters["bands"] + assert raster.format == parameters["format"] + assert raster.dimensions == parameters["dimensions"] + assert raster.mask == parameters["mask"] + + +# rok4.Raster.RasterSet class tests + + +class TestRasterSetInit(TestCase): + """rok4.Raster.RasterSet default constructor.""" + + def test_default(self): + """Default property values.""" + rasterset = RasterSet() + + assert (isinstance(rasterset.bbox, tuple) and len(rasterset.bbox) == 4 + and all(coordinate is None for coordinate in rasterset.bbox)) + assert (isinstance(rasterset.colors, list) and not rasterset.colors) + assert (isinstance(rasterset.raster_list, list) and not rasterset.raster_list) + assert rasterset.srs is None + + +class TestRasterSetFromList(TestCase): + """rok4.Raster.RasterSet.from_list(path, srs) class constructor.""" + + @mock.patch("rok4.Raster.get_osgeo_path") + @mock.patch("rok4.Raster.Raster.from_file") + def test_ok_at_least_3_files(self, m_from_file, m_get_osgeo_path): + """List of 3 or more valid image files""" + file_number = random.randint(3, 100) + file_list = [] + for n in range(0, file_number, 1): + file_list.append(f"s3://test_bucket/image_{n+1}.tif") + file_list_string = "\n".join(file_list) + m_open = mock_open(read_data = file_list_string) + list_path = "s3://test_bucket/raster_set.list" + list_local_path = "/tmp/raster_set.list" + m_get_osgeo_path.return_value = list_local_path + raster_list = [] + colors = [] + serial_in = { + "raster_list": [], + "colors": [] + } + for n in range(0, file_number, 1): + raster = MagicMock(Raster) + raster.path = file_list[n] + raster.bbox = ( + -0.75 + math.floor(n/3), + -1.33 + n - 3 * math.floor(n/3), + 0.25 + math.floor(n/3), + -0.33 + n - 3 * math.floor(n/3) + ) + raster.format = random.choice([ColorFormat.BIT, ColorFormat.UINT8, + ColorFormat.FLOAT32]) + if raster.format == ColorFormat.BIT: + raster.bands = 1 + else: + raster.bands = random.randint(1, 4) + if random.randint(0, 1) == 1: + raster.mask = raster.path.replace(".tif", ".msk") + else: + raster.mask = None + color_dict = {"bands": raster.bands, "format": raster.format} + if color_dict not in colors: + colors.append(color_dict) + serial_in["colors"].append({"bands": raster.bands, + "format": raster.format.name}) + raster.dimensions = (5000, 5000) + raster_list.append(raster) + raster_serial = {"path": raster.path, "bands": raster.bands, + "format": raster.format.name, "bbox": list(raster.bbox), + "dimensions": list(raster.dimensions)} + if raster.mask: + raster_serial["mask"] = raster.mask + serial_in["raster_list"].append(raster_serial) + m_from_file.side_effect = raster_list + srs = "EPSG:4326" + serial_in["srs"] = srs + bbox = ( + -0.75, + -1.33, + 0.25 + math.floor((file_number-1)/3), + 1.67 + ) + serial_in["bbox"] = list(bbox) + + with mock.patch("rok4.Raster.open", m_open): + rasterset = RasterSet.from_list(list_path, srs) + + serial_out = rasterset.serializable + assert rasterset.srs == srs + m_get_osgeo_path.assert_called_once_with(list_path) + m_open.assert_called_once_with(file=list_local_path, mode="r") + assert rasterset.raster_list == raster_list + assert isinstance(serial_out["bbox"], list) + for i in range(0, 4, 1): + assert math.isclose(rasterset.bbox[i], bbox[i], rel_tol=1e-5) + assert math.isclose(serial_out["bbox"][i], serial_in["bbox"][i], rel_tol=1e-5) + assert len(rasterset.colors) > 0 + assert rasterset.colors == colors + for key in serial_in.keys(): + if key != "bbox": + assert serial_out[key] == serial_in[key] + assert isinstance(serial_out["bbox"], list) + + +class TestRasterSetFromDescriptor(TestCase): + """rok4.Raster.RasterSet.from_descriptor(path) class constructor.""" + + @mock.patch("rok4.Raster.get_osgeo_path") + @mock.patch("rok4.Raster.Raster.from_parameters") + def test_simple_ok(self, m_from_parameters, m_get_osgeo_path): + serial_in = { + "bbox": [550000.000, 6210000.000, 570000.000, 6230000.000], + "colors": [{"bands": 3, "format": "UINT8"}], + "raster_list": [ + { + "bands": 3, + "bbox": [550000.000, 6210000.000, 560000.000, 6220000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "file:///path/to/images/550000_6220000.msk", + "path": "file:///path/to/images/550000_6220000.tif" + }, + { + "bands": 3, + "bbox": [560000.000, 6210000.000, 570000.000, 6220000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "file:///path/to/images/560000_6220000.msk", + "path": "file:///path/to/images/560000_6220000.tif" + }, + { + "bands": 3, + "bbox": [550000.000, 6220000.000, 560000.000, 6230000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "file:///path/to/images/550000_6230000.msk", + "path": "file:///path/to/images/550000_6230000.tif" + } + ], + "srs": "IGNF:LAMB93" + } + desc_path = "file:///path/to/descriptor.json" + local_path = "/path/to/descriptor.json" + desc_content = json.dumps(serial_in) + raster_list = [] + raster_args_list = [] + for raster_dict in serial_in["raster_list"]: + raster_properties = copy.deepcopy(raster_dict) + raster_properties["format"] = ColorFormat[raster_dict["format"]] + raster_properties["bbox"] = tuple(raster_dict["bbox"]) + raster_properties["dimensions"] = tuple(raster_dict["dimensions"]) + + raster = MagicMock(Raster, **raster_properties) + raster_list.append(raster) + raster_args_list.append(raster_properties) + m_from_parameters.side_effect = raster_list + m_get_osgeo_path.return_value = local_path + m_open = mock_open(read_data = desc_content) + + with mock.patch("rok4.Raster.open", m_open): + rasterset = RasterSet.from_descriptor(desc_path) + + m_get_osgeo_path.assert_called_once_with(desc_path) + m_open.assert_called_once_with(file=local_path, mode="r") + assert rasterset.srs == serial_in["srs"] + m_from_parameters.assert_called() + assert m_from_parameters.call_count == 3 + for i in range(0, len(raster_args_list), 1): + assert m_from_parameters.call_args_list[i] == call(**raster_args_list[i]) + assert rasterset.raster_list == raster_list + assert (isinstance(rasterset.bbox, tuple) and len(rasterset.bbox) == 4) + assert isinstance(rasterset.colors, list) and rasterset.colors + for i in range(0, len(serial_in["colors"]), 1): + expected_color = copy.deepcopy(serial_in["colors"][i]) + expected_color["format"] = ColorFormat[serial_in["colors"][i]["format"]] + assert rasterset.colors[i] == expected_color + serial_out = rasterset.serializable + assert (isinstance(serial_out["bbox"], list) + and len(serial_out["bbox"]) == 4) + for i in range(0, 4, 1): + assert math.isclose(rasterset.bbox[i], serial_in["bbox"][i], rel_tol=1e-5) + assert math.isclose(serial_out["bbox"][i], serial_in["bbox"][i], rel_tol=1e-5) + for key in serial_in.keys(): + if key != "bbox": + assert serial_out[key] == serial_in[key] + + +class TestRasterSetWriteDescriptor(TestCase): + """rok4.Raster.RasterSet.write_descriptor(path) class method.""" + + @mock.patch("rok4.Raster.put_data_str") + def test_ok_with_output_path(self, m_put_data_str): + serial_in = { + "bbox": [550000.000, 6210000.000, 570000.000, 6230000.000], + "colors": [{"bands": 3, "format": "UINT8"}], + "raster_list": [ + { + "bands": 3, + "bbox": [550000.000, 6210000.000, 560000.000, 6220000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "s3://rok4bucket/images/550000_6220000.msk", + "path": "s3://rok4bucket/images/550000_6220000.tif" + }, + { + "bands": 3, + "bbox": [560000.000, 6210000.000, 570000.000, 6220000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "s3://rok4bucket/images/560000_6220000.msk", + "path": "s3://rok4bucket/images/560000_6220000.tif" + }, + { + "bands": 3, + "bbox": [550000.000, 6220000.000, 560000.000, 6230000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "s3://rok4bucket/images/550000_6230000.msk", + "path": "s3://rok4bucket/images/550000_6230000.tif" + } + ], + "srs": "IGNF:LAMB93" + } + content = json.dumps(serial_in, sort_keys=True) + path = "s3://rok4bucket/dst_descriptor.json" + rasterset = RasterSet() + rasterset.bbox = tuple(serial_in["bbox"]) + rasterset.srs = serial_in["srs"] + rasterset.colors = [] + for color_dict in serial_in["colors"]: + rasterset.colors.append({"bands": color_dict["bands"], + "format": ColorFormat[color_dict["format"]]}) + rasterset.raster_list = [] + for raster_dict in serial_in["raster_list"]: + raster_args = copy.deepcopy(raster_dict) + raster_args["format"] = ColorFormat[raster_dict["format"]] + raster_args["bbox"] = tuple(raster_dict["bbox"]) + raster_args["dimensions"] = tuple(raster_dict["dimensions"]) + rasterset.raster_list.append(MagicMock(Raster, **raster_args)) + + try: + rasterset.write_descriptor(path) + except Exception as exc: + assert False, f"Writing RasterSet's descriptor raises an exception: {exc}" + + m_put_data_str.assert_called_once_with(content, path) + + + @mock.patch("rok4.Raster.print") + def test_ok_no_output_path(self, m_print): + serial_in = { + "bbox": [550000.000, 6210000.000, 570000.000, 6230000.000], + "colors": [{"bands": 3, "format": "UINT8"}], + "raster_list": [ + { + "bands": 3, + "bbox": [550000.000, 6210000.000, 560000.000, 6220000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "s3://rok4bucket/images/550000_6220000.msk", + "path": "s3://rok4bucket/images/550000_6220000.tif" + }, + { + "bands": 3, + "bbox": [560000.000, 6210000.000, 570000.000, 6220000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "s3://rok4bucket/images/560000_6220000.msk", + "path": "s3://rok4bucket/images/560000_6220000.tif" + }, + { + "bands": 3, + "bbox": [550000.000, 6220000.000, 560000.000, 6230000.000], + "dimensions": [5000, 5000], + "format": "UINT8", + "mask": "s3://rok4bucket/images/550000_6230000.msk", + "path": "s3://rok4bucket/images/550000_6230000.tif" + } + ], + "srs": "IGNF:LAMB93" + } + content = json.dumps(serial_in, sort_keys=True) + rasterset = RasterSet() + rasterset.bbox = tuple(serial_in["bbox"]) + rasterset.srs = serial_in["srs"] + rasterset.colors = [] + for color_dict in serial_in["colors"]: + rasterset.colors.append({"bands": color_dict["bands"], + "format": ColorFormat[color_dict["format"]]}) + rasterset.raster_list = [] + for raster_dict in serial_in["raster_list"]: + raster_args = copy.deepcopy(raster_dict) + raster_args["format"] = ColorFormat[raster_dict["format"]] + raster_args["bbox"] = tuple(raster_dict["bbox"]) + raster_args["dimensions"] = tuple(raster_dict["dimensions"]) + rasterset.raster_list.append(MagicMock(Raster, **raster_args)) + + try: + rasterset.write_descriptor() + except Exception as exc: + assert False, f"Writing RasterSet's descriptor raises an exception: {exc}" + + m_print.assert_called_once_with(content) +