Module stac_fastapi.opensearch.database_logic¶
Database logic.
Variables¶
COLLECTIONS_INDEX
DEFAULT_SORT
ES_COLLECTIONS_MAPPINGS
ES_INDEX_NAME_UNSUPPORTED_CHARS
ES_ITEMS_MAPPINGS
ES_ITEMS_SETTINGS
ES_MAPPINGS_DYNAMIC_TEMPLATES
ITEMS_INDEX_PREFIX
ITEM_INDICES
MAX_LIMIT
NumType
logger
Functions¶
create_collection_index¶
def create_collection_index(
) -> None
Create the index for a Collection. The settings of the index template will be used implicitly.
Returns:
Type | Description |
---|---|
None | None |
create_index_templates¶
def create_index_templates(
) -> None
Create index templates for the Collection and Item indices.
Returns:
Type | Description |
---|---|
None | None |
create_item_index¶
def create_item_index(
collection_id: str
)
Create the index for Items. The settings of the index template will be used implicitly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection_id | str | Collection identifier. | None |
Returns:
Type | Description |
---|---|
None | None |
delete_item_index¶
def delete_item_index(
collection_id: str
)
Delete the index for items in a collection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection_id | str | The ID of the collection whose items index will be deleted. | None |
index_by_collection_id¶
def index_by_collection_id(
collection_id: str
) -> str
Translate a collection id into an Elasticsearch index name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection_id | str | The collection id to translate into an index name. | None |
Returns:
Type | Description |
---|---|
str | The index name derived from the collection id. |
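The translation described above can be sketched roughly as follows. The prefix value and the set of unsupported characters are assumptions made for illustration (the module exposes ITEMS_INDEX_PREFIX and ES_INDEX_NAME_UNSUPPORTED_CHARS, but this page does not show their contents), and the function name is hypothetical.

```python
# Assumed values for illustration only; the real constants live in the module.
ITEMS_INDEX_PREFIX = "items_"
ES_INDEX_NAME_UNSUPPORTED_CHARS = set('\\/*?"<>| ,:')


def index_by_collection_id_sketch(collection_id: str) -> str:
    """Build an index name: prefix + lowercased id without unsupported characters."""
    cleaned = "".join(
        c for c in collection_id.lower() if c not in ES_INDEX_NAME_UNSUPPORTED_CHARS
    )
    return f"{ITEMS_INDEX_PREFIX}{cleaned}"


print(index_by_collection_id_sketch("Sentinel-2 L2A"))
```

Index names must be lowercase and free of certain characters, which is why a collection id cannot always be used verbatim.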
indices¶
def indices(
collection_ids: Union[List[str], NoneType]
) -> str
Get a comma-separated string of index names for a given list of collection ids.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection_ids | Optional[List[str]] | A list of collection ids. | None |
Returns:
Type | Description |
---|---|
str | A string of comma-separated index names. If collection_ids is None, returns the default indices. |
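As a minimal sketch of this behaviour, assuming a wildcard pattern as the default and a simple prefix-based index name (both are illustrative assumptions, not the module's actual values):

```python
from typing import List, Optional

# Assumed default pattern; the real value is the module-level ITEM_INDICES.
ITEM_INDICES = "items_*"


def indices_sketch(collection_ids: Optional[List[str]]) -> str:
    """Join per-collection index names with commas, or fall back to the default."""
    # Treating an empty list like None is an assumption of this sketch.
    if not collection_ids:
        return ITEM_INDICES
    # The "items_" prefix and lowercasing stand in for index_by_collection_id.
    return ",".join(f"items_{cid.lower()}" for cid in collection_ids)


print(indices_sketch(["landsat", "Sentinel"]))
print(indices_sketch(None))
```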
mk_actions¶
def mk_actions(
collection_id: str,
processed_items: List[stac_fastapi.types.stac.Item]
)
Create Elasticsearch bulk actions for a list of processed items.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection_id | str | The identifier for the collection the items belong to. | None |
processed_items | List[Item] | The list of processed items to be bulk indexed. | None |
Returns:
Type | Description |
---|---|
List[Dict[str, Union[str, Dict]]] | The list of bulk actions to be executed. Each action is a dictionary with the keys _index (the index to store the document in), _id (the document's identifier), and _source (the source of the document). |
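The shape of the returned actions could look like the sketch below. Items are plain dictionaries here, and the index-name and document-id helpers are inlined with assumed formats, so treat this as an illustration of the three documented keys rather than the module's implementation.

```python
from typing import Any, Dict, List


def mk_actions_sketch(
    collection_id: str, processed_items: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Build one bulk action per item with _index, _id and _source keys."""
    index_name = f"items_{collection_id.lower()}"  # assumed index naming
    return [
        {
            "_index": index_name,
            "_id": f"{item['id']}|{collection_id}",  # item id and collection id
            "_source": item,
        }
        for item in processed_items
    ]


actions = mk_actions_sketch("c1", [{"id": "a"}, {"id": "b"}])
print(actions[0])
```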
mk_item_id¶
def mk_item_id(
item_id: str,
collection_id: str
)
Create the document id for an Item in Elasticsearch.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item_id | str | The id of the Item. | None |
collection_id | str | The id of the Collection that the Item belongs to. | None |
Returns:
Type | Description |
---|---|
str | The document id for the Item, combining the Item id and the Collection id, separated by a pipe (\|) character. |
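A minimal sketch of the composite id described above; the function name is hypothetical. One plausible reason for including the collection id is that Items with the same id in different Collections then map to different documents, though the source does not state the motivation.

```python
def mk_item_id_sketch(item_id: str, collection_id: str) -> str:
    """Combine the Item id and the Collection id with a pipe separator."""
    return f"{item_id}|{collection_id}"


# The same Item id in two Collections yields two distinct document ids.
print(mk_item_id_sketch("scene-001", "landsat"))
print(mk_item_id_sketch("scene-001", "sentinel"))
```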
Classes¶
DatabaseLogic¶
class DatabaseLogic(
item_serializer: Type[stac_fastapi.core.serializers.ItemSerializer] = <class 'stac_fastapi.core.serializers.ItemSerializer'>,
collection_serializer: Type[stac_fastapi.core.serializers.CollectionSerializer] = <class 'stac_fastapi.core.serializers.CollectionSerializer'>,
extensions: List[str] = NOTHING
)
Database logic.
Class variables¶
aggregation_mapping
client
sync_client
Static methods¶
apply_bbox_filter¶
def apply_bbox_filter(
search: opensearchpy.helpers.search.Search,
bbox: List
)
Filter search results based on bounding box.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
search | Search | The search object to apply the filter to. | None |
bbox | List | The bounding box coordinates, represented as a list of four values [minx, miny, maxx, maxy]. | None |
Returns:
Type | Description |
---|---|
Search | The search object with the bounding box filter applied. |
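One way to picture the bounding box filter is as a geo_shape query over a closed polygon ring built from [minx, miny, maxx, maxy]. The sketch below produces a plain query dictionary instead of modifying a Search object; the field name "geometry", the "intersects" relation, and both helper names are assumptions of this example.

```python
from typing import Any, Dict, List


def bbox_to_polygon(minx: float, miny: float, maxx: float, maxy: float) -> List[List[List[float]]]:
    """Return GeoJSON polygon coordinates (one closed ring) for a bounding box."""
    return [[[minx, miny], [maxx, miny], [maxx, maxy], [minx, maxy], [minx, miny]]]


def bbox_filter_query(bbox: List[float]) -> Dict[str, Any]:
    """Build a geo_shape query body that matches shapes intersecting the bbox."""
    if len(bbox) != 4:
        raise ValueError("bbox must be [minx, miny, maxx, maxy]")
    return {
        "geo_shape": {
            "geometry": {  # assumed name of the indexed geometry field
                "shape": {"type": "polygon", "coordinates": bbox_to_polygon(*bbox)},
                "relation": "intersects",
            }
        }
    }


print(bbox_filter_query([-10.0, -5.0, 10.0, 5.0]))
```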
apply_collections_filter¶
def apply_collections_filter(
search: opensearchpy.helpers.search.Search,
collection_ids: List[str]
)
Database logic to search a list of STAC collection ids.
apply_cql2_filter¶
def apply_cql2_filter(
search: opensearchpy.helpers.search.Search,
_filter: Union[Dict[str, Any], NoneType]
)
Apply a CQL2 filter to an Opensearch Search object.
This method transforms a dictionary representing a CQL2 filter into an Opensearch query and applies it to the provided Search object. If the filter is None, the original Search object is returned unmodified.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
search | Search | The Opensearch Search object to which the filter will be applied. | None |
_filter | Optional[Dict[str, Any]] | The filter in dictionary form that needs to be applied to the search. The dictionary should follow the structure required by the to_es function, which converts it to an Opensearch query. | None |
Returns:
Type | Description |
---|---|
Search | The modified Search object with the filter applied if a filter is provided, otherwise the original Search object. |
apply_datetime_filter¶
def apply_datetime_filter(
search: opensearchpy.helpers.search.Search,
datetime_search
)
Apply a filter to search based on datetime field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
search | Search | The search object to filter. | None |
datetime_search | dict | The datetime filter criteria. | None |
Returns:
Type | Description |
---|---|
Search | The filtered search object. |
apply_free_text_filter¶
def apply_free_text_filter(
search: opensearchpy.helpers.search.Search,
free_text_queries: Union[List[str], NoneType]
)
Database logic to perform query for search endpoint.
apply_ids_filter¶
def apply_ids_filter(
search: opensearchpy.helpers.search.Search,
item_ids: List[str]
)
Database logic to search a list of STAC item ids.
apply_intersects_filter¶
def apply_intersects_filter(
search: opensearchpy.helpers.search.Search,
intersects: stac_fastapi.opensearch.database_logic.Geometry
)
Filter search results based on intersecting geometry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
search | Search | The search object to apply the filter to. | None |
intersects | Geometry | The intersecting geometry, represented as a GeoJSON-like object. | None |
Returns:
Type | Description |
---|---|
Search | The search object with the intersecting geometry filter applied. |
apply_stacql_filter¶
def apply_stacql_filter(
search: opensearchpy.helpers.search.Search,
op: str,
field: str,
value: float
)
Filter search results based on a comparison between a field and a value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
search | Search | The search object to apply the filter to. | None |
op | str | The comparison operator to use. Can be 'eq' (equal), 'gt' (greater than), 'gte' (greater than or equal), 'lt' (less than), or 'lte' (less than or equal). | None |
field | str | The field to perform the comparison on. | None |
value | float | The value to compare the field against. | None |
Returns:
Type | Description |
---|---|
Search | The search object with the specified filter applied. |
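The operator handling can be sketched as a mapping from op to a query clause: a term query for 'eq' and a range query for the four inequality operators. This builds plain dictionaries and is an assumption about the query shape, not a copy of the method; the function name is hypothetical.

```python
from typing import Any, Dict

RANGE_OPS = {"gt", "gte", "lt", "lte"}


def stacql_filter_query(op: str, field: str, value: float) -> Dict[str, Any]:
    """Translate a comparison into a term or range query clause."""
    if op == "eq":
        return {"term": {field: value}}
    if op in RANGE_OPS:
        return {"range": {field: {op: value}}}
    raise ValueError(f"unsupported operator: {op}")


print(stacql_filter_query("lte", "properties.eo:cloud_cover", 20.0))
```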
make_search¶
def make_search(
)
Database logic to create a Search instance.
populate_sort¶
def populate_sort(
sortby: List
) -> Union[Dict[str, Dict[str, str]], NoneType]
Database logic to sort search instance.
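The return type suggests a mapping from field name to sort options. A minimal sketch, assuming each sortby entry carries "field" and "direction" keys as in the STAC API sort extension, and assuming an empty list yields None (the page does not say what the real function returns in that case):

```python
from typing import Dict, List, Optional


def populate_sort_sketch(sortby: List[Dict[str, str]]) -> Optional[Dict[str, Dict[str, str]]]:
    """Convert sortby entries into a {field: {"order": direction}} mapping."""
    if not sortby:
        return None  # assumption: no explicit sort requested
    return {entry["field"]: {"order": entry["direction"]} for entry in sortby}


print(populate_sort_sketch([{"field": "properties.datetime", "direction": "desc"}]))
```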
Methods¶
aggregate¶
def aggregate(
self,
collection_ids: Union[List[str], NoneType],
aggregations: List[str],
search: opensearchpy.helpers.search.Search,
centroid_geohash_grid_precision: int,
centroid_geohex_grid_precision: int,
centroid_geotile_grid_precision: int,
geometry_geohash_grid_precision: int,
geometry_geotile_grid_precision: int,
datetime_frequency_interval: str,
ignore_unavailable: Union[bool, NoneType] = True
)
Return aggregations of STAC Items.
bulk_async¶
def bulk_async(
self,
collection_id: str,
processed_items: List[stac_fastapi.types.stac.Item],
refresh: bool = False
) -> None
Perform a bulk insert of items into the database asynchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self | None | The instance of the object calling this function. | None |
collection_id | str | The ID of the collection to which the items belong. | None |
processed_items | List[Item] | A list of Item objects to be inserted into the database. | None |
refresh | bool | Whether to refresh the index after the bulk insert. | False |
bulk_sync¶
def bulk_sync(
self,
collection_id: str,
processed_items: List[stac_fastapi.types.stac.Item],
refresh: bool = False
) -> None
Perform a bulk insert of items into the database synchronously.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self | None | The instance of the object calling this function. | None |
collection_id | str | The ID of the collection to which the items belong. | None |
processed_items | List[Item] | A list of Item objects to be inserted into the database. | None |
refresh | bool | Whether to refresh the index after the bulk insert. | False |
check_collection_exists¶
def check_collection_exists(
self,
collection_id: str
)
Database logic to check if a collection exists.
create_collection¶
def create_collection(
self,
collection: stac_fastapi.types.stac.Collection,
refresh: bool = False
)
Create a single collection in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection | Collection | The Collection object to be created. | None |
refresh | bool | Whether to refresh the index after the creation. | False |
Raises:
Type | Description |
---|---|
ConflictError | If a Collection with the same id already exists in the database. |
create_item¶
def create_item(
self,
item: stac_fastapi.types.stac.Item,
refresh: bool = False
)
Database logic for creating one item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | Item | The item to be created. | None |
refresh | bool | Refresh the index after performing the operation. Defaults to False. | False |
Returns:
Type | Description |
---|---|
None | None |
Raises:
Type | Description |
---|---|
ConflictError | If the item already exists in the database. |
delete_collection¶
def delete_collection(
self,
collection_id: str,
refresh: bool = False
)
Delete a collection from the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self | None | The instance of the object calling this function. | None |
collection_id | str | The ID of the collection to be deleted. | None |
refresh | bool | Whether to refresh the index after the deletion. | False |
Raises:
Type | Description |
---|---|
NotFoundError | If the collection with the given collection_id is not found in the database. |
delete_collections¶
def delete_collections(
self
) -> None
Danger: this is only for tests.
delete_item¶
def delete_item(
self,
item_id: str,
collection_id: str,
refresh: bool = False
)
Delete a single item from the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item_id | str | The id of the Item to be deleted. | None |
collection_id | str | The id of the Collection that the Item belongs to. | None |
refresh | bool | Whether to refresh the index after the deletion. | False |
Raises:
Type | Description |
---|---|
NotFoundError | If the Item does not exist in the database. |
delete_items¶
def delete_items(
self
) -> None
Danger: this is only for tests.
execute_search¶
def execute_search(
self,
search: opensearchpy.helpers.search.Search,
limit: int,
token: Union[str, NoneType],
sort: Union[Dict[str, Dict[str, str]], NoneType],
collection_ids: Union[List[str], NoneType],
ignore_unavailable: bool = True
) -> Tuple[Iterable[Dict[str, Any]], Union[int, NoneType], Union[str, NoneType]]
Execute a search query with limit and other optional parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
search | Search | The search query to be executed. | None |
limit | int | The maximum number of results to be returned. | None |
token | Optional[str] | The token used to return the next set of results. | None |
sort | Optional[Dict[str, Dict[str, str]]] | Specifies how the results should be sorted. | None |
collection_ids | Optional[List[str]] | The collection ids to search. | None |
ignore_unavailable | bool | Whether to ignore unavailable collections. Defaults to True. | True |
Returns:
Type | Description |
---|---|
Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]] | A tuple containing: - An iterable of search results, where each result is a dictionary with keys and values representing the fields and values of each document. - The total number of results (if the count could be computed), or None if the count could not be computed. - The token to be used to retrieve the next set of results, or None if there are no more results. |
Raises:
Type | Description |
---|---|
NotFoundError | If the collections specified in collection_ids do not exist. |
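The three-part return value (results, optional total, optional next token) lends itself to a simple paging loop. The sketch below uses a hypothetical in-memory stand-in, fake_execute_search, that mimics only the tuple shape; real tokens are opaque strings, whereas this stub uses an offset purely for illustration.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Page = Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]

DOCS = [{"id": f"item-{i}"} for i in range(5)]  # stand-in for indexed documents


def fake_execute_search(limit: int, token: Optional[str]) -> Page:
    """Hypothetical stub returning (results, total, next_token) like execute_search."""
    start = int(token) if token else 0
    page = DOCS[start:start + limit]
    next_start = start + limit
    next_token = str(next_start) if next_start < len(DOCS) else None
    return page, len(DOCS), next_token


def collect_all(limit: int) -> List[Dict[str, Any]]:
    """Follow next tokens until the search reports no further pages."""
    results: List[Dict[str, Any]] = []
    token: Optional[str] = None
    while True:
        items, _total, token = fake_execute_search(limit, token)
        results.extend(items)
        if token is None:
            break
    return results


print([doc["id"] for doc in collect_all(limit=2)])
```

A caller should stop when the token is None, not when a page comes back shorter than limit, since the total may be unavailable.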
find_collection¶
def find_collection(
self,
collection_id: str
) -> stac_fastapi.types.stac.Collection
Find and return a collection from the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self | None | The instance of the object calling this function. | None |
collection_id | str | The ID of the collection to be found. | None |
Returns:
Type | Description |
---|---|
Collection | The found collection, represented as a Collection object. |
Raises:
Type | Description |
---|---|
NotFoundError | If the collection with the given collection_id is not found in the database. |
get_all_collections¶
def get_all_collections(
self,
token: Union[str, NoneType],
limit: int,
request: starlette.requests.Request
) -> Tuple[List[Dict[str, Any]], Union[str, NoneType]]
Retrieve a list of all collections from Opensearch, supporting pagination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token | Optional[str] | The pagination token. | None |
limit | int | The number of results to return. | None |
Returns:
Type | Description |
---|---|
Tuple[List[Dict[str, Any]], Optional[str]] | A tuple of (collections, next pagination token if any). |
get_one_item¶
def get_one_item(
self,
collection_id: str,
item_id: str
) -> Dict
Retrieve a single item from the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection_id | str | The id of the Collection that the Item belongs to. | None |
item_id | str | The id of the Item. | None |
Returns:
Type | Description |
---|---|
Dict | A dictionary containing the source data for the Item. |
Raises:
Type | Description |
---|---|
NotFoundError | If the specified Item does not exist in the Collection. |
prep_create_item¶
def prep_create_item(
self,
item: stac_fastapi.types.stac.Item,
base_url: str,
exist_ok: bool = False
) -> stac_fastapi.types.stac.Item
Preps an item for insertion into the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | Item | The item to be prepped for insertion. | None |
base_url | str | The base URL used to create the item's self URL. | None |
exist_ok | bool | Indicates whether the item can exist already. | False |
Returns:
Type | Description |
---|---|
Item | The prepped item. |
Raises:
Type | Description |
---|---|
ConflictError | If the item already exists in the database. |
sync_prep_create_item¶
def sync_prep_create_item(
self,
item: stac_fastapi.types.stac.Item,
base_url: str,
exist_ok: bool = False
) -> stac_fastapi.types.stac.Item
Prepare an item for insertion into the database.
This method performs pre-insertion preparation on the given item, such as checking if the collection the item belongs to exists, and optionally verifying that an item with the same ID does not already exist in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
item | Item | The item to be inserted into the database. | None |
base_url | str | The base URL used for constructing URLs for the item. | None |
exist_ok | bool | Indicates whether the item can exist already. | False |
Returns:
Type | Description |
---|---|
Item | The item after preparation is done. |
Raises:
Type | Description |
---|---|
NotFoundError | If the collection that the item belongs to does not exist in the database. |
ConflictError | If an item with the same ID already exists in the collection. |
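The order of checks described above (collection first, then the optional duplicate check) can be sketched with in-memory sets. The exception classes below are local stand-ins that share names with the documented errors, items are plain dictionaries, and the function name and parameters are hypothetical.

```python
from typing import Any, Dict, Set, Tuple


class NotFoundError(Exception):
    """Local stand-in for the documented NotFoundError."""


class ConflictError(Exception):
    """Local stand-in for the documented ConflictError."""


def prep_create_item_sketch(
    item: Dict[str, Any],
    collections: Set[str],
    existing_items: Set[Tuple[str, str]],
    exist_ok: bool = False,
) -> Dict[str, Any]:
    """Validate an item before insertion and return it unchanged."""
    collection_id = item["collection"]
    if collection_id not in collections:
        raise NotFoundError(f"Collection {collection_id} does not exist")
    # The duplicate check is skipped when the caller allows existing items.
    if not exist_ok and (collection_id, item["id"]) in existing_items:
        raise ConflictError(f"Item {item['id']} already exists in {collection_id}")
    return item


item = {"id": "a", "collection": "c1"}
print(prep_create_item_sketch(item, {"c1"}, {("c1", "a")}, exist_ok=True))
```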
update_collection¶
def update_collection(
self,
collection_id: str,
collection: stac_fastapi.types.stac.Collection,
refresh: bool = False
)
Update a collection in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self | None | The instance of the object calling this function. | None |
collection_id | str | The ID of the collection to be updated. | None |
collection | Collection | The Collection object to be used for the update. | None |
Raises:
Type | Description |
---|---|
NotFoundError | If the collection with the given collection_id is not found in the database. |
Notes: This function updates the collection in the database using the specified collection_id and the collection specified in the Collection object. If the collection is not found, a NotFoundError is raised.
Geometry¶
class Geometry(
*args,
**kwargs
)
Base class for protocol classes.
Protocol classes are defined as::

    class Proto(Protocol):
        def meth(self) -> int:
            ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

    class C:
        def meth(self) -> int:
            return 0

    def func(x: Proto) -> int:
        return x.meth()

    func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with
Ancestors (in MRO)¶
- typing.Protocol
- typing.Generic