Skip to content

db

stac_fastapi.pgstac.db

Database connection handling.

close_db_connection async

close_db_connection(app: FastAPI) -> None

Close connection.

Source code in stac_fastapi/pgstac/db.py
84
85
86
87
88
async def close_db_connection(app: FastAPI) -> None:
    """Close connection."""
    await app.state.readpool.close()
    if pool := getattr(app.state, "writepool", None):
        await pool.close()

con_init async

con_init(conn)

Use orjson for json returns.

Source code in stac_fastapi/pgstac/db.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
async def con_init(conn):
    """Use orjson for json returns."""
    await conn.set_type_codec(
        "json",
        encoder=orjson.dumps,
        decoder=orjson.loads,
        schema="pg_catalog",
    )
    await conn.set_type_codec(
        "jsonb",
        encoder=orjson.dumps,
        decoder=orjson.loads,
        schema="pg_catalog",
    )

connect_to_db async

connect_to_db(
    app: FastAPI,
    get_conn: Optional[ConnectionGetter] = None,
    postgres_settings: Optional[PostgresSettings] = None,
    add_write_connection_pool: bool = False,
    write_postgres_settings: Optional[PostgresSettings] = None,
) -> None

Create connection pools & connection retriever on application.

Source code in stac_fastapi/pgstac/db.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
async def connect_to_db(
    app: FastAPI,
    get_conn: Optional[ConnectionGetter] = None,
    postgres_settings: Optional[PostgresSettings] = None,
    add_write_connection_pool: bool = False,
    write_postgres_settings: Optional[PostgresSettings] = None,
) -> None:
    """Create connection pools & connection retriever on application."""
    if not postgres_settings:
        postgres_settings = PostgresSettings()

    app.state.readpool = await _create_pool(postgres_settings)

    if add_write_connection_pool:
        if not write_postgres_settings:
            write_postgres_settings = postgres_settings

        app.state.writepool = await _create_pool(write_postgres_settings)

    app.state.get_connection = get_conn if get_conn else get_connection

dbfunc async

dbfunc(conn: Connection, func: str, arg: Union[str, Dict, List])

Wrap PLPGSQL Functions.

Keyword arguments: pool -- the asyncpg pool to use to connect to the database func -- the name of the PostgreSQL function to call arg -- the argument to the PostgreSQL function as either a string or a dict that will be converted into jsonb

Source code in stac_fastapi/pgstac/db.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def dbfunc(conn: Connection, func: str, arg: Union[str, Dict, List]):
    """Wrap PLPGSQL Functions.

    Keyword arguments:
    pool -- the asyncpg pool to use to connect to the database
    func -- the name of the PostgreSQL function to call
    arg -- the argument to the PostgreSQL function as either a string
    or a dict that will be converted into jsonb
    """
    with translate_pgstac_errors():
        if isinstance(arg, str):
            q, p = render(
                """
                SELECT * FROM :func(:item::text);
                """,
                func=V(func),
                item=arg,
            )
            return await conn.fetchval(q, *p)
        else:
            q, p = render(
                """
                SELECT * FROM :func(:item::text::jsonb);
                """,
                func=V(func),
                item=json.dumps(arg),
            )
            return await conn.fetchval(q, *p)

get_connection async

get_connection(request: Request, readwrite: Literal['r', 'w'] = 'r') -> AsyncIterator[Connection]

Retrieve connection from database conection pool.

Source code in stac_fastapi/pgstac/db.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@asynccontextmanager
async def get_connection(
    request: Request,
    readwrite: Literal["r", "w"] = "r",
) -> AsyncIterator[Connection]:
    """Retrieve connection from database conection pool."""
    pool = request.app.state.readpool
    if readwrite == "w":
        pool = getattr(request.app.state, "writepool", None)
        if not pool:
            raise HTTPException(
                status_code=500,
                detail="Could not find connection pool for write operations",
            )

    with translate_pgstac_errors():
        async with pool.acquire() as conn:
            yield conn

translate_pgstac_errors

translate_pgstac_errors() -> Generator[None, None, None]

Context manager that translates pgstac errors into FastAPI errors.

Source code in stac_fastapi/pgstac/db.py
141
142
143
144
145
146
147
148
149
150
151
152
153
@contextmanager
def translate_pgstac_errors() -> Generator[None, None, None]:
    """Context manager that translates pgstac errors into FastAPI errors."""
    try:
        yield
    except exceptions.UniqueViolationError as e:
        raise ConflictError from e
    except exceptions.NoDataFoundError as e:
        raise NotFoundError from e
    except exceptions.NotNullViolationError as e:
        raise DatabaseError from e
    except exceptions.ForeignKeyViolationError as e:
        raise ForeignKeyError from e