Module stac_fastapi.sqlalchemy.transactions¶
Transactions extension client.
Variables¶
logger
Classes¶
BulkTransactionsClient¶
class BulkTransactionsClient(
session: stac_fastapi.sqlalchemy.session.Session = NOTHING,
debug: bool = False,
item_table: Type[stac_fastapi.sqlalchemy.models.database.Item] = <class 'stac_fastapi.sqlalchemy.models.database.Item'>,
item_serializer: Type[stac_fastapi.sqlalchemy.serializers.Serializer] = <class 'stac_fastapi.sqlalchemy.serializers.ItemSerializer'>
)
Postgres bulk transactions.
Ancestors (in MRO)¶
- stac_fastapi.extensions.third_party.bulk_transactions.BaseBulkTransactionsClient
- abc.ABC
Methods¶
bulk_item_insert¶
def bulk_item_insert(
self,
items: stac_fastapi.extensions.third_party.bulk_transactions.Items,
chunk_size: Optional[int] = None,
**kwargs
) -> str
Bulk item insertion using SQLAlchemy Core. When `chunk_size` is given, it sets the batch size for the inserts.
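The role of `chunk_size` can be illustrated with a small, self-contained sketch. This is not the library's implementation: it uses the standard-library `sqlite3` module instead of SQLAlchemy Core and PostgreSQL, and the `items` table layout, the `chunks` and `bulk_insert` helpers, and the returned message are all hypothetical. It only shows the general idea of inserting rows either in one statement batch or in batches of at most `chunk_size` rows.

```python
import sqlite3
from typing import Iterator, Optional, Sequence, Tuple

# Hypothetical minimal row layout: (item id, collection id).
Row = Tuple[str, str]


def chunks(rows: Sequence[Row], size: int) -> Iterator[Sequence[Row]]:
    """Yield consecutive slices of `rows` holding at most `size` entries."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def bulk_insert(
    conn: sqlite3.Connection,
    rows: Sequence[Row],
    chunk_size: Optional[int] = None,
) -> str:
    """Insert every row, one batch per chunk when `chunk_size` is set."""
    batches = chunks(rows, chunk_size) if chunk_size else [rows]
    for batch in batches:
        conn.executemany(
            "INSERT INTO items (id, collection_id) VALUES (?, ?)", batch
        )
    conn.commit()
    # Illustrative status string; the real method's message may differ.
    return f"Inserted {len(rows)} items."


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id TEXT PRIMARY KEY, collection_id TEXT)")

rows = [(f"item-{i}", "demo") for i in range(10)]
message = bulk_insert(conn, rows, chunk_size=4)
count = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
print(message, count)
```

Smaller chunks keep each statement's parameter list short at the cost of more round trips, which is the usual trade-off when picking a batch size.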
TransactionsClient¶
class TransactionsClient(
session: stac_fastapi.sqlalchemy.session.Session = NOTHING,
collection_table: Type[stac_fastapi.sqlalchemy.models.database.Collection] = <class 'stac_fastapi.sqlalchemy.models.database.Collection'>,
item_table: Type[stac_fastapi.sqlalchemy.models.database.Item] = <class 'stac_fastapi.sqlalchemy.models.database.Item'>,
item_serializer: Type[stac_fastapi.sqlalchemy.serializers.Serializer] = <class 'stac_fastapi.sqlalchemy.serializers.ItemSerializer'>,
collection_serializer: Type[stac_fastapi.sqlalchemy.serializers.Serializer] = <class 'stac_fastapi.sqlalchemy.serializers.CollectionSerializer'>
)
Transactions extension specific CRUD operations.
Ancestors (in MRO)¶
- stac_fastapi.types.core.BaseTransactionsClient
- abc.ABC
Methods¶
create_collection¶
def create_collection(
self,
collection: stac_fastapi.types.stac.Collection,
**kwargs
) -> Union[stac_fastapi.types.stac.Collection, starlette.responses.Response, NoneType]
Create collection.
create_item¶
def create_item(
self,
collection_id: str,
item: Union[stac_fastapi.types.stac.Item, stac_fastapi.types.stac.ItemCollection],
**kwargs
) -> Optional[stac_fastapi.types.stac.Item]
Create item. `item` may be a single `Item` or an `ItemCollection`.
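Because `item` is typed as a union of `Item` and `ItemCollection`, a caller or implementation has to tell the two apart. The sketch below shows one way to do that with plain dictionaries, relying on the GeoJSON `type` field (`"Feature"` for a STAC Item, `"FeatureCollection"` for an ItemCollection). The helper names `items_from_body` and `assign_collection` are hypothetical and are not part of `stac_fastapi`; how `create_item` itself handles each case is not shown in this reference.

```python
from typing import Any, Dict, List


def items_from_body(body: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the items carried by a request body as a flat list."""
    kind = body.get("type")
    if kind == "Feature":
        # A single STAC Item.
        return [body]
    if kind == "FeatureCollection":
        # An ItemCollection wraps its items in the `features` array.
        return list(body.get("features", []))
    raise ValueError(f"unsupported GeoJSON type: {kind!r}")


def assign_collection(collection_id: str, body: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Copy each item and stamp it with the target collection id."""
    return [{**item, "collection": collection_id} for item in items_from_body(body)]


single = {"type": "Feature", "id": "a"}
many = {
    "type": "FeatureCollection",
    "features": [{"type": "Feature", "id": "b"}, {"type": "Feature", "id": "c"}],
}

print(assign_collection("c1", single))
print([item["id"] for item in assign_collection("c1", many)])
```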
delete_collection¶
def delete_collection(
self,
collection_id: str,
**kwargs
) -> Union[stac_fastapi.types.stac.Collection, starlette.responses.Response, NoneType]
Delete collection.
delete_item¶
def delete_item(
self,
item_id: str,
collection_id: str,
**kwargs
) -> Union[stac_fastapi.types.stac.Item, starlette.responses.Response, NoneType]
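Delete item. Note that `item_id` comes before `collection_id` in this signature, the reverse of the order used by `update_item`.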
update_collection¶
def update_collection(
self,
collection: stac_fastapi.types.stac.Collection,
**kwargs
) -> Union[stac_fastapi.types.stac.Collection, starlette.responses.Response, NoneType]
Update collection.
update_item¶
def update_item(
self,
collection_id: str,
item_id: str,
item: stac_fastapi.types.stac.Item,
**kwargs
) -> Union[stac_fastapi.types.stac.Item, starlette.responses.Response, NoneType]
Update item.
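To make the shape of the six `TransactionsClient` methods easier to see at a glance, here is a hypothetical in-memory stand-in that mirrors their names and positional parameter order. It is a sketch for experimentation only: `InMemoryTransactions` is not part of `stac_fastapi`, it stores plain dictionaries instead of database rows, and its return values and error handling are simplifying assumptions. Passing arguments by keyword, as in the usage lines, sidesteps the differing order of `item_id` and `collection_id` between `delete_item` and `update_item`.

```python
from typing import Any, Dict, Optional, Tuple

Record = Dict[str, Any]


class InMemoryTransactions:
    """Hypothetical stand-in mirroring the method names documented above."""

    def __init__(self) -> None:
        self.collections: Dict[str, Record] = {}
        # Items are keyed by (collection id, item id).
        self.items: Dict[Tuple[str, str], Record] = {}

    def create_collection(self, collection: Record, **kwargs) -> Record:
        self.collections[collection["id"]] = collection
        return collection

    def update_collection(self, collection: Record, **kwargs) -> Record:
        if collection["id"] not in self.collections:
            raise KeyError(collection["id"])
        self.collections[collection["id"]] = collection
        return collection

    def delete_collection(self, collection_id: str, **kwargs) -> Optional[Record]:
        return self.collections.pop(collection_id, None)

    def create_item(self, collection_id: str, item: Record, **kwargs) -> Record:
        self.items[(collection_id, item["id"])] = item
        return item

    def update_item(self, collection_id: str, item_id: str, item: Record, **kwargs) -> Record:
        if (collection_id, item_id) not in self.items:
            raise KeyError((collection_id, item_id))
        self.items[(collection_id, item_id)] = item
        return item

    def delete_item(self, item_id: str, collection_id: str, **kwargs) -> Optional[Record]:
        # Same order as the documented signature: item_id first.
        return self.items.pop((collection_id, item_id), None)


client = InMemoryTransactions()
client.create_collection({"id": "demo"})
client.create_item(collection_id="demo", item={"id": "a", "v": 1})
client.update_item(collection_id="demo", item_id="a", item={"id": "a", "v": 2})
deleted = client.delete_item(item_id="a", collection_id="demo")
print(deleted)
```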