Skip to content

db

stac_fastapi.pgstac.db

Database connection handling.

DB

DB class that can be used with a context manager.

Source code in stac_fastapi/pgstac/db.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@attr.s
class DB:
    """Database helper that builds asyncpg connection pools.

    NOTE(review): this class has historically been described as usable with a
    context manager; no ``__enter__``/``__aenter__`` is visible in this
    listing -- confirm against the full module.
    """

    # All three attributes default to None; create_pool() below does not read
    # or assign any of them.
    connection_string = attr.ib(default=None)
    _pool = attr.ib(default=None)
    _connection = attr.ib(default=None)

    async def create_pool(self, connection_string: str, settings):
        """Create and return an asyncpg connection pool.

        Args:
            connection_string: Passed as the first positional argument to
                ``asyncpg.create_pool``.
            settings: Object providing ``db_min_conn_size``,
                ``db_max_conn_size``, ``db_max_queries``,
                ``db_max_inactive_conn_lifetime`` and ``server_settings``
                (whose ``model_dump()`` result is forwarded).

        Returns:
            The awaited result of ``asyncpg.create_pool``.
        """
        pool_options = {
            "min_size": settings.db_min_conn_size,
            "max_size": settings.db_max_conn_size,
            "max_queries": settings.db_max_queries,
            "max_inactive_connection_lifetime": settings.db_max_inactive_conn_lifetime,
            # con_init runs as the pool's ``init`` callback.
            "init": con_init,
            "server_settings": settings.server_settings.model_dump(),
        }
        return await asyncpg.create_pool(connection_string, **pool_options)

create_pool async

create_pool(connection_string: str, settings)

Create a connection pool.

Source code in stac_fastapi/pgstac/db.py
144
145
146
147
148
149
150
151
152
153
154
155
async def create_pool(self, connection_string: str, settings):
    """Create and return an asyncpg connection pool.

    Args:
        connection_string: Passed as the first positional argument to
            ``asyncpg.create_pool``.
        settings: Object providing ``db_min_conn_size``, ``db_max_conn_size``,
            ``db_max_queries``, ``db_max_inactive_conn_lifetime`` and
            ``server_settings`` (whose ``model_dump()`` result is forwarded).

    Returns:
        The awaited result of ``asyncpg.create_pool``.
    """
    pool_options = {
        "min_size": settings.db_min_conn_size,
        "max_size": settings.db_max_conn_size,
        "max_queries": settings.db_max_queries,
        "max_inactive_connection_lifetime": settings.db_max_inactive_conn_lifetime,
        # con_init runs as the pool's ``init`` callback.
        "init": con_init,
        "server_settings": settings.server_settings.model_dump(),
    }
    return await asyncpg.create_pool(connection_string, **pool_options)

close_db_connection async

close_db_connection(app: FastAPI) -> None

Close the read and write connection pools.

Source code in stac_fastapi/pgstac/db.py
73
74
75
76
async def close_db_connection(app: FastAPI) -> None:
    """Close the read and write connection pools stored on ``app.state``.

    Args:
        app: Application whose ``state.readpool`` and ``state.writepool``
            are closed.

    Raises:
        Whatever either pool's ``close()`` raises. The write pool is still
        closed when closing the read pool fails, so one failure cannot leave
        the other pool open.
    """
    try:
        await app.state.readpool.close()
    finally:
        # Runs even if the read pool failed to close; the original error
        # (if any) keeps propagating afterwards.
        await app.state.writepool.close()

con_init async

con_init(conn)

Use orjson for json returns.

Source code in stac_fastapi/pgstac/db.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async def con_init(conn):
    """Use orjson for json returns.

    Registers ``orjson.dumps`` / ``orjson.loads`` as the encoder / decoder
    for the ``json`` and then the ``jsonb`` type (schema ``pg_catalog``) on
    the given connection.
    """
    for type_name in ("json", "jsonb"):
        await conn.set_type_codec(
            type_name,
            encoder=orjson.dumps,
            decoder=orjson.loads,
            schema="pg_catalog",
        )

connect_to_db async

connect_to_db(
    app: FastAPI,
    get_conn: Optional[ConnectionGetter] = None,
    postgres_settings: Optional[PostgresSettings] = None,
) -> None

Create connection pools & connection retriever on application.

Source code in stac_fastapi/pgstac/db.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def connect_to_db(
    app: FastAPI,
    get_conn: Optional[ConnectionGetter] = None,
    postgres_settings: Optional[PostgresSettings] = None,
) -> None:
    """Create the read/write connection pools and attach them to ``app.state``.

    Args:
        app: Application whose ``state`` receives ``readpool``, ``writepool``
            and ``get_connection``. ``app.state.settings.testing`` selects
            which connection strings are used.
        get_conn: Connection getter to store on the app; ``get_connection``
            is stored when this is falsy.
        postgres_settings: Settings used for the pools; a default
            ``PostgresSettings()`` is built when this is falsy.
    """
    settings = app.state.settings
    pg_settings = postgres_settings or PostgresSettings()

    # In testing mode both pools are created from the same connection string.
    if settings.testing:
        read_dsn = write_dsn = pg_settings.testing_connection_string
    else:
        read_dsn = pg_settings.reader_connection_string
        write_dsn = pg_settings.writer_connection_string

    database = DB()
    state = app.state
    state.readpool = await database.create_pool(read_dsn, pg_settings)
    state.writepool = await database.create_pool(write_dsn, pg_settings)
    state.get_connection = get_conn or get_connection

dbfunc async

dbfunc(conn: Connection, func: str, arg: Union[str, Dict, List])

Wrap PLPGSQL Functions.

Keyword arguments: conn -- the asyncpg connection used to run the query; func -- the name of the PostgreSQL function to call; arg -- the argument to the PostgreSQL function, either a string (passed as text) or a dict/list that is serialized to JSON and cast to jsonb.

Source code in stac_fastapi/pgstac/db.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
async def dbfunc(conn: Connection, func: str, arg: Union[str, Dict, List]):
    """Wrap PLPGSQL Functions.

    Keyword arguments:
    conn -- the connection on which the query is executed
    func -- the name of the PostgreSQL function to call
    arg -- the argument to the PostgreSQL function; a string is passed as
    text, anything else is serialized with ``json.dumps`` and cast to jsonb

    Returns the value produced by ``conn.fetchval`` for the rendered query.
    The render and fetch both run inside ``translate_pgstac_errors()``.
    """
    with translate_pgstac_errors():
        if isinstance(arg, str):
            query, params = render(
                """
                SELECT * FROM :func(:item::text);
                """,
                func=V(func),
                item=arg,
            )
        else:
            query, params = render(
                """
                SELECT * FROM :func(:item::text::jsonb);
                """,
                func=V(func),
                item=json.dumps(arg),
            )
        return await conn.fetchval(query, *params)

get_connection async

get_connection(request: Request, readwrite: Literal['r', 'w'] = 'r') -> AsyncIterator[Connection]

Retrieve a connection from the database connection pool.

Source code in stac_fastapi/pgstac/db.py
79
80
81
82
83
84
85
86
87
88
@asynccontextmanager
async def get_connection(
    request: Request,
    readwrite: Literal["r", "w"] = "r",
) -> AsyncIterator[Connection]:
    """Retrieve a connection from the database connection pool.

    ``readwrite == "w"`` selects ``request.app.state.writepool``; any other
    value selects ``request.app.state.readpool``. The connection is acquired
    and yielded inside ``translate_pgstac_errors()``, so the acquire and the
    caller's ``async with`` body both run within that context manager.
    """
    app_state = request.app.state
    if readwrite == "w":
        selected_pool = app_state.writepool
    else:
        selected_pool = app_state.readpool

    with translate_pgstac_errors():
        async with selected_pool.acquire() as connection:
            yield connection

translate_pgstac_errors

translate_pgstac_errors() -> Generator[None, None, None]

Context manager that translates pgstac errors into FastAPI errors.

Source code in stac_fastapi/pgstac/db.py
121
122
123
124
125
126
127
128
129
130
131
132
133
@contextmanager
def translate_pgstac_errors() -> Generator[None, None, None]:
    """Context manager that translates pgstac errors into FastAPI errors.

    Mapping applied to exceptions raised inside the ``with`` body:

    - ``exceptions.UniqueViolationError``     -> ``ConflictError``
    - ``exceptions.NoDataFoundError``         -> ``NotFoundError``
    - ``exceptions.NotNullViolationError``    -> ``DatabaseError``
    - ``exceptions.ForeignKeyViolationError`` -> ``ForeignKeyError``

    The original exception is chained as ``__cause__``. Any other exception
    propagates unchanged.
    """
    try:
        yield
    except (
        exceptions.UniqueViolationError,
        exceptions.NoDataFoundError,
        exceptions.NotNullViolationError,
        exceptions.ForeignKeyViolationError,
    ) as err:
        # Checked in the same order as the tuple above.
        if isinstance(err, exceptions.UniqueViolationError):
            translated = ConflictError
        elif isinstance(err, exceptions.NoDataFoundError):
            translated = NotFoundError
        elif isinstance(err, exceptions.NotNullViolationError):
            translated = DatabaseError
        else:
            # Only ForeignKeyViolationError remains from the caught tuple.
            translated = ForeignKeyError
        raise translated from err