Skip to content

bulk_transactions

stac_fastapi.extensions.third_party.bulk_transactions

Bulk transactions extension.

AsyncBaseBulkTransactionsClient

Bases: ABC

BulkTransactionsClient.

Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@attr.s  # type: ignore
class AsyncBaseBulkTransactionsClient(abc.ABC):
    """BulkTransactionsClient."""

    @abc.abstractmethod
    async def bulk_item_insert(
        self,
        items: Items,
        **kwargs,
    ) -> str:
        """Bulk creation of items.

        Args:
            items: list of items.

        Returns:
            Message indicating the status of the insert.
        """
        raise NotImplementedError

bulk_item_insert abstractmethod async

bulk_item_insert(items: Items, **kwargs) -> str

Bulk creation of items.

Parameters:

  • items (Items) –

    list of items.

Returns:

  • str

    Message indicating the status of the insert.

Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@abc.abstractmethod
async def bulk_item_insert(
    self,
    items: Items,
    **kwargs,
) -> str:
    """Bulk creation of items.

    Args:
        items: list of items.

    Returns:
        Message indicating the status of the insert.
    """
    raise NotImplementedError

BaseBulkTransactionsClient

Bases: ABC

BulkTransactionsClient.

Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@attr.s  # type: ignore
class BaseBulkTransactionsClient(abc.ABC):
    """BulkTransactionsClient."""

    @staticmethod
    def _chunks(lst, n):
        """Yield successive n-sized chunks from list.

        https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
        """
        for i in range(0, len(lst), n):
            yield lst[i : i + n]

    @abc.abstractmethod
    def bulk_item_insert(
        self,
        items: Items,
        chunk_size: Optional[int] = None,
        **kwargs,
    ) -> str:
        """Bulk creation of items.

        Args:
            items: list of items.
            chunk_size: number of items processed at a time.

        Returns:
            Message indicating the status of the insert.
        """
        raise NotImplementedError

bulk_item_insert abstractmethod

bulk_item_insert(items: Items, chunk_size: Optional[int] = None, **kwargs) -> str

Bulk creation of items.

Parameters:

  • items (Items) –

    list of items.

  • chunk_size (Optional[int], default: None ) –

    number of items processed at a time.

Returns:

  • str

    Message indicating the status of the insert.

Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@abc.abstractmethod
def bulk_item_insert(
    self,
    items: Items,
    chunk_size: Optional[int] = None,
    **kwargs,
) -> str:
    """Bulk creation of items.

    Args:
        items: list of items.
        chunk_size: number of items processed at a time.

    Returns:
        Message indicating the status of the insert.
    """
    raise NotImplementedError

BulkTransactionExtension

Bases: ApiExtension

Bulk Transaction Extension.

Bulk Transaction extension adds the POST /collections/{collection_id}/bulk_items endpoint to the application for efficient bulk insertion of items. The input to this is an object with an attribute "items", that has a value that is an object with a group of attributes that are the ids of each Item, and the value is the Item entity.

Optionally, clients can specify a "method" attribute that is either "insert" or "upsert". If "insert", then the items will be inserted if they do not exist, and an error will be returned if they do. If "upsert", then the items will be inserted if they do not exist, and updated if they do. This defaults to "insert".

{
    "items": {
        "id1": { "type": "Feature", ... },
        "id2": { "type": "Feature", ... },
        "id3": { "type": "Feature", ... }
    },
    "method": "insert"
}
Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@attr.s
class BulkTransactionExtension(ApiExtension):
    """Bulk Transaction Extension.

    Bulk Transaction extension adds the `POST
    /collections/{collection_id}/bulk_items` endpoint to the application for
    efficient bulk insertion of items. The input to this is an object with an
    attribute  "items", that has a value that is an object with a group of
    attributes that are the ids of each Item, and the value is the Item entity.

    Optionally, clients can specify a "method" attribute that is either "insert"
    or "upsert". If "insert", then the items will be inserted if they do not
    exist, and an error will be returned if they do. If "upsert", then the items
    will be inserted if they do not exist, and updated if they do. This defaults
    to "insert".

        {
            "items": {
                "id1": { "type": "Feature", ... },
                "id2": { "type": "Feature", ... },
                "id3": { "type": "Feature", ... }
            },
            "method": "insert"
        }
    """

    client: Union[AsyncBaseBulkTransactionsClient, BaseBulkTransactionsClient] = attr.ib()
    conformance_classes: List[str] = attr.ib(default=list())
    schema_href: Optional[str] = attr.ib(default=None)

    def register(self, app: FastAPI) -> None:
        """Register the extension with a FastAPI application.

        Args:
            app: target FastAPI application.

        Returns:
            None
        """
        items_request_model = create_request_model("Items", base_model=Items)

        router = APIRouter(prefix=app.state.router_prefix)
        router.add_api_route(
            name="Bulk Create Item",
            path="/collections/{collection_id}/bulk_items",
            response_model=str,
            response_model_exclude_unset=True,
            response_model_exclude_none=True,
            methods=["POST"],
            endpoint=create_async_endpoint(
                self.client.bulk_item_insert, items_request_model
            ),
        )
        app.include_router(router, tags=["Bulk Transaction Extension"])

register

register(app: FastAPI) -> None

Register the extension with a FastAPI application.

Parameters:

  • app (FastAPI) –

    target FastAPI application.

Returns:

  • None

    None

Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def register(self, app: FastAPI) -> None:
    """Register the extension with a FastAPI application.

    Args:
        app: target FastAPI application.

    Returns:
        None
    """
    items_request_model = create_request_model("Items", base_model=Items)

    router = APIRouter(prefix=app.state.router_prefix)
    router.add_api_route(
        name="Bulk Create Item",
        path="/collections/{collection_id}/bulk_items",
        response_model=str,
        response_model_exclude_unset=True,
        response_model_exclude_none=True,
        methods=["POST"],
        endpoint=create_async_endpoint(
            self.client.bulk_item_insert, items_request_model
        ),
    )
    app.include_router(router, tags=["Bulk Transaction Extension"])

BulkTransactionMethod

Bases: str, Enum

Bulk Transaction Methods.

Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
16
17
18
19
20
class BulkTransactionMethod(str, Enum):
    """Bulk Transaction Methods."""

    INSERT = "insert"
    UPSERT = "upsert"

Items

Bases: BaseModel

A group of STAC Item objects, in the form of a dictionary from Item.id -> Item.

Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
23
24
25
26
27
28
29
30
31
class Items(BaseModel):
    """A group of STAC Item objects, in the form of a dictionary from Item.id -> Item."""

    items: Dict[str, Any]
    method: BulkTransactionMethod = BulkTransactionMethod.INSERT

    def __iter__(self):
        """Return an iterable of STAC Item objects."""
        return iter(self.items.values())

__iter__

__iter__()

Return an iterable of STAC Item objects.

Source code in stac_fastapi/extensions/third_party/bulk_transactions.py
29
30
31
def __iter__(self):
    """Return an iterable of STAC Item objects."""
    return iter(self.items.values())