Domain Models
The domain layer contains the Pydantic models and exception types used throughout the system.
Nothing here imports from infrastructure; these models are the shared language
between every layer.
Models
The key object flow is:
    SearchRequest → [pipeline] → ClassifyResponse
                                 └── results: list[ClassifyResult]
                                     (internally via Candidate objects)
models
domain/models.py
──────────────────────────────────────────────────────────────────────────────
Pure domain objects — Pydantic models with no imports from adapters or ports.
These models are the lingua franca of the entire system:
• adapters produce and consume them
• services orchestrate them
• interfaces (CLI, Streamlit, future API) serialise them
Adding a FastAPI layer later should be straightforward because Pydantic models
serialise directly to JSON and generate their own JSON Schema, so no extra DTOs
or marshallers are required.
SearchMode
Bases: str, Enum
Controls which pipeline stages are executed.
Source code in prod/domain/models.py
    class SearchMode(str, Enum):
        """Controls which pipeline stages are executed."""

        FAST = "fast"                    # Stage 1 only: hybrid retrieval → RRF
        HIGH_FIDELITY = "high_fidelity"  # Stage 1 + Stage 2: retrieval + Gemini re-rank
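Because SearchMode mixes in str, its members compare equal to their plain string values and can be looked up from them, which is convenient when the mode arrives as CLI or JSON text. A minimal sketch using only the standard library; the enum is re-declared here so the snippet runs on its own:

```python
import json
from enum import Enum


class SearchMode(str, Enum):
    """Re-declared copy of the enum above, for a standalone demo."""

    FAST = "fast"
    HIGH_FIDELITY = "high_fidelity"


# Look up a member from the raw string a caller supplied.
mode = SearchMode("fast")

# Members are real str instances, so they compare equal to their values
# and the json module encodes them as ordinary strings.
is_fast = mode is SearchMode.FAST
equals_plain_string = SearchMode.HIGH_FIDELITY == "high_fidelity"
encoded = json.dumps({"mode": mode})
```

An unknown value such as SearchMode("slow") raises ValueError, so a bad mode is rejected at the boundary before it reaches the pipeline.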
SearchRequest
Bases: BaseModel
Validated input to the ClassifierPipeline.
Source code in prod/domain/models.py
    class SearchRequest(BaseModel):
        """Validated input to the ClassifierPipeline."""

        query: str = Field(..., min_length=1, max_length=2000,
                           description="Occupation or business description to classify")
        mode: SearchMode = Field(SearchMode.HIGH_FIDELITY,
                                 description="FAST = retrieval only; HIGH_FIDELITY = + LLM re-rank")
        top_k: int = Field(5, ge=1, le=20,
                           description="Maximum number of results to return")
        retrieval_n: int = Field(20, ge=5, le=50,
                                 description="RRF candidate pool size per search system")
        evaluate: bool = Field(True,
                               description="Attach programmatic evaluation scores to the response")

        @field_validator("query")
        @classmethod
        def strip_query(cls, v: str) -> str:
            return v.strip()
Candidate
Bases: BaseModel
A single ANZSIC code retrieved by the hybrid search (Stage 1).
Source code in prod/domain/models.py
    class Candidate(BaseModel):
        """A single ANZSIC code retrieved by the hybrid search (Stage 1)."""

        anzsic_code: str
        anzsic_desc: str

        # Hierarchical classification fields
        class_code: Optional[str] = None
        class_desc: Optional[str] = None
        group_code: Optional[str] = None
        group_desc: Optional[str] = None
        subdivision_desc: Optional[str] = None
        division_desc: Optional[str] = None
        class_exclusions: Optional[str] = None
        enriched_text: Optional[str] = None

        # RRF fusion metadata
        rrf_score: float = 0.0
        in_vector: bool = False
        in_fts: bool = False
        vector_rank: Optional[int] = None
        fts_rank: Optional[int] = None

        @property
        def source_label(self) -> str:
            """Human-readable source badge: BOTH / VEC / FTS."""
            if self.in_vector and self.in_fts:
                return "BOTH"
            if self.in_vector:
                return "VEC"
            if self.in_fts:
                return "FTS"
            return "—"
source_label
property
Human-readable source badge: BOTH / VEC / FTS.
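The rrf_score, vector_rank and fts_rank fields record how a candidate fared in Reciprocal Rank Fusion. The fusion code itself is not shown on this page, so the following is only a sketch of the standard RRF formula, score = Σ 1 / (k + rank). The helper name rrf_score and the constant k = 60 (a conventional default) are assumptions, not values taken from the project.

```python
from typing import Optional


def rrf_score(vector_rank: Optional[int], fts_rank: Optional[int], k: int = 60) -> float:
    """Sum 1 / (k + rank) over the systems that returned the candidate.

    Ranks are 1-based; None means that system did not return the candidate.
    k = 60 is a conventional default, assumed here for illustration.
    """
    ranks = (vector_rank, fts_rank)
    return sum((1.0 / (k + rank) for rank in ranks if rank is not None), 0.0)


# A candidate found by both systems can outrank one found by a single
# system, even when the single-system hit was ranked first.
both = rrf_score(vector_rank=2, fts_rank=3)
vector_only = rrf_score(vector_rank=1, fts_rank=None)
neither = rrf_score(vector_rank=None, fts_rank=None)
```

Under this formula a candidate with in_vector and in_fts both set accumulates two terms, which is why the BOTH badge tends to appear near the top of the fused list.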
ClassifyResult
Bases: BaseModel
A single ANZSIC code after LLM re-ranking (Stage 2).
In FAST mode the results are assembled directly from Candidate objects
(no reason field). In HIGH_FIDELITY mode Gemini populates 'reason'.
Source code in prod/domain/models.py
    class ClassifyResult(BaseModel):
        """A single ANZSIC code after LLM re-ranking (Stage 2).

        In FAST mode the results are assembled directly from Candidate objects
        (no reason field). In HIGH_FIDELITY mode Gemini populates 'reason'.
        """

        rank: int
        anzsic_code: str
        anzsic_desc: str
        class_desc: Optional[str] = None
        division_desc: Optional[str] = None
        reason: Optional[str] = None
        score: Optional[int] = None  # 0–1000 LLM confidence (1000 = perfect match)

        # Carry-through from Stage 1 for display purposes
        group_desc: Optional[str] = None
        subdivision_desc: Optional[str] = None
        class_exclusions: Optional[str] = None
        rrf_score: Optional[float] = None
        in_vector: Optional[bool] = None
        in_fts: Optional[bool] = None
        vector_rank: Optional[int] = None
        fts_rank: Optional[int] = None

        @property
        def source_label(self) -> str:
            if self.in_vector and self.in_fts:
                return "BOTH"
            if self.in_vector:
                return "VEC"
            if self.in_fts:
                return "FTS"
            return "—"
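The docstring says that in FAST mode results are assembled directly from Candidate objects. One way that step could look is sketched below, with plain dataclasses standing in for the Pydantic models so the snippet needs only the standard library. The function name results_from_candidates, the ordering by descending rrf_score, and the placeholder codes are all assumptions, not the pipeline's confirmed behaviour.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Candidate:
    """Trimmed stand-in for the Candidate model (subset of fields)."""
    anzsic_code: str
    anzsic_desc: str
    rrf_score: float = 0.0
    in_vector: bool = False
    in_fts: bool = False


@dataclass
class ClassifyResult:
    """Trimmed stand-in for the ClassifyResult model (subset of fields)."""
    rank: int
    anzsic_code: str
    anzsic_desc: str
    reason: Optional[str] = None  # stays None when no LLM re-rank runs
    rrf_score: Optional[float] = None
    in_vector: Optional[bool] = None
    in_fts: Optional[bool] = None


def results_from_candidates(candidates: list[Candidate], top_k: int) -> list[ClassifyResult]:
    """Order candidates by RRF score (assumed) and keep the best top_k."""
    ordered = sorted(candidates, key=lambda c: c.rrf_score, reverse=True)
    return [
        ClassifyResult(
            rank=position,
            anzsic_code=c.anzsic_code,
            anzsic_desc=c.anzsic_desc,
            rrf_score=c.rrf_score,
            in_vector=c.in_vector,
            in_fts=c.in_fts,
        )
        for position, c in enumerate(ordered[:top_k], start=1)
    ]


# Placeholder codes and descriptions, not real ANZSIC entries.
candidates = [
    Candidate("0000", "placeholder description A", rrf_score=0.016, in_fts=True),
    Candidate("1111", "placeholder description B", rrf_score=0.032,
              in_vector=True, in_fts=True),
]
results = results_from_candidates(candidates, top_k=1)
```

Carrying in_vector and in_fts through is what lets source_label keep working on the result objects without a second lookup.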
ClassifyResponse
Bases: BaseModel
Complete response from ClassifierPipeline.classify().
This is the object serialised to JSON when serving via an API endpoint.
Source code in prod/domain/models.py
    class ClassifyResponse(BaseModel):
        """Complete response from ClassifierPipeline.classify().

        This is the object serialised to JSON when serving via an API endpoint.
        """

        query: str
        mode: str
        results: list[ClassifyResult]
        candidates_retrieved: int
        generated_at: datetime = Field(
            default_factory=lambda: datetime.now(timezone.utc)
        )
        embed_model: str = ""
        llm_model: str = ""
        evaluation: Optional[EvaluationReport] = None

        def to_dict(self) -> dict:
            """Serialise to a plain dict (JSON-safe floats/bools)."""
            return self.model_dump(mode="json")
to_dict
Serialise to a plain dict (JSON-safe floats/bools).
Source code in prod/domain/models.py
    def to_dict(self) -> dict:
        """Serialise to a plain dict (JSON-safe floats/bools)."""
        return self.model_dump(mode="json")
Exceptions
All application errors raised by the system are subclasses of ANZSICError,
so callers can catch the base class for broad handling or individual
subclasses for fine-grained recovery.
    import logging

    from prod.domain.exceptions import ANZSICError, AuthenticationError

    logger = logging.getLogger(__name__)

    # pipeline and request are assumed to have been constructed elsewhere.
    try:
        response = pipeline.classify(request)
    except AuthenticationError:
        # Token expired: re-authenticate and retry
        ...
    except ANZSICError as e:
        # Any other classifier error
        logger.error("Classification failed: %s", e)
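The re-authenticate-and-retry branch in the example can be made concrete with a small wrapper. This is a sketch only: classify_with_reauth, the reauthenticate callback and the single-retry policy are hypothetical, and the exception classes are re-declared locally so the snippet runs without the project installed.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class ANZSICError(Exception):
    """Local stand-in for prod.domain.exceptions.ANZSICError."""


class AuthenticationError(ANZSICError):
    """Local stand-in for prod.domain.exceptions.AuthenticationError."""


def classify_with_reauth(classify: Callable[[], T],
                         reauthenticate: Callable[[], None]) -> T:
    """Call classify(); on AuthenticationError refresh credentials, retry once.

    A second AuthenticationError, or any other ANZSICError, propagates.
    """
    try:
        return classify()
    except AuthenticationError:
        reauthenticate()
        return classify()


# Demo with a fake classify call that fails until credentials are refreshed.
state = {"token_valid": False, "reauth_calls": 0}


def fake_classify() -> str:
    if not state["token_valid"]:
        raise AuthenticationError("token expired")
    return "classified"


def fake_reauthenticate() -> None:
    state["token_valid"] = True
    state["reauth_calls"] += 1


outcome = classify_with_reauth(fake_classify, fake_reauthenticate)
```

In real use the first argument would be something like lambda: pipeline.classify(request). Limiting the retry to one attempt keeps a persistently invalid credential from looping forever.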
When a FastAPI layer is added, the exception hierarchy maps directly to HTTP
status codes (see docstring in exceptions.py).
exceptions
domain/exceptions.py
──────────────────────────────────────────────────────────────────────────────
Custom exception hierarchy.
All exceptions are rooted at ANZSICError so callers can catch broadly
(except ANZSICError) or narrowly (except EmbeddingError).
When adding a FastAPI layer, map these to appropriate HTTP status codes:
    AuthenticationError → 401
    DatabaseError       → 503
    EmbeddingError      → 502
    LLMError            → 502
    ValidationError     → 422 (Pydantic handles this automatically)
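The mapping in the docstring can be expressed as a lookup table that walks an exception's class hierarchy. The sketch below uses only the standard library and re-declares the relevant exceptions; the name status_for and the 500 fallback for unmapped errors are assumptions.

```python
class ANZSICError(Exception):
    """Local stand-in for the project's base exception."""


class AuthenticationError(ANZSICError):
    pass


class DatabaseError(ANZSICError):
    pass


class EmbeddingError(ANZSICError):
    pass


class LLMError(ANZSICError):
    pass


class RetrievalError(ANZSICError):
    pass


# Status codes copied from the docstring above.
STATUS_BY_EXCEPTION = {
    AuthenticationError: 401,
    DatabaseError: 503,
    EmbeddingError: 502,
    LLMError: 502,
}


def status_for(exc: Exception, default: int = 500) -> int:
    """Return the status of the nearest mapped class in the exception's MRO.

    Walking the MRO means subclasses of a mapped exception inherit its code.
    The default of 500 for unmapped errors is an assumption.
    """
    for cls in type(exc).__mro__:
        if cls in STATUS_BY_EXCEPTION:
            return STATUS_BY_EXCEPTION[cls]
    return default


codes = [
    status_for(AuthenticationError("token expired")),
    status_for(LLMError("unparseable output")),
    status_for(RetrievalError("no candidates")),
]
```

In a FastAPI application a function like this could be called from a single handler registered for ANZSICError, rather than writing one handler per subclass.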
ANZSICError
Bases: Exception
Base exception for all application errors.
Source code in prod/domain/exceptions.py
    class ANZSICError(Exception):
        """Base exception for all application errors."""
ConfigurationError
Bases: ANZSICError
Raised when required configuration is missing or invalid.
Source code in prod/domain/exceptions.py
    class ConfigurationError(ANZSICError):
        """Raised when required configuration is missing or invalid."""
AuthenticationError
Bases: ANZSICError
Raised when GCP auth token acquisition fails.
Source code in prod/domain/exceptions.py
    class AuthenticationError(ANZSICError):
        """Raised when GCP auth token acquisition fails."""
EmbeddingError
Bases: ANZSICError
Raised when the embedding API call fails or returns invalid output.
Source code in prod/domain/exceptions.py
    class EmbeddingError(ANZSICError):
        """Raised when the embedding API call fails or returns invalid output."""
LLMError
Bases: ANZSICError
Raised when the LLM API call fails or returns unparseable output.
Source code in prod/domain/exceptions.py
    class LLMError(ANZSICError):
        """Raised when the LLM API call fails or returns unparseable output."""
DatabaseError
Bases: ANZSICError
Raised when a database operation fails.
Source code in prod/domain/exceptions.py
    class DatabaseError(ANZSICError):
        """Raised when a database operation fails."""
RetrievalError
Bases: ANZSICError
Raised when Stage 1 retrieval returns no usable candidates.
Source code in prod/domain/exceptions.py
    class RetrievalError(ANZSICError):
        """Raised when Stage 1 retrieval returns no usable candidates."""
RerankError
Bases: ANZSICError
Raised when Stage 2 re-ranking fails after all retries.
Source code in prod/domain/exceptions.py
    class RerankError(ANZSICError):
        """Raised when Stage 2 re-ranking fails after all retries."""
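RerankError is described as being raised only after all retries are exhausted. A bounded retry loop of that shape might look like the following; rerank_with_retries, max_attempts and the choice to retry on LLMError are assumptions for illustration, and the exceptions are re-declared locally so the snippet is self-contained.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class ANZSICError(Exception):
    """Local stand-in for the project's base exception."""


class LLMError(ANZSICError):
    pass


class RerankError(ANZSICError):
    pass


def rerank_with_retries(rerank: Callable[[], T], max_attempts: int = 3) -> T:
    """Call rerank() up to max_attempts times, then raise RerankError.

    Only LLMError triggers a retry here (an assumption); anything else
    propagates immediately. No delay is applied between attempts.
    """
    last_error: Optional[LLMError] = None
    for _ in range(max_attempts):
        try:
            return rerank()
        except LLMError as exc:
            last_error = exc
    raise RerankError(
        f"re-ranking failed after {max_attempts} attempts"
    ) from last_error


# Demo 1: a call that fails twice and then succeeds.
attempts = {"count": 0}


def flaky_rerank() -> list[str]:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise LLMError("unparseable output")
    return ["ranked"]


ranked = rerank_with_retries(flaky_rerank)


# Demo 2: a call that never succeeds exhausts its attempts.
def always_failing() -> list[str]:
    raise LLMError("unparseable output")


try:
    rerank_with_retries(always_failing, max_attempts=2)
    exhausted = False
except RerankError:
    exhausted = True
```

Chaining with "from last_error" keeps the underlying LLMError visible in the traceback. A production loop would likely also wait between attempts.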