Services¶
Services contain the core business logic of the ANZSIC classifier. They import only Port protocols and domain models — never concrete adapters.
The dependency injection container (container.py) is the one exception —
it is the single file that names concrete adapter classes and wires the graph
together.
ClassifierPipeline¶
The main entry point for all interfaces (CLI, Streamlit, future API).
Call classify(SearchRequest) → ClassifyResponse.
classifier ¶
services/classifier.py: pipeline orchestrator that wires Stage 1 (retrieval) and Stage 2 (reranking) into a single classify(SearchRequest) → ClassifyResponse call.
This is the primary entry point for all interfaces (CLI, Streamlit, future API). It knows nothing about infrastructure — it only speaks in domain objects.
SearchMode.FAST: Stage 1 only. Returns retrieval results sorted by RRF score, formatted as ClassifyResult objects. No LLM call. Suitable for interactive exploration.
SearchMode.HIGH_FIDELITY: Stage 1 + Stage 2. RRF candidates are re-ranked by the LLM, which adds a natural-language reason for each match. Recommended for production.
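The mode split can be sketched with stand-in types — the `Candidate` fields and the rerank stub below are illustrative, not the real domain models:

```python
from dataclasses import dataclass
from enum import Enum, auto

class SearchMode(Enum):
    FAST = auto()
    HIGH_FIDELITY = auto()

@dataclass
class Candidate:
    anzsic_code: str
    rrf_score: float

def rerank_stub(ranked: list[Candidate]) -> list[Candidate]:
    # Stand-in for the Stage 2 LLM call; the real reranker also attaches
    # a natural-language reason to each match.
    return ranked

def classify(mode: SearchMode, candidates: list[Candidate]) -> list[Candidate]:
    # Stage 1 output: retrieval hits sorted by RRF score, best first.
    ranked = sorted(candidates, key=lambda c: c.rrf_score, reverse=True)
    if mode is SearchMode.FAST:
        return ranked  # no LLM call in FAST mode
    return rerank_stub(ranked)  # HIGH_FIDELITY adds Stage 2

hits = [Candidate("0161", 0.031), Candidate("4512", 0.047)]
print([c.anzsic_code for c in classify(SearchMode.FAST, hits)])  # best RRF first
```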
ClassifierPipeline ¶
Two-stage ANZSIC classification pipeline.
Inject via services/container.py — do not instantiate directly in application code.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `retriever` | `HybridRetriever` | HybridRetriever (Stage 1). | *required* |
| `reranker` | `LLMReranker` | LLMReranker (Stage 2). | *required* |
| `evaluator` | `ANZSICEvaluator \| None` | ANZSICEvaluator (Stage 3, optional quality check). | `None` |
| `settings` | `Settings` | Shared application settings. | *required* |
Source code in prod/services/classifier.py
classify ¶
Classify an occupation/business description into ANZSIC codes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `request` | `SearchRequest` | Validated SearchRequest (query, mode, top_k, retrieval_n). | *required* |

Returns:

| Type | Description |
|---|---|
| `ClassifyResponse` | ClassifyResponse with ranked results and metadata. |
Source code in prod/services/classifier.py
_candidate_to_result ¶
Convert a Stage 1 Candidate to a ClassifyResult for FAST mode.
Source code in prod/services/classifier.py
HybridRetriever¶
Stage 1 of the pipeline. Orchestrates embedding, dual search, RRF fusion,
and record fetch into a list of Candidate objects.
The compute_rrf() function is extracted at module level (not inside the class)
so it can be called directly in unit tests without any adapter or fixture setup.
retriever ¶
services/retriever.py: Stage 1 of the classification pipeline, hybrid retrieval with RRF fusion.
Architecture

- Accepts any EmbeddingPort and DatabasePort via constructor injection.
- compute_rrf() is a pure Python function — no I/O, easily unit-tested.
- HybridRetriever.retrieve() is the single public entry point.
Reciprocal Rank Fusion formula
score(d) = Σᵢ 1 / (k + rankᵢ(d))
Where k = RRF_K (default 60) and rankᵢ(d) is the rank of document d in system i. Documents that appear in both systems receive a combined score.
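The formula can be sketched in plain Python. The name `rrf_scores` and the dict return type are illustrative; the module's own compute_rrf returns _RRFResult objects:

```python
def rrf_scores(vec_hits: list[tuple[str, int]],
               fts_hits: list[tuple[str, int]],
               k: int = 60) -> dict[str, float]:
    """Fuse two (code, rank) lists: score(d) = sum over systems of 1 / (k + rank_i(d))."""
    scores: dict[str, float] = {}
    for hits in (vec_hits, fts_hits):
        for code, rank in hits:
            # A code absent from one system simply contributes nothing for it.
            scores[code] = scores.get(code, 0.0) + 1.0 / (k + rank)
    return scores

# "A" is ranked by both systems, so its two contributions combine:
s = rrf_scores([("A", 1), ("B", 2)], [("A", 2), ("C", 1)])
```

With k = 60, "A" scores 1/61 + 1/62, beating the single-system hits "C" (1/61) and "B" (1/62).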
References
Cormack, G.V., Clarke, C.L.A., & Buettcher, S. (2009). Reciprocal Rank Fusion Outperforms Condorcet and Individual Rank Learning Methods. SIGIR.
HybridRetriever ¶
Hybrid retrieval using vector ANN search + FTS fused via RRF.
To change retrieval behaviour, swap the injected DatabasePort or EmbeddingPort — no code changes here.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `db` | `DatabasePort` | Any object satisfying DatabasePort. | *required* |
| `embedder` | `EmbeddingPort` | Any object satisfying EmbeddingPort. | *required* |
| `settings` | `Settings` | Shared application settings. | *required* |
Source code in prod/services/retriever.py
retrieve ¶
Run hybrid retrieval for a query.
Workflow
- Embed query (RETRIEVAL_QUERY task type)
- Vector ANN search → top-n (code, rank) pairs
- FTS search → top-n (code, rank) pairs
- RRF fusion → merged, scored list (pure Python)
- Fetch full DB records for top-n fused codes
- Assemble and return Candidate objects
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | Natural-language query. | *required* |
| `n` | `int` | Maximum number of candidates to return. | *required* |

Returns:

| Type | Description |
|---|---|
| `list[Candidate]` | List of Candidate objects sorted by RRF score descending. |

Raises:

| Type | Description |
|---|---|
| `EmbeddingError` | If the embedding call fails. |
| `RetrievalError` | If candidate assembly fails. |
Source code in prod/services/retriever.py
compute_rrf ¶
`compute_rrf(vec_hits: list[tuple[str, int]], fts_hits: list[tuple[str, int]], k: int = 60) -> list[_RRFResult]`
Compute Reciprocal Rank Fusion scores for two ranked lists.
This is a pure function — deterministic, no side-effects, no I/O. It is the ideal target for unit tests.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `vec_hits` | `list[tuple[str, int]]` | List of (anzsic_code, rank) from vector search. rank is 1-indexed, 1 = best match. | *required* |
| `fts_hits` | `list[tuple[str, int]]` | List of (anzsic_code, rank) from full-text search. | *required* |
| `k` | `int` | RRF smoothing constant (standard value = 60). | `60` |

Returns:

| Type | Description |
|---|---|
| `list[_RRFResult]` | List of _RRFResult objects with combined scores. Not sorted — caller decides sort order. |
Examples:
>>> results = compute_rrf([("A", 1), ("B", 2)], [("A", 2), ("C", 1)])
>>> scores = {r.anzsic_code: r.rrf_score for r in results}
>>> scores["A"] > scores["B"] # A appears in both systems
True
>>> scores["A"] > scores["C"] # A's combined score beats FTS-only C
True
Source code in prod/services/retriever.py
LLMReranker¶
Stage 2 of the pipeline. Builds a structured prompt, calls the LLM, parses the JSON response, and implements the CSV fallback strategy.
CSV fallback strategy¶
```mermaid
flowchart TD
    A[Call LLM\nnormal prompt] --> B{Results\nempty?}
    B -- No --> C[Return results ✓]
    B -- Yes --> D[Retry with CSV\nreference injected]
    D --> E{Results\nempty?}
    E -- No --> F[Return results ✓]
    E -- Yes --> G[Return empty list\nlog error]
```
The first call never includes the 5,236-code CSV reference — keeping the prompt short (~2K tokens). Only if Gemini returns an empty array does the reranker retry with the full reference injected (~63K tokens). This retry-on-empty strategy outperforms always-inject because:
- Most queries match within the 20 Stage 1 candidates — no CSV needed
- Gemini's attention is not diluted by 63K tokens of irrelevant codes on the majority of calls
- The retry adds latency only for the rare low-confidence case
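The retry-on-empty flow can be sketched with a fake LLM callable. Function and argument names here are illustrative, not the real LLMPort or reranker API:

```python
def rerank_with_fallback(call_llm, prompt: str, csv_reference: str) -> list[str]:
    # Attempt 1: short prompt, no CSV reference (~2K tokens in the real pipeline).
    results = call_llm(prompt)
    if results:
        return results
    # Attempt 2: inject the full ANZSIC reference and retry once (~63K tokens).
    results = call_llm(prompt + "\n\nReference:\n" + csv_reference)
    return results or []  # both attempts empty -> caller handles the empty list

calls: list[str] = []
def fake_llm(prompt: str) -> list[str]:
    calls.append(prompt)
    # Simulate a low-confidence query that only matches once the reference is injected.
    return ["4512"] if "Reference:" in prompt else []

out = rerank_with_fallback(fake_llm, "query: exotic ferret grooming", "code,title")
```

Here `out` is `["4512"]` and the fake LLM was called twice — the common case (a non-empty first response) would return after a single call.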
reranker ¶
services/reranker.py: Stage 2 of the classification pipeline, LLM re-ranking.
Responsibilities
- Build a structured prompt from Stage 1 candidates (via config/prompts.py).
- Call the LLMPort to generate a ranked JSON response.
- Parse and validate the JSON into ClassifyResult objects.
- CSV fallback: if Gemini returns an empty list, retry with the full ANZSIC master CSV injected into the system prompt.
Fallback strategy
- Normal call first (no CSV reference) — keeps the prompt concise.
- If results list is empty, log a warning and retry with CSV injected.
- Two failed attempts → return empty list (caller handles gracefully).
The CSV reference is loaded ONCE at construction time and reused across all classify calls (amortised startup cost).
LLMReranker ¶
Re-rank Stage 1 candidates using an LLM.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `llm` | `LLMPort` | Any object satisfying LLMPort. | *required* |
| `settings` | `Settings` | Shared application settings. | *required* |
Source code in prod/services/reranker.py
rerank ¶
Re-rank Stage 1 candidates using the LLM.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `query` | `str` | Original search query. | *required* |
| `candidates` | `list[Candidate]` | Ordered list of Stage 1 Candidate objects. | *required* |
| `top_k` | `int` | How many results to return. | *required* |

Returns:

| Type | Description |
|---|---|
| `list[ClassifyResult]` | Ordered list of ClassifyResult objects (best match first). Empty list if both LLM attempts fail. |
Source code in prod/services/reranker.py
Container (Dependency Injection)¶
The wiring point. Instantiates all adapters and injects them into services.
```python
# Before
from prod.adapters.gemini_llm import GeminiLLMAdapter
llm = GeminiLLMAdapter(auth, settings)

# After (hypothetical OpenAI swap)
from prod.adapters.openai_llm import OpenAILLMAdapter
llm = OpenAILLMAdapter(settings)
```
container ¶
services/container.py: Dependency Injection container.
THIS IS THE ONLY FILE THAT NAMES CONCRETE ADAPTER CLASSES.
Provider selection is driven entirely by environment variables — no code changes are needed to switch between providers:
- EMBED_PROVIDER=vertex (default) → VertexEmbeddingAdapter
- EMBED_PROVIDER=openai → OpenAIEmbeddingAdapter
- EMBED_PROVIDER=none → NullEmbeddingAdapter (FTS-only, no vector search)

- LLM_PROVIDER=vertex (default) → GeminiLLMAdapter
- LLM_PROVIDER=openai → OpenAILLMAdapter
- LLM_PROVIDER=geni → GeniLLMAdapter
- LLM_PROVIDER=langchain_gemini → GeminiLangChainLLMAdapter
Mix-and-match is supported (e.g. OpenAI embeddings + Gemini LLM). GCPAuthManager is only instantiated when at least one GCP adapter is used.
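The env-driven selection amounts to a dictionary lookup. The registry below is a sketch (adapter names copied from the list above, strings standing in for classes); the real container raises ConfigurationError rather than ValueError:

```python
# Illustrative registry: in the real container the values are adapter classes.
EMBED_ADAPTERS = {
    "vertex": "VertexEmbeddingAdapter",
    "openai": "OpenAIEmbeddingAdapter",
    "none": "NullEmbeddingAdapter",
}

def pick_embedder(env: dict[str, str]) -> str:
    # vertex is the documented default when EMBED_PROVIDER is unset.
    provider = env.get("EMBED_PROVIDER", "vertex")
    if provider not in EMBED_ADAPTERS:
        # The real container raises ConfigurationError here.
        raise ValueError(f"Unknown EMBED_PROVIDER: {provider!r}")
    return EMBED_ADAPTERS[provider]
```

Passing the environment as a plain dict (instead of reading os.environ directly) keeps the selection logic trivially testable.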
Replace the database

```diff
- from prod.adapters.postgres_db import PostgresDatabaseAdapter
+ from prod.adapters.weaviate_db import WeaviateDatabaseAdapter
```
Thread safety
@lru_cache(maxsize=1) makes get_pipeline() return the same instance across calls. For Streamlit this is fine (single process, one pipeline per run). For FastAPI with multiple workers, each worker process gets its own pipeline instance (one per process — correct behaviour for psycopg2 connections).
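The singleton behaviour is plain functools.lru_cache; a stand-in pipeline class (not the real ClassifierPipeline) demonstrates it:

```python
from functools import lru_cache

class FakePipeline:
    """Stand-in for ClassifierPipeline; counts how often it is constructed."""
    instances = 0

    def __init__(self) -> None:
        FakePipeline.instances += 1

@lru_cache(maxsize=1)
def get_pipeline() -> FakePipeline:
    # Runs once per process; every later call returns the cached instance.
    return FakePipeline()
```

Because the cache lives in the module of each worker process, a multi-worker FastAPI deployment still gets exactly one pipeline (and one set of DB connections) per process.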
get_pipeline (cached) ¶
Build and return the fully wired ClassifierPipeline singleton.
Provider selection is read from EMBED_PROVIDER and LLM_PROVIDER
environment variables. The @lru_cache ensures this runs only once
per process lifetime.
Returns:

| Type | Description |
|---|---|
| `ClassifierPipeline` | Fully initialised ClassifierPipeline ready for use. |

Raises:

| Type | Description |
|---|---|
| `ConfigurationError` | If an unknown provider name is given. |
| `AuthenticationError` | If required API keys / credentials are missing. |