Skip to content

Services

Services contain the core business logic of the ANZSIC classifier. They import only Port protocols and domain models — never concrete adapters.

The dependency injection container (container.py) is the one exception — it is the single file that names concrete adapter classes and wires the graph together.


ClassifierPipeline

The main entry point for all interfaces (CLI, Streamlit, future API). Call classify(SearchRequest)ClassifyResponse.

classifier

services/classifier.py ────────────────────────────────────────────────────────────────────────────── Pipeline orchestrator: wires Stage 1 (retrieval) + Stage 2 (reranking) into a single classify(SearchRequest) → ClassifyResponse call.

This is the primary entry point for all interfaces (CLI, Streamlit, future API). It knows nothing about infrastructure — it only speaks in domain objects.

SearchMode.FAST: Stage 1 only. Returns retrieval results sorted by RRF score, formatted as ClassifyResult objects. No LLM call. Suitable for interactive exploration.

SearchMode.HIGH_FIDELITY: Stage 1 + Stage 2. RRF candidates are re-ranked by the LLM, which adds a natural-language reason for each match. Recommended for production.

ClassifierPipeline

Two-stage ANZSIC classification pipeline.

Inject via services/container.py — do not instantiate directly in application code.

Parameters:

Name Type Description Default
retriever HybridRetriever

HybridRetriever (Stage 1).

required
reranker LLMReranker

LLMReranker (Stage 2).

required
evaluator ANZSICEvaluator | None

ANZSICEvaluator (Stage 3, optional quality check).

None
settings Settings

Shared application settings.

required
Source code in prod/services/classifier.py
class ClassifierPipeline:
    """Two-stage ANZSIC classification pipeline.

    Inject via services/container.py — do not instantiate directly in
    application code.

    Args:
        retriever:  HybridRetriever (Stage 1).
        reranker:   LLMReranker (Stage 2).
        evaluator:  ANZSICEvaluator (Stage 3, optional quality check).
        settings:   Shared application settings.
    """

    def __init__(
        self,
        retriever: HybridRetriever,
        reranker: LLMReranker,
        settings: Settings,
        evaluator: ANZSICEvaluator | None = None,
    ) -> None:
        self._retriever = retriever
        self._reranker = reranker
        self._settings = settings
        self._evaluator = evaluator

    # ── Public API ─────────────────────────────────────────────────────────

    def classify(self, request: SearchRequest) -> ClassifyResponse:
        """Classify an occupation/business description into ANZSIC codes.

        Args:
            request: Validated SearchRequest (query, mode, top_k, retrieval_n).

        Returns:
            ClassifyResponse with ranked results and metadata.
        """
        logger.info(
            "classify | query=%r mode=%s top_k=%d retrieval_n=%d",
            request.query[:80],
            request.mode.value,
            request.top_k,
            request.retrieval_n,
        )
        _t_total = time.perf_counter()

        # ── Stage 1: Hybrid Retrieval ──────────────────────────────────────
        _t1 = time.perf_counter()
        candidates = self._retriever.retrieve(
            query=request.query,
            n=request.retrieval_n,
        )
        _stage1_elapsed = time.perf_counter() - _t1
        logger.info(
            "⏱ [Classifier] stage=1_retrieval elapsed=%.3fs candidates=%d",
            _stage1_elapsed,
            len(candidates),
        )

        # ── Stage 2 (optional): LLM Re-ranking ────────────────────────────
        if request.mode == SearchMode.HIGH_FIDELITY:
            _t2 = time.perf_counter()
            results = self._reranker.rerank(
                query=request.query,
                candidates=candidates,
                top_k=request.top_k,
            )
            _stage2_elapsed = time.perf_counter() - _t2
            logger.info(
                "⏱ [Classifier] stage=2_llm_rerank elapsed=%.3fs results=%d",
                _stage2_elapsed,
                len(results),
            )
            llm_model = self._reranker._llm.model_name
        else:
            # FAST mode: convert top-k candidates directly to ClassifyResult
            results = [
                _candidate_to_result(c, rank=i + 1)
                for i, c in enumerate(candidates[: request.top_k])
            ]
            _stage2_elapsed = 0.0
            llm_model = ""

        _total_elapsed = time.perf_counter() - _t_total
        logger.info(
            "⏱ [Classifier] stage=total elapsed=%.3fs "
            "stage1=%.3fs stage2=%.3fs mode=%s",
            _total_elapsed,
            _stage1_elapsed,
            _stage2_elapsed,
            request.mode.value,
        )

        return ClassifyResponse(
            query=request.query,
            mode=request.mode.value,
            results=results,
            candidates_retrieved=len(candidates),
            generated_at=datetime.now(tz=timezone.utc),
            embed_model=self._retriever._embedder.model_name,
            llm_model=llm_model,
            evaluation=(
                self._evaluator.evaluate(
                    query=request.query,
                    results=results,
                    candidates=candidates,
                    top_k=request.top_k,
                )
                if self._evaluator and request.evaluate
                else None
            ),
        )

classify

classify(request: SearchRequest) -> ClassifyResponse

Classify an occupation/business description into ANZSIC codes.

Parameters:

Name Type Description Default
request SearchRequest

Validated SearchRequest (query, mode, top_k, retrieval_n).

required

Returns:

Type Description
ClassifyResponse

ClassifyResponse with ranked results and metadata.

Source code in prod/services/classifier.py
def classify(self, request: SearchRequest) -> ClassifyResponse:
    """Classify an occupation/business description into ANZSIC codes.

    Args:
        request: Validated SearchRequest (query, mode, top_k, retrieval_n).

    Returns:
        ClassifyResponse with ranked results and metadata.
    """
    logger.info(
        "classify | query=%r mode=%s top_k=%d retrieval_n=%d",
        request.query[:80],
        request.mode.value,
        request.top_k,
        request.retrieval_n,
    )
    _t_total = time.perf_counter()

    # ── Stage 1: Hybrid Retrieval ──────────────────────────────────────
    _t1 = time.perf_counter()
    candidates = self._retriever.retrieve(
        query=request.query,
        n=request.retrieval_n,
    )
    _stage1_elapsed = time.perf_counter() - _t1
    logger.info(
        "⏱ [Classifier] stage=1_retrieval elapsed=%.3fs candidates=%d",
        _stage1_elapsed,
        len(candidates),
    )

    # ── Stage 2 (optional): LLM Re-ranking ────────────────────────────
    if request.mode == SearchMode.HIGH_FIDELITY:
        _t2 = time.perf_counter()
        results = self._reranker.rerank(
            query=request.query,
            candidates=candidates,
            top_k=request.top_k,
        )
        _stage2_elapsed = time.perf_counter() - _t2
        logger.info(
            "⏱ [Classifier] stage=2_llm_rerank elapsed=%.3fs results=%d",
            _stage2_elapsed,
            len(results),
        )
        llm_model = self._reranker._llm.model_name
    else:
        # FAST mode: convert top-k candidates directly to ClassifyResult
        results = [
            _candidate_to_result(c, rank=i + 1)
            for i, c in enumerate(candidates[: request.top_k])
        ]
        _stage2_elapsed = 0.0
        llm_model = ""

    _total_elapsed = time.perf_counter() - _t_total
    logger.info(
        "⏱ [Classifier] stage=total elapsed=%.3fs "
        "stage1=%.3fs stage2=%.3fs mode=%s",
        _total_elapsed,
        _stage1_elapsed,
        _stage2_elapsed,
        request.mode.value,
    )

    return ClassifyResponse(
        query=request.query,
        mode=request.mode.value,
        results=results,
        candidates_retrieved=len(candidates),
        generated_at=datetime.now(tz=timezone.utc),
        embed_model=self._retriever._embedder.model_name,
        llm_model=llm_model,
        evaluation=(
            self._evaluator.evaluate(
                query=request.query,
                results=results,
                candidates=candidates,
                top_k=request.top_k,
            )
            if self._evaluator and request.evaluate
            else None
        ),
    )

_candidate_to_result

_candidate_to_result(candidate: Candidate, rank: int) -> ClassifyResult

Convert a Stage 1 Candidate to a ClassifyResult for FAST mode.

Source code in prod/services/classifier.py
def _candidate_to_result(candidate: Candidate, rank: int) -> ClassifyResult:
    """Convert a Stage 1 Candidate to a ClassifyResult for FAST mode."""
    return ClassifyResult(
        rank=rank,
        anzsic_code=candidate.anzsic_code,
        anzsic_desc=candidate.anzsic_desc,
        class_desc=candidate.class_desc,
        division_desc=candidate.division_desc,
        reason=f"RRF score: {candidate.rrf_score:.6f} "
               f"(vector={'✓' if candidate.in_vector else '✗'}, "
               f"fts={'✓' if candidate.in_fts else '✗'})",
    )

HybridRetriever

Stage 1 of the pipeline. Orchestrates embedding, dual search, RRF fusion, and record fetch into a list of Candidate objects.

The compute_rrf() function is extracted at module level (not inside the class) so it can be called directly in unit tests without any adapter or fixture setup.

retriever

services/retriever.py ────────────────────────────────────────────────────────────────────────────── Stage 1 of the classification pipeline: Hybrid Retrieval with RRF Fusion.

Architecture

• Accepts any EmbeddingPort and DatabasePort via constructor injection. • _compute_rrf() is a pure Python function — no I/O, easily unit-tested. • HybridRetriever.retrieve() is the single public entry point.

Reciprocal Rank Fusion formula

score(d) = Σᵢ 1 / (k + rankᵢ(d))

Where k = RRF_K (default 60) and rankᵢ(d) is the rank of document d in system i. Documents that appear in both systems receive a combined score.

References

Cormack, G.V., Clarke, C.L.A., & Buettcher, S. (2009). Reciprocal Rank Fusion Outperforms Condorcet and Individual Rank Learning Methods. SIGIR.

HybridRetriever

Hybrid retrieval using vector ANN search + FTS fused via RRF.

To change retrieval behaviour, swap the injected DatabasePort or EmbeddingPort — no code changes here.

Parameters:

Name Type Description Default
db DatabasePort

Any object satisfying DatabasePort.

required
embedder EmbeddingPort

Any object satisfying EmbeddingPort.

required
settings Settings

Shared application settings.

required
Source code in prod/services/retriever.py
class HybridRetriever:
    """Hybrid retrieval using vector ANN search + FTS fused via RRF.

    To change retrieval behaviour, swap the injected DatabasePort or
    EmbeddingPort — no code changes here.

    Args:
        db:       Any object satisfying DatabasePort.
        embedder: Any object satisfying EmbeddingPort.
        settings: Shared application settings.
    """

    def __init__(
        self,
        db: DatabasePort,
        embedder: EmbeddingPort,
        settings: Settings,
    ) -> None:
        self._db = db
        self._embedder = embedder
        self._rrf_k = settings.rrf_k
        logger.debug(
            "HybridRetriever init | embed_model=%s rrf_k=%d",
            embedder.model_name,
            self._rrf_k,
        )

    # ── Public API ─────────────────────────────────────────────────────────

    def retrieve(self, query: str, n: int) -> list[Candidate]:
        """Run hybrid retrieval for a query.

        Workflow:
          1. Embed query  (RETRIEVAL_QUERY task type)
          2. Vector ANN search → top-n (code, rank) pairs
          3. FTS search        → top-n (code, rank) pairs
          4. RRF fusion        → merged, scored list (pure Python)
          5. Fetch full DB records for top-n fused codes
          6. Assemble and return Candidate objects

        Args:
            query: Natural-language query.
            n:     Maximum number of candidates to return.

        Returns:
            List of Candidate objects sorted by RRF score descending.

        Raises:
            EmbeddingError: If the embedding call fails.
            RetrievalError: If candidate assembly fails.
        """
        logger.info("Retrieving candidates | query=%r n=%d", query[:80], n)

        # ── 1. Embed (skipped when NullEmbeddingAdapter returns []) ───────────
        query_vec = self._embedder.embed_query(query)

        # ── 2 & 3. Search ───────────────────────────────────────────
        # Empty query_vec = EMBED_PROVIDER=none (NullEmbeddingAdapter).
        # Skip vector search entirely; RRF still works with vec_hits=[].
        if query_vec:
            vec_hits = self._db.vector_search(query_vec, limit=n)
        else:
            logger.info("Vector search skipped (no embedding) — FTS-only mode")
            vec_hits = []
        fts_hits = self._db.fts_search(query, limit=n)
        logger.debug("vec_hits=%d  fts_hits=%d", len(vec_hits), len(fts_hits))

        # ── 4. RRF fusion (pure Python — no I/O) ──────────────────────────
        rrf_results = compute_rrf(vec_hits, fts_hits, k=self._rrf_k)
        top_rrf = sorted(rrf_results, key=lambda r: r.rrf_score, reverse=True)[:n]

        # ── 5. Fetch full records ──────────────────────────────────────────
        top_codes = [r.anzsic_code for r in top_rrf]
        records = self._db.fetch_by_codes(top_codes)

        # ── 6. Assemble Candidate objects ──────────────────────────────────
        candidates: list[Candidate] = []
        for rrf in top_rrf:
            rec = records.get(rrf.anzsic_code)
            if rec is None:
                logger.warning(
                    "Code %s in RRF results but missing from fetch_by_codes",
                    rrf.anzsic_code,
                )
                continue
            candidates.append(
                Candidate(
                    **rec,
                    rrf_score=round(rrf.rrf_score, 6),
                    in_vector=rrf.in_vector,
                    in_fts=rrf.in_fts,
                    vector_rank=rrf.vector_rank,
                    fts_rank=rrf.fts_rank,
                )
            )

        top_score = candidates[0].rrf_score if candidates else 0.0
        logger.info(
            "Retrieval complete | candidates=%d top_rrf=%.6f",
            len(candidates),
            top_score,
        )
        return candidates

retrieve

retrieve(query: str, n: int) -> list[Candidate]

Run hybrid retrieval for a query.

Workflow
  1. Embed query (RETRIEVAL_QUERY task type)
  2. Vector ANN search → top-n (code, rank) pairs
  3. FTS search → top-n (code, rank) pairs
  4. RRF fusion → merged, scored list (pure Python)
  5. Fetch full DB records for top-n fused codes
  6. Assemble and return Candidate objects

Parameters:

Name Type Description Default
query str

Natural-language query.

required
n int

Maximum number of candidates to return.

required

Returns:

Type Description
list[Candidate]

List of Candidate objects sorted by RRF score descending.

Raises:

Type Description
EmbeddingError

If the embedding call fails.

RetrievalError

If candidate assembly fails.

Source code in prod/services/retriever.py
def retrieve(self, query: str, n: int) -> list[Candidate]:
    """Run hybrid retrieval for a query.

    Workflow:
      1. Embed query  (RETRIEVAL_QUERY task type)
      2. Vector ANN search → top-n (code, rank) pairs
      3. FTS search        → top-n (code, rank) pairs
      4. RRF fusion        → merged, scored list (pure Python)
      5. Fetch full DB records for top-n fused codes
      6. Assemble and return Candidate objects

    Args:
        query: Natural-language query.
        n:     Maximum number of candidates to return.

    Returns:
        List of Candidate objects sorted by RRF score descending.

    Raises:
        EmbeddingError: If the embedding call fails.
        RetrievalError: If candidate assembly fails.
    """
    logger.info("Retrieving candidates | query=%r n=%d", query[:80], n)

    # ── 1. Embed (skipped when NullEmbeddingAdapter returns []) ───────────
    query_vec = self._embedder.embed_query(query)

    # ── 2 & 3. Search ───────────────────────────────────────────
    # Empty query_vec = EMBED_PROVIDER=none (NullEmbeddingAdapter).
    # Skip vector search entirely; RRF still works with vec_hits=[].
    if query_vec:
        vec_hits = self._db.vector_search(query_vec, limit=n)
    else:
        logger.info("Vector search skipped (no embedding) — FTS-only mode")
        vec_hits = []
    fts_hits = self._db.fts_search(query, limit=n)
    logger.debug("vec_hits=%d  fts_hits=%d", len(vec_hits), len(fts_hits))

    # ── 4. RRF fusion (pure Python — no I/O) ──────────────────────────
    rrf_results = compute_rrf(vec_hits, fts_hits, k=self._rrf_k)
    top_rrf = sorted(rrf_results, key=lambda r: r.rrf_score, reverse=True)[:n]

    # ── 5. Fetch full records ──────────────────────────────────────────
    top_codes = [r.anzsic_code for r in top_rrf]
    records = self._db.fetch_by_codes(top_codes)

    # ── 6. Assemble Candidate objects ──────────────────────────────────
    candidates: list[Candidate] = []
    for rrf in top_rrf:
        rec = records.get(rrf.anzsic_code)
        if rec is None:
            logger.warning(
                "Code %s in RRF results but missing from fetch_by_codes",
                rrf.anzsic_code,
            )
            continue
        candidates.append(
            Candidate(
                **rec,
                rrf_score=round(rrf.rrf_score, 6),
                in_vector=rrf.in_vector,
                in_fts=rrf.in_fts,
                vector_rank=rrf.vector_rank,
                fts_rank=rrf.fts_rank,
            )
        )

    top_score = candidates[0].rrf_score if candidates else 0.0
    logger.info(
        "Retrieval complete | candidates=%d top_rrf=%.6f",
        len(candidates),
        top_score,
    )
    return candidates

compute_rrf

compute_rrf(vec_hits: list[tuple[str, int]], fts_hits: list[tuple[str, int]], k: int = 60) -> list[_RRFResult]

Compute Reciprocal Rank Fusion scores for two ranked lists.

This is a pure function — deterministic, no side-effects, no I/O. It is the ideal target for unit tests.

Parameters:

Name Type Description Default
vec_hits list[tuple[str, int]]

List of (anzsic_code, rank) from vector search. rank is 1-indexed, 1 = best match.

required
fts_hits list[tuple[str, int]]

List of (anzsic_code, rank) from full-text search.

required
k int

RRF smoothing constant (standard value = 60).

60

Returns:

Type Description
list[_RRFResult]

List of _RRFResult objects with combined scores.

list[_RRFResult]

Not sorted — caller decides sort order.

Examples:

>>> results = compute_rrf([("A", 1), ("B", 2)], [("A", 2), ("C", 1)])
>>> scores = {r.anzsic_code: r.rrf_score for r in results}
>>> scores["A"] > scores["B"]   # A appears in both systems
True
>>> scores["A"] > scores["C"]   # A's combined score beats FTS-only C
True
Source code in prod/services/retriever.py
def compute_rrf(
    vec_hits: list[tuple[str, int]],
    fts_hits: list[tuple[str, int]],
    k: int = 60,
) -> list[_RRFResult]:
    """Compute Reciprocal Rank Fusion scores for two ranked lists.

    This is a *pure function* — deterministic, no side-effects, no I/O.
    It is the ideal target for unit tests.

    Args:
        vec_hits: List of (anzsic_code, rank) from vector search.
                  rank is 1-indexed, 1 = best match.
        fts_hits: List of (anzsic_code, rank) from full-text search.
        k:        RRF smoothing constant (standard value = 60).

    Returns:
        List of _RRFResult objects with combined scores.
        Not sorted — caller decides sort order.

    Examples:
        >>> results = compute_rrf([("A", 1), ("B", 2)], [("A", 2), ("C", 1)])
        >>> scores = {r.anzsic_code: r.rrf_score for r in results}
        >>> scores["A"] > scores["B"]   # A appears in both systems
        True
        >>> scores["A"] > scores["C"]   # A's combined score beats FTS-only C
        True
    """
    vec_map: dict[str, int] = dict(vec_hits)
    fts_map: dict[str, int] = dict(fts_hits)
    all_codes = set(vec_map) | set(fts_map)

    results: list[_RRFResult] = []
    for code in all_codes:
        score = 0.0
        v_rank = vec_map.get(code)
        f_rank = fts_map.get(code)
        if v_rank is not None:
            score += 1.0 / (k + v_rank)
        if f_rank is not None:
            score += 1.0 / (k + f_rank)
        results.append(
            _RRFResult(
                anzsic_code=code,
                rrf_score=score,
                in_vector=v_rank is not None,
                in_fts=f_rank is not None,
                vector_rank=v_rank,
                fts_rank=f_rank,
            )
        )
    return results

LLMReranker

Stage 2 of the pipeline. Builds a structured prompt, calls the LLM, parses the JSON response, and implements the CSV fallback strategy.

CSV fallback strategy

flowchart TD
    A[Call LLM\nnormal prompt] --> B{Results\nempty?}
    B -- No --> C[Return results ✓]
    B -- Yes --> D[Retry with CSV\nreference injected]
    D --> E{Results\nempty?}
    E -- No --> F[Return results ✓]
    E -- Yes --> G[Return empty list\nlog error]

The first call never includes the 5,236-code CSV reference — keeping the prompt short (~2K tokens). Only if Gemini returns an empty array does the reranker retry with the full reference injected (~63K tokens). This retry-on-empty strategy outperforms always-inject because:

  • Most queries match within the 20 Stage 1 candidates — no CSV needed
  • Gemini's attention is not diluted by 63K tokens of irrelevant codes on the majority of calls
  • The retry adds latency only for the rare low-confidence case

reranker

services/reranker.py ────────────────────────────────────────────────────────────────────────────── Stage 2 of the classification pipeline: LLM Re-ranking.

Responsibilities
  1. Build a structured prompt from Stage 1 candidates (via config/prompts.py).
  2. Call the LLMPort to generate a ranked JSON response.
  3. Parse and validate the JSON into ClassifyResult objects.
  4. CSV fallback: if Gemini returns an empty list, retry with the full ANZSIC master CSV injected into the system prompt.
Fallback strategy
  • Normal call first (no CSV reference) — keeps the prompt concise.
  • If results list is empty, log a warning and retry with CSV injected.
  • Two failed attempts → return empty list (caller handles gracefully).

The CSV reference is loaded ONCE at construction time and reused across all classify calls (amortised startup cost).

LLMReranker

Re-rank Stage 1 candidates using an LLM.

Parameters:

Name Type Description Default
llm LLMPort

Any object satisfying LLMPort.

required
settings Settings

Shared application settings.

required
Source code in prod/services/reranker.py
class LLMReranker:
    """Re-rank Stage 1 candidates using an LLM.

    Args:
        llm:      Any object satisfying LLMPort.
        settings: Shared application settings.
    """

    def __init__(self, llm: LLMPort, settings: Settings) -> None:
        self._llm = llm
        self._settings = settings
        self._csv_reference = self._load_csv_reference()
        has_ref = bool(self._csv_reference)
        logger.debug(
            "LLMReranker init | model=%s csv_reference_loaded=%s",
            llm.model_name,
            has_ref,
        )

    # ── Public API ─────────────────────────────────────────────────────────

    def rerank(
        self,
        query: str,
        candidates: list[Candidate],
        top_k: int,
    ) -> list[ClassifyResult]:
        """Re-rank Stage 1 candidates using the LLM.

        Args:
            query:      Original search query.
            candidates: Ordered list of Stage 1 Candidate objects.
            top_k:      How many results to return.

        Returns:
            Ordered list of ClassifyResult objects (best match first).
            Empty list if both LLM attempts fail.
        """
        if not candidates:
            if not self._csv_reference:
                # No candidates AND no CSV reference — nothing to give the LLM.
                logger.warning(
                    "LLMReranker.rerank: no candidates and no CSV reference loaded "
                    "— cannot classify %r", query
                )
                return []
            # No Stage 1 candidates, but we have the full CSV.
            # Let the LLM search the reference directly for the best matches.
            logger.warning(
                "LLMReranker.rerank: no candidates for %r — "
                "falling back to CSV-only LLM call", query
            )

        # ── Attempt 1: candidates only (no full CSV) ─────────────────────
        # Keep the prompt concise — Stage 1 retrieval should already surface
        # the right codes.  Skipping the 5 000-row CSV reference shaves tokens
        # and latency from every call.
        results = self._call_llm(query, candidates, top_k, include_reference=False)
        if results:
            return results

        # ── Attempt 2: retry WITH full CSV reference ───────────────────────
        # Only reached when Attempt 1 returns nothing (rare edge case).
        # The CSV gives the LLM broader context to find an obscure match.
        logger.warning(
            "LLM returned empty results for %r — retrying with CSV reference", query
        )
        results = self._call_llm(query, candidates, top_k, include_reference=True)
        if results:
            logger.info("Retry succeeded for %r", query)
            return results

        logger.error("LLMReranker: both attempts failed for query %r", query)
        return []

    # ── Private helpers ────────────────────────────────────────────────────

    def _call_llm(
        self,
        query: str,
        candidates: list[Candidate],
        top_k: int,
        include_reference: bool,
    ) -> list[ClassifyResult]:
        """Build prompt, call LLM, parse response."""
        candidate_dicts = [c.model_dump() for c in candidates]
        system = build_system_prompt(
            include_reference=include_reference,
            csv_reference=self._csv_reference,
        )
        user = build_user_message(query, candidate_dicts, top_k)

        raw = self._llm.generate_json(system, user)
        if not raw:
            return []
        return self._parse_response(raw, top_k)

    def _parse_response(self, raw: str, top_k: int) -> list[ClassifyResult]:
        """Parse the LLM JSON response into ClassifyResult objects.

        The model may return a bare JSON array or an object wrapping one.
        Both formats are handled gracefully.

        Returns:
            List of ClassifyResult objects (empty on parse failure).
        """
        if not raw:
            return []
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            logger.error("LLMReranker: failed to parse JSON: %.200s", raw)
            return []

        # Unwrap if the model returned {"results": [...]} or similar
        if isinstance(parsed, dict):
            items = next(
                (v for v in parsed.values() if isinstance(v, list)),
                [],
            )
        elif isinstance(parsed, list):
            items = parsed
        else:
            logger.error("LLMReranker: unexpected JSON type %s", type(parsed).__name__)
            return []

        results: list[ClassifyResult] = []
        for item in items[:top_k]:
            try:
                results.append(ClassifyResult(**item))
            except Exception as exc:
                logger.warning("Skipping malformed result item %s: %s", item, exc)

        return results

    def _load_csv_reference(self) -> str:
        """Load the ANZSIC master CSV as a compact CODE: description string.

        Loads only anzsic_code + anzsic_desc to keep token count low.
        Returns empty string if the file is missing (fallback is simply skipped).
        """
        csv_path = Path(self._settings.master_csv_path)
        if not csv_path.exists():
            logger.warning(
                "master_csv_path not found: %s — CSV fallback disabled", csv_path
            )
            return ""

        try:
            lines: list[str] = []
            with csv_path.open(encoding="utf-8") as fh:
                reader = csv.DictReader(fh)
                for row in reader:
                    code = (row.get("anzsic_code") or "").strip()
                    desc = (row.get("anzsic_desc") or "").strip()
                    if code and desc:
                        lines.append(f"{code}: {desc}")
            reference = "\n".join(lines)
            logger.info(
                "CSV reference loaded: %d entries (%d chars)",
                len(lines),
                len(reference),
            )
            return reference
        except Exception as exc:
            logger.error("Failed to load CSV reference: %s", exc)
            return ""

rerank

rerank(query: str, candidates: list[Candidate], top_k: int) -> list[ClassifyResult]

Re-rank Stage 1 candidates using the LLM.

Parameters:

Name Type Description Default
query str

Original search query.

required
candidates list[Candidate]

Ordered list of Stage 1 Candidate objects.

required
top_k int

How many results to return.

required

Returns:

Type Description
list[ClassifyResult]

Ordered list of ClassifyResult objects (best match first).

list[ClassifyResult]

Empty list if both LLM attempts fail.

Source code in prod/services/reranker.py
def rerank(
    self,
    query: str,
    candidates: list[Candidate],
    top_k: int,
) -> list[ClassifyResult]:
    """Re-rank Stage 1 candidates using the LLM.

    Args:
        query:      Original search query.
        candidates: Ordered list of Stage 1 Candidate objects.
        top_k:      How many results to return.

    Returns:
        Ordered list of ClassifyResult objects (best match first).
        Empty list if both LLM attempts fail.
    """
    if not candidates:
        if not self._csv_reference:
            # No candidates AND no CSV reference — nothing to give the LLM.
            logger.warning(
                "LLMReranker.rerank: no candidates and no CSV reference loaded "
                "— cannot classify %r", query
            )
            return []
        # No Stage 1 candidates, but we have the full CSV.
        # Let the LLM search the reference directly for the best matches.
        logger.warning(
            "LLMReranker.rerank: no candidates for %r — "
            "falling back to CSV-only LLM call", query
        )

    # ── Attempt 1: candidates only (no full CSV) ─────────────────────
    # Keep the prompt concise — Stage 1 retrieval should already surface
    # the right codes.  Skipping the 5 000-row CSV reference shaves tokens
    # and latency from every call.
    results = self._call_llm(query, candidates, top_k, include_reference=False)
    if results:
        return results

    # ── Attempt 2: retry WITH full CSV reference ───────────────────────
    # Only reached when Attempt 1 returns nothing (rare edge case).
    # The CSV gives the LLM broader context to find an obscure match.
    logger.warning(
        "LLM returned empty results for %r — retrying with CSV reference", query
    )
    results = self._call_llm(query, candidates, top_k, include_reference=True)
    if results:
        logger.info("Retry succeeded for %r", query)
        return results

    logger.error("LLMReranker: both attempts failed for query %r", query)
    return []

Container (Dependency Injection)

The wiring point. Instantiates all adapters and injects them into services.

Swapping the LLM — change exactly one line
# Before
from prod.adapters.gemini_llm import GeminiLLMAdapter
llm = GeminiLLMAdapter(auth, settings)

# After (hypothetical OpenAI swap)
from prod.adapters.openai_llm import OpenAILLMAdapter
llm = OpenAILLMAdapter(settings)

container

services/container.py ────────────────────────────────────────────────────────────────────────────── Dependency Injection container.

THIS IS THE ONLY FILE THAT NAMES CONCRETE ADAPTER CLASSES.

Provider selection is driven entirely by environment variables — no code changes are needed to switch between providers:

EMBED_PROVIDER=vertex (default) → VertexEmbeddingAdapter EMBED_PROVIDER=openai → OpenAIEmbeddingAdapter EMBED_PROVIDER=none → NullEmbeddingAdapter (FTS-only, no vector search)

LLM_PROVIDER=vertex (default) → GeminiLLMAdapter LLM_PROVIDER=openai → OpenAILLMAdapter LLM_PROVIDER=geni → GeniLLMAdapter LLM_PROVIDER=langchain_gemini → GeminiLangChainLLMAdapter

Mix-and-match is supported (e.g. OpenAI embeddings + Gemini LLM). GCPAuthManager is only instantiated when at least one GCP adapter is used.

Replace the database
  • from prod.adapters.postgres_db import PostgresDatabaseAdapter
  • from prod.adapters.weaviate_db import WeaviateDatabaseAdapter
Thread safety

@lru_cache(maxsize=1) makes get_pipeline() return the same instance across calls. For Streamlit this is fine (single process, one pipeline per run). For FastAPI with multiple workers, each worker process gets its own pipeline instance (one per process — correct behaviour for psycopg2 connections).

get_pipeline cached

get_pipeline() -> ClassifierPipeline

Build and return the fully wired ClassifierPipeline singleton.

Provider selection is read from EMBED_PROVIDER and LLM_PROVIDER environment variables. The @lru_cache ensures this runs only once per process lifetime.

Returns:

Type Description
ClassifierPipeline

Fully initialised ClassifierPipeline ready for use.

Raises:

Type Description
ConfigurationError

If an unknown provider name is given.

AuthenticationError

If required API keys / credentials are missing.

Source code in prod/services/container.py
@lru_cache(maxsize=1)
def get_pipeline() -> ClassifierPipeline:
    """Build and return the fully wired ClassifierPipeline singleton.

    Provider selection is read from ``EMBED_PROVIDER`` and ``LLM_PROVIDER``
    environment variables.  The ``@lru_cache`` ensures this runs only once
    per process lifetime.

    Returns:
        Fully initialised ClassifierPipeline ready for use.

    Raises:
        ConfigurationError: If an unknown provider name is given.
        AuthenticationError: If required API keys / credentials are missing.
    """
    settings = get_settings()
    logger.info(
        "Building ClassifierPipeline | embed_provider=%s llm_provider=%s",
        settings.embed_provider,
        settings.llm_provider,
    )

    # ── Infrastructure adapters (provider-selected) ────────────────────────
    embedder = _build_embedder(settings)   # EmbeddingPort
    llm      = _build_llm(settings)        # LLMPort
    db       = PostgresDatabaseAdapter(settings)   # DatabasePort

    # ── Services (receive only Port interfaces, not concrete types) ────────
    retriever = HybridRetriever(db=db, embedder=embedder, settings=settings)
    reranker  = LLMReranker(llm=llm, settings=settings)
    evaluator = ANZSICEvaluator(settings.master_csv_path)

    pipeline = ClassifierPipeline(
        retriever=retriever,
        reranker=reranker,
        settings=settings,
        evaluator=evaluator,
    )

    logger.info(
        "ClassifierPipeline ready | embedder=%s llm=%s",
        embedder.model_name,
        llm.model_name,
    )
    return pipeline