Module stac_fastapi.opensearch.database_logic

Database logic.

Variables

COLLECTIONS_INDEX
DEFAULT_SORT
ES_COLLECTIONS_MAPPINGS
ES_INDEX_NAME_UNSUPPORTED_CHARS
ES_ITEMS_MAPPINGS
ES_ITEMS_SETTINGS
ES_MAPPINGS_DYNAMIC_TEMPLATES
ITEMS_INDEX_PREFIX
ITEM_INDICES
MAX_LIMIT
NumType
logger

Functions

create_collection_index

def create_collection_index(

) -> None

Create the index for a Collection. The settings of the index template will be used implicitly.

Returns:

Type Description
None None

create_index_templates

def create_index_templates(

) -> None

Create index templates for the Collection and Item indices.

Returns:

Type Description
None None

create_item_index

def create_item_index(
    collection_id: str
)

Create the index for Items. The settings of the index template will be used implicitly.

Parameters:

Name Type Description Default
collection_id str Collection identifier. None

Returns:

Type Description
None None

delete_item_index

def delete_item_index(
    collection_id: str
)

Delete the index for items in a collection.

Parameters:

Name Type Description Default
collection_id str The ID of the collection whose items index will be deleted. None

index_by_collection_id

def index_by_collection_id(
    collection_id: str
) -> str

Translate a collection id into an Elasticsearch index name.

Parameters:

Name Type Description Default
collection_id str The collection id to translate into an index name. None

Returns:

Type Description
str The index name derived from the collection id.
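
The translation described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the prefix value and the set of unsupported characters are assumptions chosen for the example, and `index_name_for` is a hypothetical stand-in for `index_by_collection_id`.

```python
# Assumed values for illustration only; the module's real constants may differ.
ITEMS_INDEX_PREFIX = "items_"
ES_INDEX_NAME_UNSUPPORTED_CHARS = set('\\/*?"<>| ,#:')


def index_name_for(collection_id: str) -> str:
    """Lowercase the id, drop unsupported characters, and prepend the prefix."""
    cleaned = "".join(
        c for c in collection_id.lower() if c not in ES_INDEX_NAME_UNSUPPORTED_CHARS
    )
    return f"{ITEMS_INDEX_PREFIX}{cleaned}"


print(index_name_for("Sentinel-2 L2A"))  # items_sentinel-2l2a
```

Index names must be lowercase and cannot contain certain characters, which is why some normalization step of this kind is needed before a collection id can be used as an index name.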

indices

def indices(
    collection_ids: Union[List[str], NoneType]
) -> str

Get a comma-separated string of index names for a given list of collection ids.

Parameters:

Name Type Description Default
collection_ids Optional[List[str]] A list of collection ids. None

Returns:

Type Description
str A string of comma-separated index names. If collection_ids is None, returns the default indices.
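
A rough sketch of this behaviour, under stated assumptions: the default pattern `items_*` and the simplified per-collection naming are illustrative guesses, and `indices_for` is a hypothetical stand-in for `indices`.

```python
from typing import List, Optional

# Assumed default pattern covering every item index; illustrative only.
ITEM_INDICES = "items_*"


def index_name_for(collection_id: str) -> str:
    """Simplified, hypothetical index naming used only for this example."""
    return f"items_{collection_id.lower()}"


def indices_for(collection_ids: Optional[List[str]]) -> str:
    """Join per-collection index names, or fall back to the default pattern."""
    if collection_ids is None:
        return ITEM_INDICES
    return ",".join(index_name_for(c) for c in collection_ids)


print(indices_for(["landsat", "Sentinel"]))  # items_landsat,items_sentinel
print(indices_for(None))                     # items_*
```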

mk_actions

def mk_actions(
    collection_id: str,
    processed_items: List[stac_fastapi.types.stac.Item]
)

Create Elasticsearch bulk actions for a list of processed items.

Parameters:

Name Type Description Default
collection_id str The identifier for the collection the items belong to. None
processed_items List[Item] The list of processed items to be bulk indexed. None

Returns:

Type Description
List[Dict[str, Union[str, Dict]]] The list of bulk actions to be executed,
each action being a dictionary with the following keys:
- _index: the index to store the document in.
- _id: the document's identifier.
- _source: the source of the document.
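
The shape of the returned actions can be illustrated with plain dictionaries. This is a sketch only: items are assumed to be dicts with an `id` key, the index naming is a simplified guess, and `make_bulk_actions` is a hypothetical stand-in for `mk_actions`.

```python
from typing import Any, Dict, List


def make_bulk_actions(
    collection_id: str, items: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Build one bulk action per item with the _index, _id and _source keys."""
    index = f"items_{collection_id.lower()}"  # assumed index naming
    return [
        {
            "_index": index,
            "_id": f"{item['id']}|{collection_id}",  # item id | collection id
            "_source": item,
        }
        for item in items
    ]


actions = make_bulk_actions("coll-a", [{"id": "item-1"}, {"id": "item-2"}])
for action in actions:
    print(action["_index"], action["_id"])
```

A list in this shape is what bulk helpers in the OpenSearch Python client generally expect as their actions argument.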

mk_item_id

def mk_item_id(
    item_id: str,
    collection_id: str
)

Create the document id for an Item in Elasticsearch.

Parameters:

Name Type Description Default
item_id str The id of the Item. None
collection_id str The id of the Collection that the Item belongs to. None

Returns:

Type Description
str The document id for the Item, combining the Item id and the Collection id, separated by a | character.
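
As a small illustration of the id format described above (the function name here is hypothetical; only the `|` separator and the item-then-collection order come from the description):

```python
def make_item_id(item_id: str, collection_id: str) -> str:
    """Combine the Item id and the Collection id with a '|' separator."""
    return f"{item_id}|{collection_id}"


print(make_item_id("LC08_001", "landsat-c2"))  # LC08_001|landsat-c2
```

Including the collection id keeps document ids distinct when two collections contain Items with the same id.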

Classes

DatabaseLogic

class DatabaseLogic(
    item_serializer: Type[stac_fastapi.core.serializers.ItemSerializer] = <class 'stac_fastapi.core.serializers.ItemSerializer'>,
    collection_serializer: Type[stac_fastapi.core.serializers.CollectionSerializer] = <class 'stac_fastapi.core.serializers.CollectionSerializer'>,
    extensions: List[str] = NOTHING
)

Database logic.

Class variables

aggregation_mapping
client
sync_client

Static methods

apply_bbox_filter

def apply_bbox_filter(
    search: opensearchpy.helpers.search.Search,
    bbox: List
)

Filter search results based on bounding box.

Parameters:

Name Type Description Default
search Search The search object to apply the filter to. None
bbox List The bounding box coordinates, represented as a list of four values [minx, miny, maxx, maxy]. None

Returns:

Type Description
Search The search object with the bounding box filter applied.
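
One way to picture the filter is as a geo_shape query built from the four bbox values. The sketch below builds the query body as a plain dictionary rather than through a Search object; the `geometry` field name, the `intersects` relation, and both function names are assumptions for illustration.

```python
from typing import Any, Dict, List


def bbox_to_polygon(bbox: List[float]) -> List[List[List[float]]]:
    """Turn [minx, miny, maxx, maxy] into a closed GeoJSON polygon ring."""
    minx, miny, maxx, maxy = bbox
    return [[[minx, miny], [maxx, miny], [maxx, maxy], [minx, maxy], [minx, miny]]]


def bbox_query(bbox: List[float], field: str = "geometry") -> Dict[str, Any]:
    """Build a geo_shape query body matching documents that intersect the bbox."""
    return {
        "geo_shape": {
            field: {
                "shape": {"type": "polygon", "coordinates": bbox_to_polygon(bbox)},
                "relation": "intersects",
            }
        }
    }


query = bbox_query([-10.0, 40.0, 5.0, 55.0])
print(query["geo_shape"]["geometry"]["shape"]["coordinates"][0])
```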

apply_collections_filter

def apply_collections_filter(
    search: opensearchpy.helpers.search.Search,
    collection_ids: List[str]
)

Database logic to search a list of STAC collection ids.

apply_cql2_filter

def apply_cql2_filter(
    search: opensearchpy.helpers.search.Search,
    _filter: Union[Dict[str, Any], NoneType]
)

Apply a CQL2 filter to an Opensearch Search object.

This method transforms a dictionary representing a CQL2 filter into an Opensearch query and applies it to the provided Search object. If the filter is None, the original Search object is returned unmodified.

Parameters:

Name Type Description Default
search Search The Opensearch Search object to which the filter will be applied. None
_filter Optional[Dict[str, Any]] The filter in dictionary form that needs to be applied to the search. The dictionary should follow the structure required by the to_es function, which converts it to an Opensearch query. None

Returns:

Type Description
Search The modified Search object with the filter applied if a filter is provided, otherwise the original Search object.

apply_datetime_filter

def apply_datetime_filter(
    search: opensearchpy.helpers.search.Search,
    datetime_search
)

Apply a filter to search based on datetime field.

Parameters:

Name Type Description Default
search Search The search object to filter. None
datetime_search dict The datetime filter criteria. None

Returns:

Type Description
Search The filtered search object.
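
The structure of datetime_search is not spelled out above. The sketch below assumes a dict carrying either an exact `eq` value or `gte`/`lte` bounds, and a `properties.datetime` field; both are assumptions for illustration, and the query body is built as a plain dictionary rather than through a Search object.

```python
from typing import Any, Dict


def datetime_query(
    datetime_search: Dict[str, Any], field: str = "properties.datetime"
) -> Dict[str, Any]:
    """Build a term query for an exact datetime, else a range query."""
    if "eq" in datetime_search:
        return {"term": {field: datetime_search["eq"]}}
    # Keep only the bounds that were actually supplied (open-ended ranges).
    bounds = {
        key: datetime_search[key]
        for key in ("gte", "lte")
        if datetime_search.get(key) is not None
    }
    return {"range": {field: bounds}}


print(datetime_query({"eq": "2024-01-01T00:00:00Z"}))
print(datetime_query({"gte": "2024-01-01T00:00:00Z", "lte": None}))
```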

apply_free_text_filter

def apply_free_text_filter(
    search: opensearchpy.helpers.search.Search,
    free_text_queries: Union[List[str], NoneType]
)

Database logic to perform query for search endpoint.

apply_ids_filter

def apply_ids_filter(
    search: opensearchpy.helpers.search.Search,
    item_ids: List[str]
)

Database logic to search a list of STAC item ids.

apply_intersects_filter

def apply_intersects_filter(
    search: opensearchpy.helpers.search.Search,
    intersects: stac_fastapi.opensearch.database_logic.Geometry
)

Filter search results based on intersecting geometry.

Parameters:

Name Type Description Default
search Search The search object to apply the filter to. None
intersects Geometry The intersecting geometry, represented as a GeoJSON-like object. None

Returns:

Type Description
Search The search object with the intersecting geometry filter applied.

apply_stacql_filter

def apply_stacql_filter(
    search: opensearchpy.helpers.search.Search,
    op: str,
    field: str,
    value: float
)

Filter search results based on a comparison between a field and a value.

Parameters:

Name Type Description Default
search Search The search object to apply the filter to. None
op str The comparison operator to use. Can be 'eq' (equal), 'gt' (greater than), 'gte' (greater than or equal), 'lt' (less than), or 'lte' (less than or equal). None
field str The field to perform the comparison on. None
value float The value to compare the field against. None

Returns:

Type Description
Search The search object with the specified filter applied.
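
The operator handling can be sketched with plain query dictionaries: equality maps naturally to a term query and the four inequalities to a range query. This is an illustrative sketch with a hypothetical function name, not the method's actual body.

```python
from typing import Any, Dict

RANGE_OPS = {"gt", "gte", "lt", "lte"}


def comparison_query(op: str, field: str, value: float) -> Dict[str, Any]:
    """Map a comparison operator to a term or range query body."""
    if op == "eq":
        return {"term": {field: value}}
    if op in RANGE_OPS:
        return {"range": {field: {op: value}}}
    raise ValueError(f"Unsupported operator: {op!r}")


print(comparison_query("lte", "properties.eo:cloud_cover", 20.0))
```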

make_search

def make_search(

)

Database logic to create a Search instance.

populate_sort

def populate_sort(
    sortby: List
) -> Union[Dict[str, Dict[str, str]], NoneType]

Database logic to sort search instance.
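
Given the return type above, the result is presumably a mapping from field name to an order specification. The sketch below assumes each sortby entry is a dict with `field` and `direction` keys; that entry shape and the function name are assumptions for illustration.

```python
from typing import Dict, List, Optional


def sort_spec(sortby: List[Dict[str, str]]) -> Optional[Dict[str, Dict[str, str]]]:
    """Convert sortby entries into a {field: {"order": direction}} mapping."""
    if not sortby:
        return None
    return {entry["field"]: {"order": entry["direction"]} for entry in sortby}


print(sort_spec([{"field": "properties.datetime", "direction": "desc"}]))
```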

Methods

aggregate

def aggregate(
    self,
    collection_ids: Union[List[str], NoneType],
    aggregations: List[str],
    search: opensearchpy.helpers.search.Search,
    centroid_geohash_grid_precision: int,
    centroid_geohex_grid_precision: int,
    centroid_geotile_grid_precision: int,
    geometry_geohash_grid_precision: int,
    geometry_geotile_grid_precision: int,
    datetime_frequency_interval: str,
    ignore_unavailable: Union[bool, NoneType] = True
)

Return aggregations of STAC Items.

bulk_async

def bulk_async(
    self,
    collection_id: str,
    processed_items: List[stac_fastapi.types.stac.Item],
    refresh: bool = False
) -> None

Perform a bulk insert of items into the database asynchronously.

Parameters:

Name Type Description Default
self None The instance of the object calling this function. None
collection_id str The ID of the collection to which the items belong. None
processed_items List[Item] A list of Item objects to be inserted into the database. None
refresh bool Whether to refresh the index after the bulk insert (default: False). False

bulk_sync

def bulk_sync(
    self,
    collection_id: str,
    processed_items: List[stac_fastapi.types.stac.Item],
    refresh: bool = False
) -> None

Perform a bulk insert of items into the database synchronously.

Parameters:

Name Type Description Default
self None The instance of the object calling this function. None
collection_id str The ID of the collection to which the items belong. None
processed_items List[Item] A list of Item objects to be inserted into the database. None
refresh bool Whether to refresh the index after the bulk insert (default: False). False

check_collection_exists

def check_collection_exists(
    self,
    collection_id: str
)

Database logic to check if a collection exists.

create_collection

def create_collection(
    self,
    collection: stac_fastapi.types.stac.Collection,
    refresh: bool = False
)

Create a single collection in the database.

Parameters:

Name Type Description Default
collection Collection The Collection object to be created. None
refresh bool Whether to refresh the index after the creation. Default is False. False

Raises:

Type Description
ConflictError If a Collection with the same id already exists in the database.

create_item

def create_item(
    self,
    item: stac_fastapi.types.stac.Item,
    refresh: bool = False
)

Database logic for creating one item.

Parameters:

Name Type Description Default
item Item The item to be created. None
refresh bool Refresh the index after performing the operation. Defaults to False. False

Returns:

Type Description
None None

Raises:

Type Description
ConflictError If the item already exists in the database.

delete_collection

def delete_collection(
    self,
    collection_id: str,
    refresh: bool = False
)

Delete a collection from the database.

Parameters:

Name Type Description Default
self None The instance of the object calling this function. None
collection_id str The ID of the collection to be deleted. None
refresh bool Whether to refresh the index after the deletion (default: False). False

Raises:

Type Description
NotFoundError If the collection with the given collection_id is not found in the database.

delete_collections

def delete_collections(
    self
) -> None

Danger: this is only for tests.

delete_item

def delete_item(
    self,
    item_id: str,
    collection_id: str,
    refresh: bool = False
)

Delete a single item from the database.

Parameters:

Name Type Description Default
item_id str The id of the Item to be deleted. None
collection_id str The id of the Collection that the Item belongs to. None
refresh bool Whether to refresh the index after the deletion. Default is False. False

Raises:

Type Description
NotFoundError If the Item does not exist in the database.

delete_items

def delete_items(
    self
) -> None

Danger: this is only for tests.

execute_search

def execute_search(
    self,
    search: opensearchpy.helpers.search.Search,
    limit: int,
    token: Union[str, NoneType],
    sort: Union[Dict[str, Dict[str, str]], NoneType],
    collection_ids: Union[List[str], NoneType],
    ignore_unavailable: bool = True
) -> Tuple[Iterable[Dict[str, Any]], Union[int, NoneType], Union[str, NoneType]]

Execute a search query with limit and other optional parameters.

Parameters:

Name Type Description Default
search Search The search query to be executed. None
limit int The maximum number of results to be returned. None
token Optional[str] The token used to return the next set of results. None
sort Optional[Dict[str, Dict[str, str]]] Specifies how the results should be sorted. None
collection_ids Optional[List[str]] The collection ids to search. None
ignore_unavailable bool Whether to ignore unavailable collections. Defaults to True. True

Returns:

Type Description
Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]] A tuple containing:
- An iterable of search results, where each result is a dictionary with keys and values representing the fields and values of each document.
- The total number of results (if the count could be computed), or None if the count could not be computed.
- The token to be used to retrieve the next set of results, or None if there are no more results.

Raises:

Type Description
NotFoundError If the collections specified in collection_ids do not exist.
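
How the token is encoded is not stated above. A common pattern for this kind of pagination is to encode the sort values of the last returned document and resume after them on the next request. The sketch below shows that idea against an in-memory list; the base64/JSON token format, the fetch-one-extra check, and all function names are assumptions, not the method's confirmed behaviour.

```python
import base64
import json
from typing import Any, Dict, List, Optional, Tuple


def encode_token(sort_values: List[Any]) -> str:
    """Encode the last document's sort values as an opaque URL-safe token."""
    return base64.urlsafe_b64encode(json.dumps(sort_values).encode()).decode()


def decode_token(token: str) -> List[Any]:
    """Recover the sort values from a token made by encode_token."""
    return json.loads(base64.urlsafe_b64decode(token.encode()).decode())


def page(
    docs: List[Dict[str, Any]], limit: int, token: Optional[str] = None
) -> Tuple[List[Dict[str, Any]], int, Optional[str]]:
    """Return (items, total, next_token); docs must be sorted by 'id', limit >= 1."""
    after = decode_token(token)[0] if token else None
    remaining = [d for d in docs if after is None or d["id"] > after]
    window = remaining[: limit + 1]  # fetch one extra to detect a next page
    items = window[:limit]
    next_token = encode_token([items[-1]["id"]]) if len(window) > limit else None
    return items, len(docs), next_token


docs = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
first, total, token = page(docs, limit=2)
second, _, last_token = page(docs, limit=2, token=token)
print([d["id"] for d in first], [d["id"] for d in second], last_token)
```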

find_collection

def find_collection(
    self,
    collection_id: str
) -> stac_fastapi.types.stac.Collection

Find and return a collection from the database.

Parameters:

Name Type Description Default
self None The instance of the object calling this function. None
collection_id str The ID of the collection to be found. None

Returns:

Type Description
Collection The found collection, represented as a Collection object.

Raises:

Type Description
NotFoundError If the collection with the given collection_id is not found in the database.

get_all_collections

def get_all_collections(
    self,
    token: Union[str, NoneType],
    limit: int,
    request: starlette.requests.Request
) -> Tuple[List[Dict[str, Any]], Union[str, NoneType]]

Retrieve a list of all collections from Opensearch, supporting pagination.

Parameters:

Name Type Description Default
token Optional[str] The pagination token. None
limit int The number of results to return. None

Returns:

Type Description
Tuple[List[Dict[str, Any]], Optional[str]] A tuple of (collections, next pagination token if any).

get_one_item

def get_one_item(
    self,
    collection_id: str,
    item_id: str
) -> Dict

Retrieve a single item from the database.

Parameters:

Name Type Description Default
collection_id str The id of the Collection that the Item belongs to. None
item_id str The id of the Item. None

Returns:

Type Description
Dict A dictionary containing the source data for the Item.

Raises:

Type Description
NotFoundError If the specified Item does not exist in the Collection.

prep_create_item

def prep_create_item(
    self,
    item: stac_fastapi.types.stac.Item,
    base_url: str,
    exist_ok: bool = False
) -> stac_fastapi.types.stac.Item

Preps an item for insertion into the database.

Parameters:

Name Type Description Default
item Item The item to be prepped for insertion. None
base_url str The base URL used to create the item's self URL. None
exist_ok bool Indicates whether the item can exist already. False

Returns:

Type Description
Item The prepped item.

Raises:

Type Description
ConflictError If the item already exists in the database.

sync_prep_create_item

def sync_prep_create_item(
    self,
    item: stac_fastapi.types.stac.Item,
    base_url: str,
    exist_ok: bool = False
) -> stac_fastapi.types.stac.Item

Prepare an item for insertion into the database.

This method performs pre-insertion preparation on the given item, such as checking if the collection the item belongs to exists, and optionally verifying that an item with the same ID does not already exist in the database.

Parameters:

Name Type Description Default
item Item The item to be inserted into the database. None
base_url str The base URL used for constructing URLs for the item. None
exist_ok bool Indicates whether the item can exist already. False

Returns:

Type Description
Item The item after preparation is done.

Raises:

Type Description
NotFoundError If the collection that the item belongs to does not exist in the database.
ConflictError If an item with the same ID already exists in the collection.
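
The two checks described above can be sketched against in-memory sets instead of a live index. The exception classes, the item shape (a dict with `id` and `collection` keys), and the function name are stand-ins defined only for this example.

```python
from typing import Any, Dict, Set


class NotFoundError(Exception):
    """Stand-in for the library's NotFoundError."""


class ConflictError(Exception):
    """Stand-in for the library's ConflictError."""


def prep_item(
    item: Dict[str, Any],
    collections: Set[str],
    existing_doc_ids: Set[str],
    exist_ok: bool = False,
) -> Dict[str, Any]:
    """Check that the collection exists and, unless exist_ok, that the item is new."""
    collection_id = item["collection"]
    if collection_id not in collections:
        raise NotFoundError(f"Collection {collection_id} does not exist")
    doc_id = f"{item['id']}|{collection_id}"
    if not exist_ok and doc_id in existing_doc_ids:
        raise ConflictError(f"Item {item['id']} in collection {collection_id} already exists")
    return item


item = {"id": "item-1", "collection": "coll-a"}
print(prep_item(item, {"coll-a"}, set()))
print(prep_item(item, {"coll-a"}, {"item-1|coll-a"}, exist_ok=True))
```

Passing exist_ok=True skips only the duplicate check; the collection must still exist.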

update_collection

def update_collection(
    self,
    collection_id: str,
    collection: stac_fastapi.types.stac.Collection,
    refresh: bool = False
)

Update a collection in the database.

Parameters:

Name Type Description Default
self None The instance of the object calling this function. None
collection_id str The ID of the collection to be updated. None
collection Collection The Collection object to be used for the update. None

Raises:

Type Description
NotFoundError If the collection with the given collection_id is not found in the database.

Notes: This function updates the collection identified by collection_id with the contents of the given Collection object. If the collection is not found, a NotFoundError is raised.

Geometry

class Geometry(
    *args,
    **kwargs
)

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with

Ancestors (in MRO)

  • typing.Protocol
  • typing.Generic