Adapters¶
Adapters are the concrete implementations of the Ports. Each adapter binds the system to one specific external technology.
Container is the only entry point
Never import an adapter class directly in your application code.
Always go through services/container.py — that is the single wiring
point where adapters are named. This preserves swappability.
Provider overview¶
Two providers are supported for both embedding and LLM. Selection is controlled entirely by environment variables — no code changes required.
| Component | EMBED_PROVIDER=vertex (default) | EMBED_PROVIDER=openai |
|---|---|---|
| Embedding adapter | VertexEmbeddingAdapter | OpenAIEmbeddingAdapter |
| Auth required | gcloud ADC token | OPENAI_API_KEY |
| Model default | text-embedding-005 | text-embedding-3-small |
| Component | LLM_PROVIDER=vertex (default) | LLM_PROVIDER=openai |
|---|---|---|
| LLM adapter | GeminiLLMAdapter | OpenAILLMAdapter |
| Auth required | gcloud ADC token | OPENAI_API_KEY |
| Model default | gemini-2.5-flash | gpt-4o |
Mix-and-match is supported (e.g. EMBED_PROVIDER=openai LLM_PROVIDER=vertex).
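The selection logic can be sketched as a small factory. This is a hypothetical helper, not the real code in services/container.py; only the env-var names, defaults, and adapter class names come from the tables above.

```python
# Hypothetical sketch of the env-driven selection in services/container.py.
def select_providers(env: dict[str, str]) -> tuple[str, str]:
    embed = env.get("EMBED_PROVIDER", "vertex")  # vertex is the default
    llm = env.get("LLM_PROVIDER", "vertex")
    embed_adapter = {
        "vertex": "VertexEmbeddingAdapter",
        "openai": "OpenAIEmbeddingAdapter",
    }[embed]
    llm_adapter = {
        "vertex": "GeminiLLMAdapter",
        "openai": "OpenAILLMAdapter",
    }[llm]
    return embed_adapter, llm_adapter

# Mix-and-match: OpenAI embeddings with the default Gemini LLM
assert select_providers({"EMBED_PROVIDER": "openai"}) == (
    "OpenAIEmbeddingAdapter", "GeminiLLMAdapter",
)
```

Because both variables default to vertex, an empty environment yields the all-Vertex configuration with no code changes.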
GCPAuthManager¶
Shared across both GCP adapters (VertexEmbeddingAdapter and GeminiLLMAdapter).
Manages the bearer token lifecycle — fetching, caching, and refreshing — so
that both adapters share a single token rather than making separate gcloud
subprocess calls.
Only instantiated when EMBED_PROVIDER=vertex or LLM_PROVIDER=vertex.
gcp_auth ¶
adapters/gcp_auth.py
GCP authentication manager.
Wraps the gcloud auth print-access-token subprocess call, caches the token
in memory, and refreshes automatically when it is within TOKEN_REFRESH_MARGIN
seconds of expiry.
Design notes
- All adapters that need GCP auth receive a GCPAuthManager instance via DI.
- A single GCPAuthManager is created in services/container.py and shared across VertexEmbeddingAdapter and GeminiLLMAdapter → one token, one call.
- Thread-safe: uses a threading.Lock for token refresh in multi-threaded contexts (Streamlit, FastAPI worker threads).
GCPAuthManager ¶
Manages a GCP Application Default Credentials access token.
Usage (injected by container.py — do not instantiate manually):

    auth = GCPAuthManager(settings)
    token = auth.get_token()  # fresh or cached
Source code in prod/adapters/gcp_auth.py
get_token ¶
Return a valid access token, refreshing if necessary.
Returns:
| Type | Description |
|---|---|
| str | Bearer token string. |
Raises:
| Type | Description |
|---|---|
| AuthenticationError | If gcloud fails. |
Source code in prod/adapters/gcp_auth.py
invalidate ¶
Force the next call to get_token() to fetch a fresh token.
VertexEmbeddingAdapter¶
Implements EmbeddingPort using the Vertex AI Predict REST API.
Key behaviours:
- embed_query uses the RETRIEVAL_QUERY task type (asymmetric retrieval)
- embed_document uses the RETRIEVAL_DOCUMENT task type
- Batch calls chunk to embed_batch_size (default 50) to stay within API limits
- HTTP 401 → invalidates the cached token and retries once
- HTTP 429 / 503 → exponential back-off with up to embed_retries attempts
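The 401 / 429 / 503 handling above amounts to a small retry loop. A sketch under assumed defaults — the function name and the (status, body) response shape are illustrative, not the adapter's actual code.

```python
import time

def call_with_retries(send, invalidate, max_retries: int = 3,
                      base_delay: float = 1.0, sleep=time.sleep):
    """Illustrative retry policy: 401 → invalidate token and retry once;
    429/503 → exponential back-off. `send` returns (status, body)."""
    retried_auth = False
    for attempt in range(max_retries + 1):
        status, body = send()
        if status == 200:
            return body
        if status == 401 and not retried_auth:
            invalidate()          # drop the cached token, then retry once
            retried_auth = True
            continue
        if status in (429, 503) and attempt < max_retries:
            sleep(base_delay * 2 ** attempt)  # 1 s, 2 s, 4 s, ...
            continue
        raise RuntimeError(f"API call failed with HTTP {status}")
    raise RuntimeError("retries exhausted")
```

Injecting `send`, `invalidate`, and `sleep` keeps the policy testable without a network or a real clock.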
vertex_embedding ¶
adapters/vertex_embedding.py
Implements EmbeddingPort using Vertex AI text-embedding-005.
Key behaviour
- embed_query → RETRIEVAL_QUERY task type (asymmetric retrieval)
- embed_document → RETRIEVAL_DOCUMENT task type
- embed_documents_batch → single API call for up to embed_batch_size items
- Retries on transient HTTP errors (429, 503) with exponential back-off
- Corporate proxy support via settings.https_proxy
- Token 401 → triggers GCPAuthManager.invalidate() then retries once
To swap to a different embedding model (e.g. OpenAI text-embedding-3-large):
1. Write OpenAIEmbeddingAdapter implementing EmbeddingPort
2. Change ONE import in services/container.py
3. Update EMBED_DIM in settings / .env
VertexEmbeddingAdapter ¶
Vertex AI text-embedding-005 adapter.
Injected into HybridRetriever via services/container.py.
Source code in prod/adapters/vertex_embedding.py
embed_query ¶
Embed a search query with the RETRIEVAL_QUERY task type.
embed_document ¶
Embed a document with RETRIEVAL_DOCUMENT task type.
embed_documents_batch ¶
embed_documents_batch(texts: list[str], titles: list[str] | None = None) -> list[list[float] | None]
Embed multiple documents in batches.
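The batching step reduces to slicing the input list into embed_batch_size-sized chunks before issuing API calls. A minimal sketch (the helper name is illustrative):

```python
def chunked(items: list, size: int) -> list[list]:
    # Split into batches of at most `size` items; per the notes above the
    # adapter uses embed_batch_size (default 50) to stay within API limits.
    return [items[i:i + size] for i in range(0, len(items), size)]

# 120 documents → three batches of 50, 50, and 20 items
assert [len(b) for b in chunked(list(range(120)), 50)] == [50, 50, 20]
```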
Source code in prod/adapters/vertex_embedding.py
GeminiLLMAdapter¶
Implements LLMPort using the Vertex AI generateContent REST API.
Key behaviours:
- Requests responseMimeType: application/json — guaranteed valid JSON output
- temperature: 0.1 — low temperature for consistent, deterministic re-ranking
- HTTP 401 → token refresh + retry
- HTTP 429 / 503 → exponential back-off
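The request body implied by these behaviours looks roughly like this. The field names follow the public Vertex AI generateContent REST format; the helper name itself is illustrative.

```python
def build_generate_content_body(system_prompt: str, user_message: str) -> dict:
    """Sketch of a generateContent request body with JSON output enforced."""
    return {
        "systemInstruction": {"parts": [{"text": system_prompt}]},
        "contents": [{"role": "user", "parts": [{"text": user_message}]}],
        "generationConfig": {
            "responseMimeType": "application/json",  # guarantees valid JSON
            "temperature": 0.1,                      # deterministic re-ranking
        },
    }
```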
gemini_llm ¶
adapters/gemini_llm.py
Implements LLMPort using Vertex AI Gemini (generateContent REST API).
Key behaviour
- Sends systemInstruction + contents in the Vertex AI REST format
- Requests JSON output via responseMimeType: application/json
- Token 401 → triggers GCPAuthManager.invalidate() then retries once
- Retries on 429/503 with exponential back-off
- Returns raw JSON string (caller parses)
To swap to OpenAI-compatible endpoints
- Write OpenAILLMAdapter implementing LLMPort
- Change ONE import in services/container.py
GeminiLLMAdapter ¶
Vertex AI Gemini adapter.
Injected into LLMReranker via services/container.py.
Source code in prod/adapters/gemini_llm.py
generate_json ¶
Send a prompt and return the raw JSON response string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| system_prompt | str | System-level instruction for Gemini. | required |
| user_message | str | User-turn message content. | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Raw JSON string from the model, or None on recoverable failure. |
Raises:
| Type | Description |
|---|---|
| LLMError | On unrecoverable API failure. |
Source code in prod/adapters/gemini_llm.py
OpenAIEmbeddingAdapter¶
Implements EmbeddingPort using the OpenAI Embeddings REST API.
Activated when EMBED_PROVIDER=openai.
Key behaviours:
- Uses /v1/embeddings directly via requests (no SDK dependency)
- Passes dimensions=EMBED_DIM to the API so output matches the pgvector column width
- embed_query and embed_document use the same endpoint (OpenAI embeddings are symmetric)
- HTTP 401 → raises AuthenticationError immediately (key is wrong, no retry)
- HTTP 429 / 500 / 503 → exponential back-off
Dimension compatibility
If your database was initialised with vector(768) (the Vertex AI default), set
EMBED_DIM=768 when using OpenAI models. text-embedding-3-small and
text-embedding-3-large both accept a dimensions parameter to reduce their
native output to any target size. For a fresh installation with OpenAI, setting
EMBED_DIM=1536 gives better quality from text-embedding-3-small.
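Concretely, the dimensions parameter is just a field in the /v1/embeddings request body. A sketch of that body (the helper name is illustrative; model, input, and dimensions are the documented OpenAI request fields):

```python
def build_embeddings_body(texts: list[str],
                          model: str = "text-embedding-3-small",
                          dimensions: int = 768) -> dict:
    # `dimensions` pins the output width to the pgvector column (EMBED_DIM),
    # e.g. 768 for a vector(768) column initialised for Vertex AI.
    return {"model": model, "input": texts, "dimensions": dimensions}
```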
openai_embedding ¶
adapters/openai_embedding.py
Implements EmbeddingPort using the OpenAI Embeddings API.
Key behaviour
- Uses /v1/embeddings via raw requests (no openai SDK dependency)
- Passes dimensions=settings.embed_dim so output matches whatever pgvector column width the DB was initialised with
- embed_query and embed_document call the same endpoint (OpenAI embeddings are symmetric — no RETRIEVAL_QUERY / RETRIEVAL_DOCUMENT distinction)
- Batches embed_documents_batch to stay within the 2048-inputs-per-request limit
- Retries on 429 / 500 with exponential back-off
Required env vars
- OPENAI_API_KEY — your OpenAI secret key (sk-...)
- OPENAI_EMBED_MODEL — default: text-embedding-3-small
- EMBED_DIM — default: 768; 1536 recommended for text-embedding-3-small
To enable
Set EMBED_PROVIDER=openai in your .env file.
OpenAIEmbeddingAdapter ¶
OpenAI text-embedding adapter.
Injected into HybridRetriever via services/container.py when
EMBED_PROVIDER=openai is set in the environment.
Note on dimensions
OpenAI's text-embedding-3-* models accept a dimensions
parameter to reduce the output to any size ≤ the model's native
dimension. This adapter always passes settings.embed_dim so
output vectors are compatible with the pgvector column width that
the database was initialised with.
Source code in prod/adapters/openai_embedding.py
embed_query ¶
Embed a search query.
OpenAI embeddings are symmetric, so the same endpoint and model is used for queries and documents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | str | Natural-language query string. | required |
Returns:
| Type | Description |
|---|---|
| list[float] | Dense float vector of length EMBED_DIM. |
Raises:
| Type | Description |
|---|---|
| EmbeddingError | On API failure or unexpected response shape. |
Source code in prod/adapters/openai_embedding.py
embed_document ¶
Embed a document for storage.
The title parameter is accepted for interface compatibility
but is not used by the OpenAI embeddings API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | str | Document body text. | required |
| title | str | Ignored (OpenAI API does not use document titles). | '' |
Returns:
| Type | Description |
|---|---|
| list[float] | Dense float vector of length EMBED_DIM. |
Source code in prod/adapters/openai_embedding.py
embed_documents_batch ¶
embed_documents_batch(texts: list[str], titles: list[str] | None = None) -> list[list[float] | None]
Embed multiple documents, batched to respect API limits.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| texts | list[str] | List of document strings to embed. | required |
| titles | list[str] \| None | Ignored (OpenAI API does not use document titles). | None |
Returns:
| Type | Description |
|---|---|
| list[list[float] \| None] | List of float vectors; None for items that failed to embed. |
Source code in prod/adapters/openai_embedding.py
OpenAILLMAdapter¶
Implements LLMPort using the OpenAI Chat Completions REST API.
Activated when LLM_PROVIDER=openai.
Key behaviours:
- Uses /v1/chat/completions with response_format: {"type": "json_object"} — guarantees syntactically valid JSON output (same guarantee as Gemini's responseMimeType: application/json)
- temperature: 0.1 — consistent with the Gemini adapter
- The existing build_system_prompt() already includes the word "JSON", satisfying OpenAI's JSON mode requirement with no prompt changes needed
- HTTP 401 → raises AuthenticationError immediately
- HTTP 429 / 500 / 503 → exponential back-off
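The corresponding request body is small. A sketch using the documented Chat Completions fields (the helper name is illustrative):

```python
def build_chat_body(system_prompt: str, user_message: str,
                    model: str = "gpt-4o") -> dict:
    """Sketch of a /v1/chat/completions request body with JSON mode on."""
    return {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        "response_format": {"type": "json_object"},  # JSON mode
        "temperature": 0.1,                          # matches the Gemini adapter
    }
```

Note that JSON mode requires the word "JSON" somewhere in the messages, which the system prompt already provides.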
openai_llm ¶
adapters/openai_llm.py
Implements LLMPort using the OpenAI Chat Completions API.
Key behaviour
- Uses /v1/chat/completions via raw requests (no openai SDK dependency)
- Requests JSON output via response_format={"type": "json_object"}
- system_prompt → system role message; user_message → user role message
- Retries on 429 / 500 with exponential back-off
- Returns the raw JSON string (caller parses); None on recoverable failure
Required env vars
- OPENAI_API_KEY — your OpenAI secret key (sk-...)
- OPENAI_LLM_MODEL — default: gpt-4o
To enable
Set LLM_PROVIDER=openai in your .env file.
OpenAILLMAdapter ¶
OpenAI GPT chat completions adapter.
Injected into LLMReranker via services/container.py when
LLM_PROVIDER=openai is set in the environment.
JSON mode is enabled via response_format={"type": "json_object"},
which guarantees the model returns syntactically valid JSON without
needing to strip markdown fences.
Note: OpenAI's JSON mode requires the word "JSON" to appear somewhere in the prompt. The existing build_system_prompt() in config/prompts.py already includes this — no prompt changes are needed.
Source code in prod/adapters/openai_llm.py
generate_json ¶
Send a prompt and return the raw JSON response string.
Uses response_format={"type": "json_object"} to enforce valid
JSON output from the model. The caller is responsible for parsing
the returned string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| system_prompt | str | System-level instruction for the model. | required |
| user_message | str | User-turn message content. | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Raw JSON string from the model, or None on recoverable failure. |
Raises:
| Type | Description |
|---|---|
| LLMError | On unrecoverable API failure (non-2xx after all retries). |
Source code in prod/adapters/openai_llm.py
PostgresDatabaseAdapter¶
Implements DatabasePort using psycopg2 and the pgvector extension.
Key behaviours:
- Connection is opened lazily on first query and reused
- On OperationalError, reconnects once automatically
- vector_search uses the HNSW index via the <=> cosine operator
- fts_search uses the GIN-indexed tsvector column
- fetch_by_codes uses ANY(%s) for a single round-trip to fetch N records
Scaling to FastAPI
The current implementation uses a single persistent connection — appropriate
for Streamlit (single process, single thread). For a multi-threaded FastAPI
deployment, replace the connection management here with
psycopg2.pool.ThreadedConnectionPool. No other file needs to change.
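One way to structure the pooled replacement is a borrow/return helper. This sketch assumes only the getconn/putconn interface that psycopg2.pool.ThreadedConnectionPool exposes; the helper name is illustrative, not the project's code.

```python
from contextlib import contextmanager

@contextmanager
def pooled_cursor(pool):
    """Borrow a connection from a psycopg2-style pool for one query,
    commit on success, and always return the connection to the pool."""
    conn = pool.getconn()
    try:
        with conn.cursor() as cur:
            yield cur
        conn.commit()
    finally:
        pool.putconn(conn)  # returned even if the query raised
```

Each FastAPI worker thread then borrows its own connection per request instead of sharing one persistent connection across threads.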
postgres_db ¶
adapters/postgres_db.py
Implements DatabasePort using psycopg2 + pgvector.
Database layout (from ingest.py):

    Table : anzsic_codes
    Cols  : anzsic_code (PK), anzsic_desc, class_code, class_desc,
            group_code, group_desc, subdivision_desc, division_desc,
            class_exclusions, enriched_text, embedding vector(768), fts_vector
    Index : HNSW cosine (embedding), GIN (fts_vector)
Three atomic methods match DatabasePort
- vector_search → ANN search via the pgvector <=> operator
- fts_search → FTS via tsquery
- fetch_by_codes → bulk SELECT by primary-key list
Connection management
- A single connection is opened lazily and reused.
- On OperationalError the connection is reset and one retry is attempted.
- For Streamlit (single-process, single-thread) this is sufficient.
- For FastAPI: replace with a psycopg2 connection pool (e.g. psycopg2.pool.ThreadedConnectionPool) — change only this file.
To swap the database engine (e.g. to Weaviate or Pinecone):
1. Write WeaviateDatabaseAdapter implementing DatabasePort
2. Change ONE import in services/container.py
PostgresDatabaseAdapter ¶
psycopg2 + pgvector implementation of DatabasePort.
Uses a ThreadedConnectionPool so concurrent threads (FastAPI + Uvicorn thread pool) each borrow their own connection. The pool is created once per adapter instance and shared across all threads in a process.
Injected into HybridRetriever via services/container.py.
Source code in prod/adapters/postgres_db.py
vector_search ¶
Approximate nearest-neighbour search via pgvector HNSW index.
Returns list of (anzsic_code, rank) tuples, rank starting at 1.
Source code in prod/adapters/postgres_db.py
fts_search ¶
Full-text search using the GIN-indexed tsvector column.
Uses OR between stemmed query tokens so that descriptive free-text queries like "fixes pipes in industries for AC" match records containing ANY of the meaningful terms (pipe, fix, industri, etc.) rather than requiring ALL terms to be present in the same record (AND semantics of plainto_tsquery would return zero hits for most natural-language inputs).
Falls back to an empty list rather than raising if no FTS results (colloquial queries often produce zero FTS hits — vector covers it).
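The OR construction can be sketched as simple token joining. The stop-word list here is an illustrative subset, and stemming is left to Postgres's to_tsquery:

```python
STOPWORDS = {"in", "for", "the", "a", "of", "and"}  # illustrative subset

def build_or_tsquery(query: str) -> str:
    """Turn free text into an OR-joined tsquery string so a record matching
    ANY meaningful term scores, as described above. Postgres stems each
    token when the string is passed to to_tsquery('english', ...)."""
    tokens = [t for t in query.lower().split()
              if t not in STOPWORDS and t.isalnum()]
    return " | ".join(tokens)

assert build_or_tsquery("fixes pipes in industries for AC") == "fixes | pipes | industries | ac"
```

With plainto_tsquery's AND semantics the same input would require every term in one record, which is why it returns zero hits for most natural-language queries.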
Source code in prod/adapters/postgres_db.py
fetch_by_codes ¶
Fetch full records for a list of ANZSIC codes.
Returns a dict keyed by anzsic_code. Missing codes are absent.
Source code in prod/adapters/postgres_db.py
close ¶
Close all connections in the pool (called on process shutdown).