LLM Chain: Add foundation for chain execution with database schema #616
Conversation
📝 Walkthrough

Adds end-to-end LLM chain orchestration: DB migration and models for chain tracking, new API endpoints (submission + callbacks), CRUD helpers, chain execution framework (blocks, executor, types), job orchestration with guardrails/LLM call integration, tests, and docs.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client/API
    participant Router as LLM Chain Route
    participant JobSvc as Job Service
    participant DB as Database
    participant Worker as Worker/Executor
    participant Provider as LLM Provider

    Client->>Router: POST /llm/chain (request + callback_url)
    Router->>JobSvc: start_chain_job(request, project_id, org_id)
    JobSvc->>DB: Create Job (LLM_CHAIN) & LlmChain record (PENDING)
    JobSvc->>Worker: Enqueue chain task
    JobSvc-->>Client: Return job UUID
    Worker->>DB: _setup() - mark job PROCESSING, chain RUNNING
    Worker->>Worker: apply_input_guardrails, resolve first block
    loop per block
        Worker->>Provider: execute LLM call for block
        Provider-->>Worker: LLM response
        Worker->>DB: create llm_call (with chain_id), mark block completed
        Worker->>Worker: apply_output_guardrails, send intermediate callback (optional)
        Worker->>Worker: derive next block input
    end
    Worker->>DB: update chain status COMPLETED with output/usage
    Worker->>DB: update job status SUCCESS
    Worker->>Client: send final callback (if configured)
```
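For orientation, the submission half of the diagram can be sketched in a few lines. This is a minimal sketch with in-memory stand-ins (`FakeStore`, a plain list as the queue); the real `start_chain_job` persists SQLModel records and enqueues a worker task, and the enum members shown are assumptions based on the statuses named in the diagram.

```python
import uuid
from dataclasses import dataclass, field
from enum import Enum


class JobStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    SUCCESS = "success"
    FAILED = "failed"


class ChainStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    FAILED = "failed"
    COMPLETED = "completed"


@dataclass
class FakeStore:
    """Stand-in for the database session used by the real service."""
    jobs: dict = field(default_factory=dict)
    chains: dict = field(default_factory=dict)


def start_chain_job(store: FakeStore, queue: list, request: dict,
                    project_id: int, org_id: int) -> uuid.UUID:
    """Create a Job (LLM_CHAIN) plus a PENDING LlmChain record, enqueue the
    chain task, and return the job UUID to the caller immediately."""
    job_id = uuid.uuid4()
    store.jobs[job_id] = {"type": "LLM_CHAIN", "status": JobStatus.PENDING}
    store.chains[job_id] = {"status": ChainStatus.PENDING, "request": request,
                            "project_id": project_id, "org_id": org_id}
    queue.append(("execute_chain_job", str(job_id)))  # worker picks this up
    return job_id
```

The key property the diagram encodes is that the client gets the job UUID back before any LLM work starts; everything after the enqueue happens in the worker.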
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/models/job.py (1)
55-60: ⚠️ Potential issue | 🟡 Minor

Stale column comment — `LLM_CHAIN` missing from the example list.

The `sa_column_kwargs` comment still reads `"(e.g., RESPONSE, LLM_API)"`. Update it to include the new value for accuracy.

📝 Proposed fix

```diff
-    "comment": "Type of job being executed (e.g., RESPONSE, LLM_API)"
+    "comment": "Type of job being executed (e.g., RESPONSE, LLM_API, LLM_CHAIN)"
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/models/job.py` around lines 55-60, update the stale column comment on the `job_type` Field: edit the `sa_column_kwargs` comment on the `job_type` field (`JobType`) to include the new enum value LLM_CHAIN (e.g., change "Type of job being executed (e.g., RESPONSE, LLM_API)" to include LLM_CHAIN in the example list) so the DB column comment accurately reflects all relevant job types.

backend/app/crud/llm.py (1)
62-75: ⚠️ Potential issue | 🟡 Minor

Docstring `Args` section missing `chain_id`.

The new parameter is undocumented in the function's docstring.

📝 Proposed fix

```diff
     Args:
         session: Database session
         request: The LLM call request containing query and config
         job_id: Reference to the parent job
+        chain_id: Optional reference to a parent LLM chain record
         project_id: Project this LLM call belongs to
         organization_id: Organization this LLM call belongs to
         resolved_config: The resolved configuration blob (either from stored config or ad-hoc)
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/crud/llm.py` around lines 62-75, the docstring for the function that creates a new LLM call record (`create_llm_call`) is missing documentation for the new `chain_id` parameter; update the Args section to include an entry for `chain_id` with its type and a short description (e.g., "chain_id: Optional[str] — Reference to the parent chain for this LLM call, if any"), ensuring it matches the parameter name used in the function signature and follows the same formatting style as the other Args entries.
🧹 Nitpick comments (8)
backend/app/models/llm/response.py (1)
67-80: `LLMChainResponse` is structurally identical to `LLMCallResponse` — consider sharing a base class.

Both models have the exact same three fields (`response`, `usage`, `provider_raw_response`). Extracting a common base avoids diverging in the future.

♻️ Proposed refactor

```diff
+class LLMCallResponseBase(SQLModel):
+    """Shared fields for single-call and chain LLM responses."""
+
+    response: LLMResponse = Field(..., description="Normalized, structured LLM response.")
+    usage: Usage = Field(..., description="Token usage and cost information.")
+    provider_raw_response: dict[str, object] | None = Field(
+        default=None,
+        description="Unmodified raw response from the LLM provider.",
+    )
+
+
-class LLMCallResponse(SQLModel):
-    """Top-level response schema for an LLM API call."""
-
-    response: LLMResponse = Field(...)
-    usage: Usage = Field(...)
-    provider_raw_response: dict[str, object] | None = Field(default=None, ...)
+class LLMCallResponse(LLMCallResponseBase):
+    """Top-level response schema for an LLM API call."""
 
-class LLMChainResponse(SQLModel):
-    """Response schema for an LLM chain execution."""
-
-    response: LLMResponse = Field(...)
-    usage: Usage = Field(...)
-    provider_raw_response: dict[str, object] | None = Field(default=None, ...)
+class LLMChainResponse(LLMCallResponseBase):
+    """Response schema for an LLM chain execution."""
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/models/llm/response.py` around lines 67-80, `LLMChainResponse` and `LLMCallResponse` share identical fields — extract those three fields into a shared base SQLModel (e.g., LLMResponseBase or BaseLLMCall) that defines response: LLMResponse, usage: Usage, and provider_raw_response: dict[str, object] | None with the same Field descriptions, then have both LLMChainResponse and LLMCallResponse inherit from that base (removing their duplicate field declarations); ensure imports and type annotations remain unchanged and run tests/validators to confirm serialization and schema behavior is preserved.

backend/app/services/llm/chain/types.py (1)
7-18: `BlockResult.usage` is redundant with `response.usage` — potential consistency footgun.

When `response` is not None, `block_result.usage` duplicates `block_result.response.usage`. Callers that populate both independently can silently produce inconsistent data. Consider removing `usage` from `BlockResult` and accessing it via `result.response.usage` instead.

♻️ Proposed refactor

```diff
 @dataclass
 class BlockResult:
     """Result of a single block/LLM call execution."""
 
     response: LLMCallResponse | None = None
     llm_call_id: UUID | None = None
-    usage: Usage | None = None
     error: str | None = None
 
     @property
     def success(self) -> bool:
         return self.error is None and self.response is not None
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/chain/types.py` around lines 7-18, `BlockResult` currently stores a redundant usage field that can diverge from response.usage; remove the usage attribute from the BlockResult dataclass and any assignments that set block_result.usage, and update all call sites to read usage from block_result.response.usage (guarding for response being None). The success property remains unchanged; search for references to BlockResult.usage and replace them with the guarded access (e.g., result.response and then .usage) to maintain null-safety.

backend/app/crud/llm_chain.py (1)
85-99: Use `elif` for mutually exclusive status branches and capture the timestamp once.

The status branches are mutually exclusive (enum values), but using independent `if` statements is misleading. Also, `now()` is called multiple times — `updated_at` and `started_at`/`completed_at` will have slightly different values within the same update.

♻️ Proposed fix

```diff
-    db_chain.status = status
-    db_chain.updated_at = now()
-
-    if status == ChainStatus.RUNNING:
-        db_chain.started_at = now()
-
-    if status == ChainStatus.FAILED:
-        db_chain.error = error
-        db_chain.total_usage = total_usage
-        db_chain.completed_at = now()
-
-    if status == ChainStatus.COMPLETED:
-        db_chain.output = output
-        db_chain.total_usage = total_usage
-        db_chain.completed_at = now()
+    current_time = now()
+    db_chain.status = status
+    db_chain.updated_at = current_time
+
+    if status == ChainStatus.RUNNING:
+        db_chain.started_at = current_time
+    elif status == ChainStatus.FAILED:
+        db_chain.error = error
+        db_chain.total_usage = total_usage
+        db_chain.completed_at = current_time
+    elif status == ChainStatus.COMPLETED:
+        db_chain.output = output
+        db_chain.total_usage = total_usage
+        db_chain.completed_at = current_time
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/crud/llm_chain.py` around lines 85-99, the status-update block uses separate ifs and calls now() multiple times; refactor the update to compute a single timestamp once (e.g., ts = now()) and assign db_chain.updated_at = ts, then use an if/elif/elif chain on status (checking ChainStatus.RUNNING, ChainStatus.FAILED, ChainStatus.COMPLETED) to set the mutually exclusive fields (db_chain.started_at, db_chain.error, db_chain.total_usage, db_chain.completed_at, db_chain.output) so only the matching branch runs and all timestamps use the same ts value.

backend/app/services/llm/jobs.py (1)
461-474: `job_id` parameter shadowed by reassignment on line 474.

`job_id` is declared as `str` in the function signature (line 465) and then rebound to `UUID` on line 474: `job_id: UUID = UUID(job_id)`. This works at runtime but is a type annotation violation — the variable changes type. Use a different name for the converted value.

Proposed fix

```diff
-    request = LLMCallRequest(**request_data)
-    job_id: UUID = UUID(job_id)
+    request = LLMCallRequest(**request_data)
+    job_uuid: UUID = UUID(job_id)
```

Then replace all subsequent references to `job_id` within `execute_job` with `job_uuid` (matching the pattern already used in `execute_chain_job` at line 603).

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 461-474, the function `execute_job` currently reassigns the str parameter job_id to a UUID on line 474 which shadows the original type; create a new variable (e.g., job_uuid) by converting the incoming job_id string to UUID and replace all subsequent uses of the old job_id variable inside execute_job with job_uuid (mirroring the pattern used in execute_chain_job) so the original parameter type is preserved and annotations remain correct.

backend/app/models/llm/request.py (3)
570-576: Use `StrEnum` instead of `(str, Enum)` — Python 3.11+ is the target.

Per the coding guidelines, this codebase targets Python 3.11+, which provides `enum.StrEnum`. Ruff also flags this (UP042).

♻️ Proposed fix

Update the import at line 1:

```diff
-from enum import Enum
+from enum import Enum, StrEnum
```

Then update the class:

```diff
-class ChainStatus(str, Enum):
+class ChainStatus(StrEnum):
     """Status of an LLM chain execution."""
 
     PENDING = "pending"
     RUNNING = "running"
     FAILED = "failed"
     COMPLETED = "completed"
```

As per coding guidelines: "Use Python 3.11+ with type hints throughout the codebase".

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/models/llm/request.py` around lines 570-576, replace the Enum subclass for ChainStatus with StrEnum: import StrEnum from enum and change class ChainStatus(str, Enum) to class ChainStatus(StrEnum); keep the same member names/values (PENDING, RUNNING, FAILED, COMPLETED) so behavior is unchanged and ruff UP042 is satisfied. Ensure any type hints or comparisons that relied on str behavior remain valid (StrEnum inherits str) and run tests/lint to confirm no further adjustments are needed.
640-647: `block_sequences`: `default_factory=list` conflicts with `nullable=True` and the `list[str] | None` type.

The default factory always produces `[]`, so a new `LlmChain` instance will never have `block_sequences=None`. But the type hint allows `None` and the DB column is nullable. This inconsistency can cause confusion — callers might check `if chain.block_sequences is None` thinking it indicates "not started", but that will never be true for newly created records.

Consider aligning: either use `default=None` (and handle the empty case as `None`) or make the column `NOT NULL` with a default of `[]`.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/models/llm/request.py` around lines 640-647, the Field definition for block_sequences in the LlmChain model is inconsistent: it declares type list[str] | None and sa_column nullable=True but uses default_factory=list so new instances are never None; change this to be consistent by either removing default_factory and using default=None (so block_sequences: list[str] | None = Field(default=None, sa_column=sa.Column(JSONB, nullable=True, ...)) to allow None) or make the column non-nullable and keep default_factory=list (set sa.Column(..., nullable=False) and keep default_factory=list and type list[str]), and update any callers that check for None to instead check for empty list if you choose the non-nullable option; locate and update the block_sequences Field in the LlmChain/request model accordingly.
522-567: Naming collision: `ChainBlock` (request model) vs `ChainBlock` (execution class).

Two distinct `ChainBlock` classes share the same name in `backend/app/models/llm/request.py` (line 522, SQLModel) and `backend/app/services/llm/chain/chain.py` (line 125, execution class). In `execute_chain_job`, both exist in overlapping scope — the request model is accessed via the `LLMChainRequest` type (containing `list[ChainBlock]`), while the local import at line 599 brings in the execution class. The code works because `request.blocks` is already deserialized before the import shadows, but this is fragile and confusing.

Consider renaming the execution class (e.g., `ChainBlockNode` or `ExecutableChainBlock`) to eliminate ambiguity.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/models/llm/request.py` around lines 522-567, the request-side SQLModel ChainBlock class conflicts with the execution-side ChainBlock class in services; rename the execution class in backend/app/services/llm/chain/chain.py (e.g., to ExecutableChainBlock or ChainBlockNode) and update all references/imports (including the local import used in execute_chain_job and any usages, method names, or type hints that reference the old ChainBlock) so the request model list[ChainBlock] and the runtime execution class no longer share the same identifier.

backend/app/services/llm/chain/chain.py (1)
151-191: Recursive chain execution — unbounded stack depth for long chains.

`ChainBlock.execute` calls `self._next.execute(next_query)` recursively. Python's default recursion limit is ~1000. With typical chain sizes (2-10 blocks) this is fine, but there's no upper-bound validation on `LLMChainRequest.blocks` length to prevent a user from submitting a pathologically long chain.

Consider adding a `max_length` constraint on `LLMChainRequest.blocks` in the request model, or converting this to an iterative loop.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/chain/chain.py` around lines 151-191, ChainBlock.execute currently recurses via self._next.execute causing unbounded stack depth; replace the recursion with an iterative traversal: implement a loop in ChainBlock.execute that starts with current=self and current_query=query, calls execute_llm_call (same args), calls self._context.on_block_completed(current._index, result), returns immediately on failure, otherwise sets current_query = result_to_query(result) and current = current._next until current is None, then return the last result; remove the recursive call to self._next.execute; additionally add a defensive max_length constraint on LLMChainRequest.blocks in the request model (e.g., pydantic Field(max_items=...)) to prevent extremely long chains if desired.
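The iterative rewrite suggested above could look roughly like this. It is a sketch with simplified stand-in types; `_call` and `_on_completed` stand in for the real `execute_llm_call` and `ChainContext.on_block_completed` hooks:

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class BlockResult:
    output: str = ""
    error: Optional[str] = None

    @property
    def success(self) -> bool:
        return self.error is None


class ChainBlock:
    """Linked-list chain block that executes iteratively instead of
    recursing into self._next.execute(), so stack depth stays constant."""

    def __init__(self, index: int, call: Callable[[str], BlockResult],
                 on_completed: Callable[[int, BlockResult], None]):
        self._index = index
        self._call = call
        self._on_completed = on_completed
        self._next: Optional["ChainBlock"] = None

    def execute(self, query: str) -> BlockResult:
        current: Optional["ChainBlock"] = self
        current_query = query
        result = BlockResult(error="empty chain")
        while current is not None:
            result = current._call(current_query)          # the block's LLM call
            current._on_completed(current._index, result)  # persist/callback hook
            if not result.success:
                return result                              # abort on first failure
            current_query = result.output                  # next block's input
            current = current._next
        return result
```

With this shape, even a pathologically long chain only loops; it never grows the Python call stack, so the `max_length` constraint becomes a cost guard rather than a correctness requirement.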
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/api/routes/llm_chain.py`:
- Line 20: Both route handlers missing return type annotations: add explicit
return type hints for llm_callback_notification and llm_chain to match what they
actually return (e.g., APIResponse[LLMChainResponse], fastapi.Response,
starlette.responses.JSONResponse, or None). Update the function signatures
(llm_callback_notification(body: APIResponse[LLMChainResponse]) and
llm_chain(...)) to include the correct -> <ReturnType> annotation, ensuring the
declared type aligns with the handler’s actual return values.
- Line 34: The Swagger description for the /llm/chain endpoint is loading the
wrong markdown (description=load_description("llm/llm_call.md")) — replace the
call to load_description with the chain-specific file (e.g.,
load_description("llm/llm_chain.md")), add or update llm_chain.md with the
appropriate chain documentation, and ensure the route handler that registers
/llm/chain uses this updated description so the API docs accurately reflect the
chain behavior.
- Around line 48-62: The endpoint currently treats request.callback_url as
optional but always returns "will be delivered via callback", which is incorrect
when LLMChainRequest.callback_url is None; either enforce a non-null callback by
validating request.callback_url and raising/returning a 400 before calling
start_chain_job (use validate_callback_url and LLMChainRequest.callback_url to
gate behavior), or change the flow when callback_url is None to return a
synchronous result or a job identifier for polling (have start_chain_job return
a job_id/status and update the APIResponse.success_response payload and Message
accordingly); ensure the response text matches the chosen path and do not claim
callback delivery when no callback_url was provided.
- Around line 16-17: The route decorator uses an invalid OpenAPI runtime
expression "{$callback_url}"; update the expression to a valid OpenAPI runtime
expression that points to where the callback URL actually lives (e.g. request
body or query). Replace "{$callback_url}" in the llm_callback_router.post
decorator with a compliant expression such as "{$request.body#/callback_url}" or
"{$request.query.callback_url}" depending on where the callback_url is provided,
ensuring the symbol llm_callback_router.post is updated accordingly.
In `@backend/app/models/llm/response.py`:
- Around line 83-89: The class docstring for IntermediateChainResponse contains
a typo: change "Flattend structure matching LLMCallResponse keys for
consistency" to "Flattened structure matching LLMCallResponse keys for
consistency" in the IntermediateChainResponse definition in response.py so the
documentation reads correctly.
In `@backend/app/services/llm/chain/chain.py`:
- Around line 31-51: ChainContext currently types callback_url as str but
callers can pass None; update the dataclass to allow Optional types by changing
callback_url: str to callback_url: str | None and tighten request_metadata to
dict[str, Any] | None; ensure any code instantiating ChainContext (e.g., in
execute_chain_job) continues to pass None or str and adjust any downstream uses
to handle None safely (refer to the ChainContext dataclass and the callback_url
and request_metadata fields).
In `@backend/app/services/llm/chain/executor.py`:
- Around line 160-197: The _handle_error method can abort before updating DB if
send_callback raises; modify _handle_error so DB state is always persisted
first: call update_llm_chain_status(...) and JobCrud(session).update(...) before
attempting send_callback, and/or wrap send_callback in its own try/except that
logs but does not re-raise. Ensure you still build callback_response from
APIResponse.failure_response and include references to _handle_error,
send_callback, update_llm_chain_status, and JobCrud.update when making the
change so the chain status is set to ChainStatus.FAILED and the job status to
JobStatus.FAILED even if the callback fails.
- Around line 127-158: The success path in _teardown currently calls
send_callback before persisting DB state, so if send_callback raises the job and
chain updates may never be applied; move the send_callback call (and its
callback_url check) to after JobCrud(session).update(...) and
update_llm_chain_status(...) so the DB updates are performed first, then send
the callback with callback_response.model_dump(), and finally return the
callback response; keep all existing objects (LLMChainResponse, APIResponse) and
use the same self._request.callback_url check.
In `@backend/app/services/llm/jobs.py`:
- Around line 687-691: The callback_url being passed to handle_job_error is a
Pydantic v2 HttpUrl (request.callback_url) and must be converted to a native str
before use; update all calls that pass request.callback_url (the ones that
ultimately call handle_job_error -> send_callback) to pass
str(request.callback_url) if request.callback_url else None so the signature
handle_job_error(callback_url: str | None, ...) and send_callback(callback_url:
str, ...) receive proper types; apply this change to every occurrence in this
module where request.callback_url is forwarded (e.g., the calls around job error
handling that reference handle_job_error and send_callback).
- Around line 294-300: apply_output_guardrails is returning the rephrased text
as the "error" tuple element, causing execute_job to treat safe-but-rephrased
outputs as failures; change apply_output_guardrails so that when safe["success"]
is True and safe["data"].get("rephrase_needed") is truthy, it still returns
(result, None) (i.e., no error) after updating
result.response.response.output.content.value, and if you need to signal that
rephrasing occurred add a non-error flag/metadata on the result (e.g.,
result.response.metadata["rephrased"]=True) instead of returning the text as the
error; this aligns apply_output_guardrails with apply_input_guardrails and
prevents execute_job from misclassifying rephrased outputs as errors.
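The ordering fix described for `_teardown` and `_handle_error` above boils down to one pattern: persist first, notify best-effort. A sketch under stated assumptions — `persist_final_state` and the callable parameters are hypothetical stand-ins for the real `JobCrud.update`/`update_llm_chain_status` and `send_callback` calls:

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def finish_chain(persist_final_state: Callable[[], None],
                 send_callback: Optional[Callable[[dict], None]],
                 payload: dict) -> bool:
    """Persist DB state before notifying the client, so a failing callback
    can never leave the job/chain records stale. Returns True only when a
    configured callback was delivered."""
    # 1. Always update the job and chain rows first.
    persist_final_state()

    # 2. Callback delivery is best-effort: log failures, never re-raise.
    if send_callback is None:
        return False
    try:
        send_callback(payload)
        return True
    except Exception:
        logger.exception("final callback delivery failed; DB state already persisted")
        return False
```

The same shape applies to both the success and failure teardown paths: the only difference is which payload and which terminal status get persisted.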
---
Outside diff comments:
In `@backend/app/crud/llm.py`:
- Around line 62-75: The docstring for the function that creates a new LLM call
record (create_llm_call) is missing documentation for the new chain_id
parameter; update the Args section to include an entry for chain_id with its
type and a short description (e.g., "chain_id: Optional[str] — Reference to the
parent chain for this LLM call, if any"), ensuring it matches the parameter name
used in the function signature and follows the same formatting style as the
other Args entries.
In `@backend/app/models/job.py`:
- Around line 55-60: Update the stale column comment in the job_type Field: edit
the sa_column_kwargs comment on the job_type field (JobType) to include the new
enum value LLM_CHAIN (e.g., change "Type of job being executed (e.g., RESPONSE,
LLM_API)" to include LLM_CHAIN in the example list) so the DB column comment
accurately reflects all relevant job types.
---
Nitpick comments:
In `@backend/app/crud/llm_chain.py`:
- Around line 85-99: The status-update block uses separate ifs and calls now()
multiple times; refactor the update in the function that sets db_chain to
compute a single timestamp once (e.g., ts = now()) and assign
db_chain.updated_at = ts, then use an if/elif/elif chain on status (checking
ChainStatus.RUNNING, ChainStatus.FAILED, ChainStatus.COMPLETED) to set the
mutually exclusive fields (db_chain.started_at, db_chain.error,
db_chain.total_usage, db_chain.completed_at, db_chain.output) so only the
matching branch runs and all timestamps use the same ts value.
In `@backend/app/models/llm/request.py`:
- Around line 570-576: Replace the Enum subclass for ChainStatus with StrEnum:
import StrEnum from enum and change class ChainStatus(str, Enum) to class
ChainStatus(StrEnum); keep the same member names/values (PENDING, RUNNING,
FAILED, COMPLETED) so behavior is unchanged and ruff UP042 is satisfied. Ensure
any type hints or comparisons that relied on str behavior remain valid (StrEnum
inherits str) and run tests/lint to confirm no further adjustments needed.
- Around line 640-647: The Field definition for block_sequences in the LlmChain
model is inconsistent: it declares type list[str] | None and sa_column
nullable=True but uses default_factory=list so new instances are never None;
change this to be consistent by either removing default_factory and using
default=None (so block_sequences: list[str] | None = Field(default=None,
sa_column=sa.Column(JSONB, nullable=True, ...)) to allow None) or make the
column non-nullable and keep default_factory=list (set sa.Column(...,
nullable=False) and keep default_factory=list and type list[str]), and update
any callers that check for None to instead check for empty list if you choose
the non-nullable option; locate and update the block_sequences Field in the
LlmChain/request model accordingly.
- Around line 522-567: The request-side SQLModel ChainBlock class conflicts with
the execution-side ChainBlock class in services; rename the execution class in
backend/app/services/llm/chain/chain.py (e.g., to ExecutableChainBlock or
ChainBlockNode) and update all references/imports (including the local import
used in execute_chain_job and any usages, method names, or type hints that
reference the old ChainBlock) so the request model list[ChainBlock] and the
runtime execution class no longer share the same identifier.
In `@backend/app/models/llm/response.py`:
- Around line 67-80: LLMChainResponse and LLMCallResponse share identical
fields—extract those three fields into a shared base SQLModel (e.g.,
LLMResponseBase or BaseLLMCall) that defines response: LLMResponse, usage:
Usage, and provider_raw_response: dict[str, object] | None with the same Field
descriptions, then have both LLMChainResponse and LLMCallResponse inherit from
that base (removing their duplicate field declarations); ensure imports and type
annotations remain unchanged and run tests/validators to confirm serialization
and schema behavior is preserved.
In `@backend/app/services/llm/chain/chain.py`:
- Around line 151-191: ChainBlock.execute currently recurses via
self._next.execute causing unbounded stack depth; replace the recursion with an
iterative traversal: implement a loop in ChainBlock.execute that starts with
current=self and current_query=query, calls execute_llm_call (same args), calls
self._context.on_block_completed(current._index, result), returns immediately on
failure, otherwise sets current_query = result_to_query(result) and current =
current._next until current is None, then return the last result; remove the
recursive call to self._next.execute; additionally add a defensive max_length
constraint on LLMChainRequest.blocks in the request model (e.g., pydantic
Field(max_items=...)) to prevent extremely long chains if desired.
In `@backend/app/services/llm/chain/types.py`:
- Around line 7-18: BlockResult currently stores a redundant usage field that
can diverge from response.usage; remove the usage attribute from the BlockResult
dataclass and any assignments that set block_result.usage, and update all call
sites to read usage from block_result.response.usage (guarding for response
being None). Update the BlockResult type signature and the success property
remains unchanged; search for references to BlockResult.usage and replace them
with the guarded access (e.g., result.response and then .usage) to maintain
null-safety.
In `@backend/app/services/llm/jobs.py`:
- Around line 461-474: The function execute_job currently reassigns the str
parameter job_id to a UUID on Line 474 which shadows the original type; create a
new variable (e.g., job_uuid) by converting the incoming job_id string to UUID
and replace all subsequent uses of the old job_id variable inside execute_job
with job_uuid (mirroring the pattern used in execute_chain_job) so the original
parameter type is preserved and annotations remain correct.
```python
@llm_callback_router.post(
    "{$callback_url}",
```
🧩 Analysis chain
🏁 Scripts executed (repository: ProjectTech4DevAI/kaapi-backend):

```shell
# First, find and read the file
find . -name "llm_chain.py" -path "*/backend/app/api/*" | head -5

# Also check file structure to understand the path
find . -type f -name "llm_chain.py"

# Also search for the specific file with less strict matching
fd "llm_chain\.py$"

cat -n backend/app/api/routes/llm_chain.py

# Check if the description files exist
find . -name "llm_call.md" -o -name "llm_chain.md" | grep -E "(llm_call|llm_chain)"

# Search for where load_description is defined to understand its usage
rg "def load_description" -A 5

# Check the LLMChainRequest model to verify callback_url optionality
rg "class LLMChainRequest" -A 20

# Check if llm_chain.md exists
find . -name "llm_chain.md"

# Check what description files exist in the llm directory
ls -la backend/app/api/docs/llm/
```
🌐 Web query:
OpenAPI 3.0 callback runtime expression syntax specification
💡 Result:
In OpenAPI 3.0.x, the URL template key inside a Callback Object is a runtime expression ("key expression") evaluated at runtime to produce the callback URL (e.g., `$request.body#/url`). (spec.openapis.org)

Runtime expression syntax (ABNF):

```
expression       = ( "$url" / "$method" / "$statusCode"
                   / "$request." source / "$response." source )
source           = ( header-reference / query-reference / path-reference / body-reference )
header-reference = "header." token
query-reference  = "query." name
path-reference   = "path." name
body-reference   = "body" ["#" json-pointer ]
json-pointer     = *( "/" reference-token )
reference-token  = *( unescaped / escaped )
escaped          = "~" ( "0" / "1" )  ; "~0" => "~", "~1" => "/"
```

Common callback/key examples:

- Request body property: `$request.body#/user/uuid` (spec.openapis.org)
- Request URL: `$url` (spec.openapis.org)
- Response body property: `$response.body#/status` (spec.openapis.org)

Embedding into strings (typical for callback URLs): runtime expressions can be embedded in a string by wrapping them in `{}` (for example, a callback key like `{$request.body#/callbackUrl}`). (spec.openapis.org)

Citations:
- 1-6: https://spec.openapis.org/oas/v3.0.3
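Applied to this endpoint, a spec-conformant callbacks entry keys the callback off a request-body pointer rather than the unrecognized `$callback_url` token. The fragment below is hand-written for illustration and assumes the request body exposes a top-level `callback_url` field, as `LLMChainRequest` appears to:

```python
# Fragment of an OpenAPI 3.0 document expressed as a Python dict: the
# callback key is a runtime expression resolved from the request body.
chain_callbacks = {
    "chainResult": {
        "{$request.body#/callback_url}": {   # valid: $request.body + JSON pointer
            "post": {
                "requestBody": {
                    "content": {"application/json": {"schema": {"type": "object"}}}
                },
                "responses": {"200": {"description": "Callback received"}},
            }
        }
    }
}

# The invalid form from the diff, for contrast: not derivable from the ABNF grammar.
invalid_key = "{$callback_url}"
valid_key = next(iter(chain_callbacks["chainResult"]))
```

Tooling that renders or validates the spec resolves `{$request.body#/callback_url}` per request, which is exactly the behavior the route intends.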
🏁 Script executed:
# Check the entire llm_chain function to see all return paths
sed -n '39,62p' backend/app/api/routes/llm_chain.pyRepository: ProjectTech4DevAI/kaapi-backend
Length of output: 750
🏁 Script executed:
# Verify what the llm_call.md content says and if it's appropriate for llm_chain endpoint
cat backend/app/api/docs/llm/llm_call.md | head -20Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 981
🏁 Script executed:
# Check if there are any return type hints elsewhere in the file or similar endpoints
rg "@router.post" -A 10 backend/app/api/routes/ | head -50
Length of output: 2876
🏁 Script executed:
# Check what happens in start_chain_job when callback_url is None
rg "def start_chain_job" -A 20 backend/app/services/
Length of output: 1519
🏁 Script executed:
# Also check the llm_callback_notification function return type more carefully
sed -n '20,29p' backend/app/api/routes/llm_chain.py
Length of output: 484
🏁 Script executed:
# Check response_model to infer what return type should be
sed -n '32,38p' backend/app/api/routes/llm_chain.py
Length of output: 320
🏁 Script executed:
# Check if there are other endpoints with similar callback patterns to understand the intended usage
rg "callback" -i backend/app/api/routes/ -l
Length of output: 284
🏁 Script executed:
# Let me also check if execute_chain_job uses the callback_url
rg "def execute_chain_job" -A 30 backend/app/services/
Length of output: 2098
🏁 Script executed:
# Check what happens when callback_url is None in the job execution
rg "callback_url" backend/app/services/llm/jobs.py
Length of output: 845
🏁 Script executed:
# Final verification: confirm the exact issue exists in the code
sed -n '15,19p' backend/app/api/routes/llm_chain.py
Length of output: 158
Fix invalid OpenAPI runtime expression — {$callback_url} does not conform to spec.
OpenAPI 3.0 defines runtime expressions using the ABNF grammar: expression = ( "$url" / "$method" / "$statusCode" / "$request." source / "$response." source ). The token $callback_url is not a recognized form; valid patterns require $request.body, $response.body, or similar qualified sources.
Proposed fix
- "{$callback_url}",
+ "{$request.body#/callback_url}",

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/api/routes/llm_chain.py` around lines 16 - 17, The route
decorator uses an invalid OpenAPI runtime expression "{$callback_url}"; update
the expression to a valid OpenAPI runtime expression that points to where the
callback URL actually lives (e.g. request body or query). Replace
"{$callback_url}" in the llm_callback_router.post decorator with a compliant
expression such as "{$request.body#/callback_url}" or
"{$request.query.callback_url}" depending on where the callback_url is provided,
ensuring the symbol llm_callback_router.post is updated accordingly.
The same "{$callback_url}" pattern is used in the existing /llm/call endpoint; it's just a Swagger documentation convention for showing the callback schemes and does not affect runtime behavior. I know it's needed for consistency, but in that case both /llm/call and /llm/chain would need to be fixed, which can be done in the next PR.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify the claim that the same {$callback_url} pattern is used in the existing /llm/call endpoint
rg -n "{.*callback.*}" backend/app/api/routes/ -A 2 -B 2
Length of output: 89
🏁 Script executed:
#!/bin/bash
# Verify the claim that the same {$callback_url} pattern is used in the existing /llm/call endpoint
rg -n '\{\$callback' backend/app/api/routes/ -A 2 -B 2
Length of output: 1128
@vprashrex, you're absolutely correct! I verified that the same {$callback_url} pattern is used consistently across multiple existing endpoints:
- backend/app/api/routes/llm.py (line 19)
- backend/app/api/routes/documents.py (line 51)
- backend/app/api/routes/collections.py (line 46)
- backend/app/api/routes/llm_chain.py (line 17)
Your suggestion to fix all callback endpoints together in a future PR makes sense for consistency. Would you like me to create a tracking issue for this, so it's not forgotten?
    "{$callback_url}",
    name="llm_chain_callback",
)
def llm_callback_notification(body: APIResponse[LLMChainResponse]):
Missing return type hints on both route handlers — coding guideline violation.
llm_callback_notification and llm_chain both lack return type annotations.
🐛 Proposed fix
-def llm_callback_notification(body: APIResponse[LLMChainResponse]):
+def llm_callback_notification(body: APIResponse[LLMChainResponse]) -> None:

-def llm_chain(
-    _current_user: AuthContextDep, _session: SessionDep, request: LLMChainRequest
-):
+def llm_chain(
+    _current_user: AuthContextDep, _session: SessionDep, request: LLMChainRequest
+) -> APIResponse[Message]:

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code."
Also applies to: 39-41
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/api/routes/llm_chain.py` at line 20, Both route handlers missing
return type annotations: add explicit return type hints for
llm_callback_notification and llm_chain to match what they actually return
(e.g., APIResponse[LLMChainResponse], fastapi.Response,
starlette.responses.JSONResponse, or None). Update the function signatures
(llm_callback_notification(body: APIResponse[LLMChainResponse]) and
llm_chain(...)) to include the correct -> <ReturnType> annotation, ensuring the
declared type aligns with the handler’s actual return values.
The existing /llm/call handlers (llm_callback_notification, llm_call) also don't have return type annotations, and /llm/chain follows that same pattern. This is also a common FastAPI convention, so no fix is needed.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find the existing /llm/call route handlers to verify the user's claim
fd -e py -x cat {} \; | grep -A 10 "def llm_call\|def llm_callback_notification" backend/app/api/routes/ || true
# Let's specifically look for the llm.py route file (for /llm/call endpoints)
find backend/app/api/routes -name "llm.py" -type f -exec cat {} \;
Length of output: 1907
@vprashrex, you're absolutely right — I verified that the existing /llm/call handlers (llm_callback_notification and llm_call in backend/app/api/routes/llm.py) also omit return type annotations, so your new handlers are indeed consistent with the established pattern in this codebase. I understand your decision to maintain that consistency.
For context: while FastAPI does allow omitting return types and will infer them in many cases, explicit return type hints are generally recommended because they improve OpenAPI schema generation and IDE support. That said, maintaining consistency within a codebase is also valuable, and you're applying the same pattern used elsewhere.
if request.callback_url:
    validate_callback_url(str(request.callback_url))

start_chain_job(
    db=_session,
    request=request,
    project_id=project_id,
    organization_id=organization_id,
)

return APIResponse.success_response(
    data=Message(
        message="Your response is being generated and will be delivered via callback."
    ),
)
Unconditional "callback" success message when callback_url is optional — results are silently lost.
LLMChainRequest.callback_url is HttpUrl | None with default=None, so it's entirely valid to submit a chain request without a callback URL. When that happens the endpoint still responds "Your response is being generated and will be delivered via callback.", which is incorrect: there is no delivery mechanism and the caller has no way to retrieve the result.
Consider either (a) requiring callback_url to be non-null, or (b) returning a synchronous result / job-ID for polling when no callback URL is supplied, and adjusting the response message accordingly.
🐛 Minimal guard (option a — require callback_url)
+from fastapi import HTTPException, status

 if request.callback_url:
     validate_callback_url(str(request.callback_url))
+else:
+    raise HTTPException(
+        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+        detail="callback_url is required for LLM chain execution.",
+    )

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
from fastapi import HTTPException, status

if request.callback_url:
    validate_callback_url(str(request.callback_url))
else:
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail="callback_url is required for LLM chain execution.",
    )
start_chain_job(
    db=_session,
    request=request,
    project_id=project_id,
    organization_id=organization_id,
)
return APIResponse.success_response(
    data=Message(
        message="Your response is being generated and will be delivered via callback."
    ),
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/api/routes/llm_chain.py` around lines 48 - 62, The endpoint
currently treats request.callback_url as optional but always returns "will be
delivered via callback", which is incorrect when LLMChainRequest.callback_url is
None; either enforce a non-null callback by validating request.callback_url and
raising/returning a 400 before calling start_chain_job (use
validate_callback_url and LLMChainRequest.callback_url to gate behavior), or
change the flow when callback_url is None to return a synchronous result or a
job identifier for polling (have start_chain_job return a job_id/status and
update the APIResponse.success_response payload and Message accordingly); ensure
the response text matches the chosen path and do not claim callback delivery
when no callback_url was provided.
That's a design choice which was introduced for /llm/call, and I incorporated it the same way for /llm/chain to keep things consistent. If it needs to be fixed, I'll create a new PR for it.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check the /llm/call endpoint to verify if it has the same pattern
# Look for the llm_call endpoint and check if callback_url is optional and if the message mentions callback delivery
# Find the file with llm_call endpoint
fd -t f "llm_call.py" backend/app/api/routes/ -x cat {}
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Also check the LLMCallRequest model to see if callback_url is optional
rg -A 5 -B 5 "class LLMCallRequest" --type=py
Length of output: 743
🏁 Script executed:
#!/bin/bash
# Search for llm_call route definition more broadly
rg -A 30 "@router.post.*\/llm\/call" --type=py
Length of output: 105
🏁 Script executed:
#!/bin/bash
# List all route files to understand the structure
fd -t f ".py" backend/app/api/routes/
Length of output: 1443
🏁 Script executed:
#!/bin/bash
# Check LLMCallRequest for callback_url field
rg -A 15 "class LLMCallRequest" backend/app/models/llm/request.py
Length of output: 607
🏁 Script executed:
#!/bin/bash
# Check the llm.py route file for the /llm/call endpoint
cat backend/app/api/routes/llm.py
Length of output: 1861
🏁 Script executed:
#!/bin/bash
# Check for callback_url in LLMCallRequest
rg "callback_url" backend/app/models/llm/request.py -A 2 -B 2
Length of output: 274
@vprashrex, thanks for the clarification! I verified and you're absolutely right — /llm/call has the identical pattern (optional callback_url with the same unconditional "via callback" message), so maintaining consistency makes sense.
Since this affects both endpoints and you're open to fixing it in a separate PR, would you like me to create a GitHub issue to track this improvement? I can document both /llm/call and /llm/chain in the issue so it's captured for future work.
class IntermediateChainResponse(SQLModel):
    """
    Intermediate callback response from the intermediate blocks
    from the llm chain execution. (if configured)

    Flattend structure matching LLMCallResponse keys for consistency
    """
Typo in docstring: Flattend → Flattened.
📝 Proposed fix
- Flattend structure matching LLMCallResponse keys for consistency
+ Flattened structure matching LLMCallResponse keys for consistency

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
class IntermediateChainResponse(SQLModel):
    """
    Intermediate callback response from the intermediate blocks
    from the llm chain execution. (if configured)

    Flattened structure matching LLMCallResponse keys for consistency
    """
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/models/llm/response.py` around lines 83 - 89, The class docstring
for IntermediateChainResponse contains a typo: change "Flattend structure
matching LLMCallResponse keys for consistency" to "Flattened structure matching
LLMCallResponse keys for consistency" in the IntermediateChainResponse
definition in response.py so the documentation reads correctly.
@dataclass
class ChainContext:
    """Shared state passed to all blocks. Accumulates responses."""

    job_id: UUID
    chain_id: UUID
    project_id: int
    organization_id: int
    callback_url: str
    total_blocks: int

    langfuse_credentials: dict[str, Any] | None = None
    request_metadata: dict | None = None
    intermediate_callback_flags: list[bool] = field(default_factory=list)
    aggregated_usage: Usage = field(
        default_factory=lambda: Usage(
            input_tokens=0,
            output_tokens=0,
            total_tokens=0,
        )
    )
callback_url type is str but receives None from callers.
In execute_chain_job (backend/app/services/llm/jobs.py, Line 644), callback_url is set to None when request.callback_url is absent:
callback_url=str(request.callback_url) if request.callback_url else None,

But here it's typed as str, not str | None. This is a type-safety gap. Also, request_metadata should be dict[str, Any] | None for full type annotation.
Proposed fix
 @dataclass
 class ChainContext:
     """Shared state passed to all blocks. Accumulates responses."""

     job_id: UUID
     chain_id: UUID
     project_id: int
     organization_id: int
-    callback_url: str
+    callback_url: str | None
     total_blocks: int

     langfuse_credentials: dict[str, Any] | None = None
-    request_metadata: dict | None = None
+    request_metadata: dict[str, Any] | None = None

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/chain/chain.py` around lines 31 - 51, ChainContext
currently types callback_url as str but callers can pass None; update the
dataclass to allow Optional types by changing callback_url: str to callback_url:
str | None and tighten request_metadata to dict[str, Any] | None; ensure any
code instantiating ChainContext (e.g., in execute_chain_job) continues to pass
None or str and adjust any downstream uses to handle None safely (refer to the
ChainContext dataclass and the callback_url and request_metadata fields).
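A minimal stdlib-only sketch of the proposed typing (field names mirror the discussion, but the class body and the normalize_callback_url helper are hypothetical, added here only to illustrate the str(x) if x else None coercion callers perform):

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class ChainContext:
    """Sketch of the field types under discussion, not the real class."""

    callback_url: str | None  # Optional, since callers may pass None
    request_metadata: dict[str, Any] | None = None


def normalize_callback_url(raw: object | None) -> str | None:
    """Coerce an HttpUrl-like object (or None) to str | None, as execute_chain_job does."""
    return str(raw) if raw else None


ctx = ChainContext(callback_url=normalize_callback_url(None))
ctx2 = ChainContext(callback_url=normalize_callback_url("https://example.com/cb"))
```

With callback_url typed str | None, a type checker accepts both constructions above; with the current str annotation, the first one is a reported type error.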
def _teardown(self, result: BlockResult) -> dict:
    """Finalize chain record, send callback, and update job status."""

    with Session(engine) as session:
        if result.success:
            final = LLMChainResponse(
                response=result.response.response,
                usage=result.usage,
                provider_raw_response=result.response.provider_raw_response,
            )
            callback_response = APIResponse.success_response(
                data=final, metadata=self._request.request_metadata
            )
            if self._request.callback_url:
                send_callback(
                    callback_url=str(self._request.callback_url),
                    data=callback_response.model_dump(),
                )
            JobCrud(session).update(
                job_id=self._context.job_id,
                job_update=JobUpdate(status=JobStatus.SUCCESS),
            )
            update_llm_chain_status(
                session=session,
                chain_id=self._context.chain_id,
                status=ChainStatus.COMPLETED,
                output=result.response.response.output.model_dump(),
                total_usage=self._context.aggregated_usage.model_dump(),
            )
            return callback_response.model_dump()
        else:
            return self._handle_error(result.error)
Same send_callback risk in _teardown — callback before DB updates.
On the success path (Lines 140–144), send_callback is called before JobCrud.update and update_llm_chain_status. If send_callback throws, DB state won't be finalized. Apply the same pattern: persist DB state first, then send callback.
Proposed fix — DB updates before callback
 def _teardown(self, result: BlockResult) -> dict:
     """Finalize chain record, send callback, and update job status."""
     with Session(engine) as session:
         if result.success:
             final = LLMChainResponse(
                 response=result.response.response,
                 usage=result.usage,
                 provider_raw_response=result.response.provider_raw_response,
             )
             callback_response = APIResponse.success_response(
                 data=final, metadata=self._request.request_metadata
             )
-            if self._request.callback_url:
-                send_callback(
-                    callback_url=str(self._request.callback_url),
-                    data=callback_response.model_dump(),
-                )
             JobCrud(session).update(
                 job_id=self._context.job_id,
                 job_update=JobUpdate(status=JobStatus.SUCCESS),
             )
             update_llm_chain_status(
                 session=session,
                 chain_id=self._context.chain_id,
                 status=ChainStatus.COMPLETED,
                 output=result.response.response.output.model_dump(),
                 total_usage=self._context.aggregated_usage.model_dump(),
             )
+
+            if self._request.callback_url:
+                try:
+                    send_callback(
+                        callback_url=str(self._request.callback_url),
+                        data=callback_response.model_dump(),
+                    )
+                except Exception as cb_err:
+                    logger.warning(
+                        f"[ChainExecutor] Failed to send success callback: {cb_err} | "
+                        f"chain_id={self._context.chain_id}"
+                    )
+
             return callback_response.model_dump()
         else:
             return self._handle_error(result.error)

📝 Committable suggestion
return self._handle_error(result.error)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _teardown(self, result: BlockResult) -> dict:
    """Finalize chain record, send callback, and update job status."""
    with Session(engine) as session:
        if result.success:
            final = LLMChainResponse(
                response=result.response.response,
                usage=result.usage,
                provider_raw_response=result.response.provider_raw_response,
            )
            callback_response = APIResponse.success_response(
                data=final, metadata=self._request.request_metadata
            )
            JobCrud(session).update(
                job_id=self._context.job_id,
                job_update=JobUpdate(status=JobStatus.SUCCESS),
            )
            update_llm_chain_status(
                session=session,
                chain_id=self._context.chain_id,
                status=ChainStatus.COMPLETED,
                output=result.response.response.output.model_dump(),
                total_usage=self._context.aggregated_usage.model_dump(),
            )
            if self._request.callback_url:
                try:
                    send_callback(
                        callback_url=str(self._request.callback_url),
                        data=callback_response.model_dump(),
                    )
                except Exception as cb_err:
                    logger.warning(
                        f"[_teardown] Failed to send success callback: {cb_err} | "
                        f"chain_id={self._context.chain_id}"
                    )
            return callback_response.model_dump()
        else:
            return self._handle_error(result.error)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/chain/executor.py` around lines 127 - 158, The
success path in _teardown currently calls send_callback before persisting DB
state, so if send_callback raises the job and chain updates may never be
applied; move the send_callback call (and its callback_url check) to after
JobCrud(session).update(...) and update_llm_chain_status(...) so the DB updates
are performed first, then send the callback with callback_response.model_dump(),
and finally return the callback response; keep all existing objects
(LLMChainResponse, APIResponse) and use the same self._request.callback_url
check.
send_callback is wrapped in a try/except block, so it will never raise; it returns False on failure and the subsequent DB update still happens. So this is a false alarm.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find and examine the send_callback function implementation
ast-grep --pattern $'def send_callback($$$) {
$$$
}'
Length of output: 0
🏁 Script executed:
#!/bin/bash
# Also search with ripgrep for the function definition
rg -A 30 "def send_callback" --type py
Length of output: 3856
@vprashrex You're absolutely right, and I apologize for the false alarm!
I verified the send_callback implementation in backend/app/utils.py, and it indeed wraps all operations in try/except blocks and returns bool (True/False) rather than raising exceptions. The function will always return safely, so the DB updates in _teardown will execute regardless of callback success or failure.
The current implementation is correct as-is. Thank you for the clarification!
def _handle_error(self, error: str) -> dict:
    callback_response = APIResponse.failure_response(
        error=error or "Unknown error occurred",
        metadata=self._request.request_metadata,
    )
    logger.error(
        f"[ChainExecutor] Chain execution failed | "
        f"chain_id={self._context.chain_id}, job_id={self._context.job_id}, error={error}"
    )

    with Session(engine) as session:
        if self._request.callback_url:
            send_callback(
                callback_url=str(self._request.callback_url),
                data=callback_response.model_dump(),
            )

        update_llm_chain_status(
            session,
            chain_id=self._context.chain_id,
            status=ChainStatus.FAILED,
            output=None,
            total_usage=self._context.aggregated_usage.model_dump(),
            error=error,
        )
        JobCrud(session).update(
            job_id=self._context.job_id,
            job_update=JobUpdate(status=JobStatus.FAILED, error_message=error),
        )
        return callback_response.model_dump()

def _handle_unexpected_error(self, e: Exception) -> dict:
    logger.error(
        f"[ChainExecutor.run] Unexpected error: {e} | "
        f"job_id={self._context.job_id}",
        exc_info=True,
    )
    return self._handle_error("Unexpected error occurred")
_handle_error can throw, leaving chain/job in inconsistent DB state.
If send_callback (Line 172) raises an unhandled exception, the subsequent update_llm_chain_status and JobCrud.update calls (Lines 177–188) are skipped. Since _handle_unexpected_error delegates to _handle_error, there's no fallback — the exception propagates to the Celery runner, and the job remains in PROCESSING / chain in RUNNING forever.
Consider wrapping send_callback in a try/except within _handle_error, or ensuring DB updates happen before the callback.
Proposed fix — update DB before sending callback
 def _handle_error(self, error: str) -> dict:
     callback_response = APIResponse.failure_response(
         error=error or "Unknown error occurred",
         metadata=self._request.request_metadata,
     )
     logger.error(
         f"[ChainExecutor] Chain execution failed | "
         f"chain_id={self._context.chain_id}, job_id={self._context.job_id}, error={error}"
     )
     with Session(engine) as session:
-        if self._request.callback_url:
-            send_callback(
-                callback_url=str(self._request.callback_url),
-                data=callback_response.model_dump(),
-            )
-
         update_llm_chain_status(
             session,
             chain_id=self._context.chain_id,
             status=ChainStatus.FAILED,
             output=None,
             total_usage=self._context.aggregated_usage.model_dump(),
             error=error,
         )
         JobCrud(session).update(
             job_id=self._context.job_id,
             job_update=JobUpdate(status=JobStatus.FAILED, error_message=error),
         )
+
+        if self._request.callback_url:
+            try:
+                send_callback(
+                    callback_url=str(self._request.callback_url),
+                    data=callback_response.model_dump(),
+                )
+            except Exception as cb_err:
+                logger.warning(
+                    f"[ChainExecutor] Failed to send error callback: {cb_err} | "
+                    f"chain_id={self._context.chain_id}"
+                )
+
         return callback_response.model_dump()

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/chain/executor.py` around lines 160 - 197, The
_handle_error method can abort before updating DB if send_callback raises;
modify _handle_error so DB state is always persisted first: call
update_llm_chain_status(...) and JobCrud(session).update(...) before attempting
send_callback, and/or wrap send_callback in its own try/except that logs but
does not re-raise. Ensure you still build callback_response from
APIResponse.failure_response and include references to _handle_error,
send_callback, update_llm_chain_status, and JobCrud.update when making the
change so the chain status is set to ChainStatus.FAILED and the job status to
JobStatus.FAILED even if the callback fails.
This is not going to be an issue, since send_callback is wrapped in a try/except block; if something goes wrong it returns False, so the DB update still executes after it. The same pattern is followed by execute_job for the LLM call.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find and examine the send_callback function implementation
echo "=== send_callback implementation ==="
rg -A 20 "^def send_callback" --type=py
echo -e "\n=== execute_job function - looking for send_callback usage pattern ==="
rg -B 5 -A 10 "send_callback" backend/app/services/llm/jobs.py
Length of output: 4430
@vprashrex You're absolutely right, and I apologize for the incorrect analysis!
I've verified that send_callback in backend/app/utils.py has internal try/except handling and returns a boolean (True on success, False on failure) rather than raising exceptions. The function's docstring explicitly states: "Returns: bool: True if callback succeeded, False otherwise".
The pattern in _handle_error is indeed consistent with execute_job in jobs.py, where send_callback is called followed by DB updates. Since send_callback won't throw, the DB state will always be updated correctly.
Thank you for the clarification!
🐰
callback_response = APIResponse.failure_response(
    error="Unexpected error occurred",
    metadata=request.request_metadata,
)
return handle_job_error(job_uuid, request.callback_url, callback_response)
🧩 Analysis chain
🌐 Web query:
Pydantic v2 HttpUrl str subclass type compatibility
💡 Result:
In Pydantic v2, HttpUrl is not a str subclass anymore (unlike v1). It’s a constrained URL type built on Pydantic’s core Url object, so things like isinstance(model.url, str) will be False, and you may need explicit conversion when calling APIs that expect str. This is an intentional breaking change noted in the migration guide. [1][2]
What to do for “expects str” compatibility
- Convert explicitly: str(model.url) (or f"{model.url}"). Pydantic’s migration guide specifically recommends this approach. [1]
- If you need a URL type that is a str (for isinstance(..., str) and “stringy” library interop), a common workaround is using pydantic-string-url, which provides drop-in URL types based on str. [3][4]
Typing notes (why it can look odd)
Historically in v2, HttpUrl was defined via typing.Annotated over Url + UrlConstraints (so it wasn’t a real subclass at runtime), e.g. HttpUrl = Annotated[Url, UrlConstraints(...)]. [5] Pydantic later improved URL types to be concrete subclasses of the base URL type (to enable better isinstance checks between URL types), but they still do not inherit from str. [6]
Sources:
[1] Pydantic v2 Migration Guide (URL types no longer inherit from str) [1]
[2] Pydantic “URLs” docs (shows URL values are Url(...) objects and stringify via str()) [2]
[3] pydantic-string-url PyPI description (v2 URLs not str, provides str-based replacements) [3]
[4] Pydantic GitHub discussion #6395 (practical interop issue + pydantic-string-url suggestion) [4]
[5] Pydantic network types API docs (shows HttpUrl defined via Annotated[..., UrlConstraints(...)]) [5]
[6] Pydantic v2.10 release notes (URL types moved from Annotated approach to concrete URL subclasses) [6]
🏁 Script executed:
# First, find and examine the jobs.py file to see the actual code
find . -name "jobs.py" -path "*/backend/app/services/*" -type f
Length of output: 149
🏁 Script executed:
# Check the handle_job_error function signature
rg -A 5 "def handle_job_error" --type py
Length of output: 459
🏁 Script executed:
# Look at the code around lines 687-691 and 500 in jobs.py
fd jobs.py -x wc -l {} \; 2>/dev/null | head -5
Length of output: 317
🏁 Script executed:
# Check the actual code at lines 687-691 and around line 500
sed -n '685,691p' backend/app/services/llm/jobs.py
Length of output: 338
🏁 Script executed:
# Check line 500 area
sed -n '498,505p' backend/app/services/llm/jobs.py
Length of output: 361
🏁 Script executed:
# Check the request object type to confirm callback_url is HttpUrl | None
rg -B 10 "def handle_job_error" backend/app/services/llm/jobs.py | head -20
Length of output: 376
🏁 Script executed:
# Look for the request type definition to verify callback_url field type
rg "callback_url" backend/app/services/llm/jobs.py -B 2 -A 2 | head -30
Length of output: 975
🏁 Script executed:
# Find the request type definition to confirm callback_url is HttpUrl | None
rg "class.*Request" backend/app/services/llm/jobs.py -A 30 | head -60
Length of output: 57
🏁 Script executed:
# Look for imports to understand what HttpUrl is being used
rg "from pydantic" backend/app/services/llm/jobs.py -A 2
Length of output: 57
🏁 Script executed:
# Check send_callback to understand what it expects
rg -A 10 "def send_callback" backend/app/services/llm/jobs.py
Length of output: 57
🏁 Script executed:
# Count all occurrences of handle_job_error being called with request.callback_url
rg "handle_job_error.*request\.callback_url" backend/app/services/llm/jobs.py
Length of output: 497
🏁 Script executed:
# Search for request classes more broadly
rg "callback_url.*HttpUrl\|HttpUrl.*callback_url" backend/app/services/llm/ -A 2 -B 2
Length of output: 57
🏁 Script executed:
# Look at the entire file structure to find the request definition
sed -n '1,100p' backend/app/services/llm/jobs.py | grep -E "^import|^from|class.*Request"
Length of output: 1186
🏁 Script executed:
# Find where request is defined in the function signature at line 687
sed -n '670,691p' backend/app/services/llm/jobs.py
Length of output: 884
🏁 Script executed:
# Search for HttpUrl in the entire backend
rg "HttpUrl" backend/ --type py | head -20
Length of output: 580
🏁 Script executed:
# Count all occurrences of handle_job_error being called
rg "return handle_job_error.*request\.callback_url" backend/app/services/llm/jobs.py -n
Length of output: 517
🏁 Script executed:
# Check if send_callback expects str
rg -A 5 "def send_callback" backend/app/utils.py backend/app/services/
Length of output: 404
🏁 Script executed:
# Verify what function contains line 500
sed -n '480,520p' backend/app/services/llm/jobs.py | head -50
Length of output: 1648
🏁 Script executed:
# Check all 5 occurrences in context
rg -n -B 8 "return handle_job_error.*request\.callback_url" backend/app/services/llm/jobs.py | grep -E "^[0-9]+-\s*(def |return handle_job_error)"
Length of output: 57
Convert request.callback_url (Pydantic v2 HttpUrl) to str before passing to handle_job_error.
In Pydantic v2, HttpUrl is not a str subclass and requires explicit conversion. The function handle_job_error expects callback_url: str | None, and internally calls send_callback(callback_url: str, ...). Passing HttpUrl directly will cause type errors.
This pattern appears 5 times in the file (lines 517, 550, 575, 586, 691). Convert all occurrences using str(request.callback_url) if request.callback_url else None.
Proposed fix example (line 691)
- return handle_job_error(job_uuid, request.callback_url, callback_response)
+ return handle_job_error(
+ job_uuid,
+ str(request.callback_url) if request.callback_url else None,
+ callback_response,
+ )

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/jobs.py` around lines 687 - 691, The callback_url
being passed to handle_job_error is a Pydantic v2 HttpUrl (request.callback_url)
and must be converted to a native str before use; update all calls that pass
request.callback_url (the ones that ultimately call handle_job_error ->
send_callback) to pass str(request.callback_url) if request.callback_url else
None so the signature handle_job_error(callback_url: str | None, ...) and
send_callback(callback_url: str, ...) receive proper types; apply this change to
every occurrence in this module where request.callback_url is forwarded (e.g.,
the calls around job error handling that reference handle_job_error and
send_callback).
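The conversion pattern from the comment can be sketched as a small helper (the name `normalize_callback_url` is illustrative, not part of the codebase; `FakeHttpUrl` stands in for Pydantic's URL type):

```python
from typing import Optional


def normalize_callback_url(callback_url: Optional[object]) -> Optional[str]:
    # Mirrors the `str(x) if x else None` pattern from the review comment:
    # preserves None, converts any URL-like object to a plain str.
    return str(callback_url) if callback_url else None


class FakeHttpUrl:
    """Hypothetical stand-in for a pydantic v2 HttpUrl value."""

    def __init__(self, url: str) -> None:
        self._url = url

    def __str__(self) -> str:
        return self._url


assert normalize_callback_url(None) is None
assert normalize_callback_url(FakeHttpUrl("https://example.com/cb")) == "https://example.com/cb"
```

Centralizing the conversion in one helper would also avoid repeating the ternary at all five call sites.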
job_uuid is already a UUID, which is fine. request.callback_url is HttpUrl | None while handle_job_error expects str | None, but at runtime this is not going to create issues, since send_callback internally applies str(callback_url) and checks whether the callback exists as well.
@vprashrex, you're absolutely right! Since send_callback internally handles the conversion with str(callback_url) and performs existence checks, this won't cause runtime issues.
While the code works correctly at runtime, the type mismatch might still trigger warnings from static type checkers (mypy, pyright, etc.). If you're not seeing any issues from your type checker or are suppressing them, then the current implementation is functionally safe.
If you'd like to satisfy type checkers without changing the runtime behavior, you could add the explicit conversion when calling handle_job_error, but since the internal implementation already handles it, it's more of a type hygiene consideration than a functional issue.
Thanks for the clarification! 🐰
✏️ Learnings added
Learnt from: vprashrex
Repo: ProjectTech4DevAI/kaapi-backend PR: 616
File: backend/app/services/llm/jobs.py:734-738
Timestamp: 2026-03-02T05:52:48.150Z
Learning: In backend/app/services/llm/jobs.py, passing HttpUrl (Pydantic v2) directly to handle_job_error (which expects str | None) is acceptable because send_callback internally converts it with str(callback_url) and checks existence, so no runtime issues occur.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
Actionable comments posted: 5
🧹 Nitpick comments (2)
backend/app/services/llm/jobs.py (2)
589-595: `task_id` is unused in `execute_chain_job`, and `task_instance` lacks a type annotation.

Unlike `execute_job` (which logs `task_id` at startup and in the error handler), `execute_chain_job` never uses `task_id`, making it dead noise. Additionally, `task_instance` has no type annotation in either Celery task function, violating the project type-hint convention.

♻️ Proposed fix

 def execute_chain_job(
     request_data: dict,
     project_id: int,
     organization_id: int,
     job_id: str,
     task_id: str,
-    task_instance,
+    task_instance: Any,  # celery.Task — available via bind=True
 ) -> dict:

Add `from typing import Any` if not already imported, and include `task_id` in the startup log message for consistency:

 logger.info(
     f"[execute_chain_job] Starting chain execution | "
-    f"job_id={job_uuid}, total_blocks={len(request.blocks)}"
+    f"job_id={job_uuid}, task_id={task_id}, total_blocks={len(request.blocks)}"
 )

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 589 - 595, The function execute_chain_job currently has an unused task_id parameter and an untyped task_instance; update it to include a proper type hint (e.g., task_instance: Any) and add the matching import (from typing import Any) if missing, and use task_id in the startup log (and mirror execute_job’s error handling log) so task_id is logged on start and on failure; ensure the function signature and return type follow the project's type-hint convention and that the startup/error log messages reference task_id.
81-121: Consider extracting a shared `_create_and_schedule_job` helper to eliminate duplication with `start_job`.

`start_chain_job` is structurally identical to `start_job`, differing only in `job_type`, `function_path`, and error string. This is a clear DRY violation.

♻️ Proposed refactor

+def _create_and_schedule_job(
+    db: Session,
+    request: LLMCallRequest | LLMChainRequest,
+    project_id: int,
+    organization_id: int,
+    *,
+    job_type: JobType,
+    function_path: str,
+    error_detail: str,
+) -> UUID:
+    trace_id = correlation_id.get() or "N/A"
+    fn_label = function_path.rsplit(".", 1)[-1]
+    job_crud = JobCrud(session=db)
+    job = job_crud.create(job_type=job_type, trace_id=trace_id)
+    db.flush()
+    db.commit()
+    logger.info(f"[{fn_label}] Created job | job_id={job.id}, status={job.status}, project_id={project_id}")
+    try:
+        task_id = start_high_priority_job(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=str(job.id),
+            trace_id=trace_id,
+            request_data=request.model_dump(mode="json"),
+            organization_id=organization_id,
+        )
+    except Exception as e:
+        logger.error(f"[{fn_label}] Error starting Celery task: {e} | job_id={job.id}", exc_info=True)
+        job_crud.update(job_id=job.id, job_update=JobUpdate(status=JobStatus.FAILED, error_message=str(e)))
+        raise HTTPException(status_code=500, detail=error_detail)
+    logger.info(f"[{fn_label}] Job scheduled | job_id={job.id}, project_id={project_id}, task_id={task_id}")
+    return job.id

 def start_job(db, request, project_id, organization_id) -> UUID:
-    # ... full body ...
+    return _create_and_schedule_job(db, request, project_id, organization_id,
+        job_type=JobType.LLM_API,
+        function_path="app.services.llm.jobs.execute_job",
+        error_detail="Internal server error while executing LLM call",
+    )

 def start_chain_job(db, request, project_id, organization_id) -> UUID:
-    # ... full body ...
+    return _create_and_schedule_job(db, request, project_id, organization_id,
+        job_type=JobType.LLM_CHAIN,
+        function_path="app.services.llm.jobs.execute_chain_job",
+        error_detail="Internal server error while executing LLM chain job",
+    )
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 81 - 121, Extract a shared helper (e.g., _create_and_schedule_job) to remove duplication between start_chain_job and start_job: the helper should accept parameters job_type (JobType.LLM_CHAIN / other), function_path (string), project_id, organization_id, request_data (request.model_dump(mode="json") or None), and an error message/template; it should perform JobCrud(session=db).create(...), db.flush(), db.commit(), call start_high_priority_job with the provided function_path and args, catch exceptions to log with logger.error (include exception and job.id), update the job via job_crud.update(job_id=job.id, job_update=JobUpdate(status=JobStatus.FAILED, error_message=str(e))) and re-raise an HTTPException with the provided error detail, and finally return job.id (or task_id if needed). Replace start_chain_job and start_job to call this helper with their respective JobType, function_path, and error text, keeping unique symbols JobCrud, JobUpdate, JobStatus, start_high_priority_job, and logger for locating code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/api/docs/llm/llm_chain.md`:
- Line 9: The docs line incorrectly states `input` is a string; update the
documentation for QueryParams.input to reflect that it is a discriminated union
(TextInput | AudioInput) and can accept either a text string or an audio
payload; mention both variants (TextInput and AudioInput) and their
distinguishing fields (e.g., type/discriminator and text vs. audio
data/uri/format) so consumers know audio inputs are supported and how to
construct them when using LLMChain or related functions that reference
QueryParams.input.
- Line 48: The docs line "If not provided, response is only accessible through
job status" is inaccurate because the LLM chain creation endpoint in
llm_chain.py does not return a job_id; either remove or reword that sentence in
llm_chain.md to reflect current behavior (e.g., state that no job identifier is
returned and callers cannot poll for status) or alternatively update the
endpoint implementation in llm_chain.py to include and return a job_id
(generate/attach the Job id from the job creation flow and include it in the
JSON response) so the documentation is correct.
In `@backend/app/services/llm/jobs.py`:
- Around line 683-688: The inner except in execute_chain_job catches exceptions
without binding them, but the logger.error message references the outer
exception variable e; change the inner except to capture the exception (e.g.
except Exception as update_err:) and update the logger.error call in the
status-update block to log update_err (and keep exc_info=True) so the message
correctly reports the failure to update chain status for chain_uuid and uses the
correct exception object; reference the execute_chain_job function, the
chain_uuid variable, and the logger.error call to locate the change.
- Around line 414-416: The cleanup condition is incorrect because resolved_input
(str) is being compared to query.input (a TextInput|AudioInput model), so it
always yields True; update the resolver to track whether a temp file was created
(e.g., set a boolean flag like resolved_from_temp when you call the resolver
that writes a temp file) and then change the finally block to only call
cleanup_temp_file(resolved_input) when resolved_from_temp is True and
resolved_input is truthy; alternatively, if you prefer not to add a flag,
compare resolved_input against the model's text field (like getattr(query.input,
"text", None)) or check isinstance(query.input, AudioInput) before deciding to
clean up. Ensure you update the resolver location where resolved_input is
produced and reference resolved_input, query.input, and cleanup_temp_file in the
fix.
- Around line 614-617: execute_chain_job currently never updates the job status
to JobStatus.PROCESSING like execute_job does; update execute_chain_job to mark
the job as PROCESSING as soon as the Celery task starts (before creating the
chain or doing heavy work). Locate the execute_chain_job function and add the
same status transition logic used in execute_job (set status to
JobStatus.PROCESSING and persist the change via the session/ORM call that your
project uses) prior to calling create_llm_chain or invoking downstream
processing so monitoring/alerting sees the in-progress state.
---
Duplicate comments:
In `@backend/app/api/routes/llm_chain.py`:
- Around line 48-62: start_chain_job returns a job UUID that is currently
discarded; capture its return value and include it in the API response so
callers can poll when no callback_url is provided. Specifically, assign job_id =
start_chain_job(db=_session, request=request, project_id=project_id,
organization_id=organization_id) and then return
APIResponse.success_response(data=Message(message=...), job_id=job_id) or embed
the job_id into the Message payload; also make the success message conditional:
if request.callback_url is set keep the callback wording, otherwise return a
polling message including the job_id. Ensure you reference start_chain_job,
request.callback_url, APIResponse.success_response and Message when making the
change.
- Line 20: Both route handler functions are missing return type annotations;
update llm_callback_notification(body: APIResponse[LLMChainResponse]) and the
other route handler in this module to include precise return types (e.g., ->
Response, -> JSONResponse, or -> dict depending on what they actually return).
Ensure parameters already typed remain, add the appropriate importing type if
needed (from fastapi.responses or typing), and keep annotations consistent with
the rest of the codebase.
- Around line 16-17: The route decorator is using an invalid OpenAPI runtime
expression "{$callback_url}"; update the string passed to
llm_callback_router.post to a valid form — either change it to a standard path
parameter like "{callback_url}" if you intend a URL path segment, or replace it
with a proper OpenAPI runtime expression such as "{$request.body#/callback_url}"
or "{$request.header.callback_url}" depending on where the callback URL comes
from; make the change in the llm_callback_router.post decorator so the routing
and generated OpenAPI spec are valid.
In `@backend/app/services/llm/jobs.py`:
- Line 694: The call to handle_job_error passes request.callback_url (a Pydantic
v2 HttpUrl) directly but handle_job_error expects str | None; convert the
HttpUrl to str the same way used earlier: pass str(request.callback_url) if
request.callback_url else None when calling handle_job_error (reference the
handle_job_error call and the existing conversion pattern used previously near
the other callback usage).
---
Nitpick comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 589-595: The function execute_chain_job currently has an unused
task_id parameter and an untyped task_instance; update it to include a proper
type hint (e.g., task_instance: Any) and add the matching import (from typing
import Any) if missing, and use task_id in the startup log (and mirror
execute_job’s error handling log) so task_id is logged on start and on failure;
ensure the function signature and return type follow the project's type-hint
convention and that the startup/error log messages reference task_id.
- Around line 81-121: Extract a shared helper (e.g., _create_and_schedule_job)
to remove duplication between start_chain_job and start_job: the helper should
accept parameters job_type (JobType.LLM_CHAIN / other), function_path (string),
project_id, organization_id, request_data (request.model_dump(mode="json") or
None), and an error message/template; it should perform
JobCrud(session=db).create(...), db.flush(), db.commit(), call
start_high_priority_job with the provided function_path and args, catch
exceptions to log with logger.error (include exception and job.id), update the
job via job_crud.update(job_id=job.id,
job_update=JobUpdate(status=JobStatus.FAILED, error_message=str(e))) and
re-raise an HTTPException with the provided error detail, and finally return
job.id (or task_id if needed). Replace start_chain_job and start_job to call
this helper with their respective JobType, function_path, and error text,
keeping unique symbols JobCrud, JobUpdate, JobStatus, start_high_priority_job,
and logger for locating code.
### Key Parameters

**`query`** (required) - Initial query input for the first block in the chain:
- `input` (required, string, min 1 char): User question/prompt/query
input documented as string only — audio input support is omitted.
QueryParams.input is a discriminated union of TextInput and AudioInput. Describing it as string misleads consumers who may want to use audio inputs in chains.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/api/docs/llm/llm_chain.md` at line 9, The docs line incorrectly
states `input` is a string; update the documentation for QueryParams.input to
reflect that it is a discriminated union (TextInput | AudioInput) and can accept
either a text string or an audio payload; mention both variants (TextInput and
AudioInput) and their distinguishing fields (e.g., type/discriminator and text
vs. audio data/uri/format) so consumers know audio inputs are supported and how
to construct them when using LLMChain or related functions that reference
QueryParams.input.
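As a sketch only (the real models live in the backend's Pydantic schemas; the field names here are assumptions), a discriminated union dispatching on a `type` field looks like this:

```python
from dataclasses import dataclass
from typing import Literal, Union


@dataclass
class TextInput:
    type: Literal["text"]
    text: str


@dataclass
class AudioInput:
    type: Literal["audio"]
    data: str    # assumed: base64 payload or URI
    format: str  # assumed: e.g. "wav"


QueryInput = Union[TextInput, AudioInput]


def parse_input(raw: dict) -> QueryInput:
    # Dispatch on the 'type' discriminator, as a Pydantic discriminated union would.
    if raw.get("type") == "text":
        return TextInput(type="text", text=raw["text"])
    if raw.get("type") == "audio":
        return AudioInput(type="audio", data=raw["data"], format=raw["format"])
    raise ValueError(f"unknown input type: {raw.get('type')!r}")
```

Documenting both variants (and the discriminator field) tells consumers that audio inputs are accepted, not just strings.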
**`callback_url`** (optional, HTTPS URL):
- Webhook endpoint to receive the final response and intermediate callbacks
- Must be a valid HTTPS URL
- If not provided, response is only accessible through job status
"Accessible through job status" is misleading — the endpoint currently does not return a job_id.
Without a job_id in the response, callers have no identifier to query for job status when callback_url is omitted. This documentation claim is inaccurate until the route returns a job ID (see the comment in llm_chain.py).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/api/docs/llm/llm_chain.md` at line 48, The docs line "If not
provided, response is only accessible through job status" is inaccurate because
the LLM chain creation endpoint in llm_chain.py does not return a job_id; either
remove or reword that sentence in llm_chain.md to reflect current behavior
(e.g., state that no job identifier is returned and callers cannot poll for
status) or alternatively update the endpoint implementation in llm_chain.py to
include and return a job_id (generate/attach the Job id from the job creation
flow and include it in the JSON response) so the documentation is correct.
try:
    with Session(engine) as session:
        chain_record = create_llm_chain(
            session,
execute_chain_job never sets the job to JobStatus.PROCESSING — inconsistency with execute_job.
execute_job immediately marks the job as PROCESSING (lines 483–486) when the Celery task starts. execute_chain_job skips this step entirely, leaving the job in its initial state until completion or failure. This breaks monitoring/alerting that relies on status transitions and makes it impossible to distinguish stuck-queued jobs from in-progress ones.
🐛 Proposed fix
 try:
     with Session(engine) as session:
+        JobCrud(session=session).update(
+            job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
+        )
         chain_record = create_llm_chain(

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested change:

 try:
     with Session(engine) as session:
+        JobCrud(session=session).update(
+            job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
+        )
         chain_record = create_llm_chain(
             session,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/jobs.py` around lines 614 - 617, execute_chain_job
currently never updates the job status to JobStatus.PROCESSING like execute_job
does; update execute_chain_job to mark the job as PROCESSING as soon as the
Celery task starts (before creating the chain or doing heavy work). Locate the
execute_chain_job function and add the same status transition logic used in
execute_job (set status to JobStatus.PROCESSING and persist the change via the
session/ORM call that your project uses) prior to calling create_llm_chain or
invoking downstream processing so monitoring/alerting sees the in-progress
state.
ChainExecutor._setup() already sets the job status to PROCESSING; it's called at the start of executor.run() before any chain execution happens.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Read the ChainExecutor file to verify the user's claim
cat backend/app/services/llm/chain/executor.py
Length of output: 4572
@vprashrex, you're absolutely right! I apologize for the incorrect review comment.
I've verified the code and confirmed that ChainExecutor._setup() does indeed set the job status to PROCESSING (lines 51-54 in backend/app/services/llm/chain/executor.py) before any chain execution happens. The run() method calls _setup() first (line 40), so the status transition occurs exactly as it should.
The implementation is correct and actually cleaner than what I suggested—the lifecycle management is properly encapsulated within the ChainExecutor class. Thank you for the clarification!
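A minimal sketch of that lifecycle (the class body is hypothetical; only the call order mirrors the discussion):

```python
class ChainExecutor:
    """Sketch: run() transitions the job to PROCESSING before any chain work."""

    def __init__(self) -> None:
        self.transitions: list[str] = []

    def _setup(self) -> None:
        # In the real executor this updates the Job row; here we just record it.
        self.transitions.append("PROCESSING")

    def _execute(self) -> str:
        return "result"

    def _teardown(self, result: str) -> str:
        self.transitions.append("SUCCESS")
        return result

    def run(self) -> str:
        self._setup()  # status transition happens first, before execution
        return self._teardown(self._execute())


executor = ChainExecutor()
assert executor.run() == "result"
assert executor.transitions == ["PROCESSING", "SUCCESS"]
```

Encapsulating the transitions inside the executor keeps the Celery task body free of status bookkeeping.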
except Exception:
    logger.error(
        f"[execute_chain_job] Failed to update chain status: {e} | "
        f"chain_id={chain_uuid}",
        exc_info=True,
    )
Inner except block logs the outer exception e instead of the status-update exception.
The bare except Exception: at line 683 does not bind the caught exception, so {e} in the log message still refers to the outer chain-execution error — not the failure to update chain status. This is misleading during debugging.
🐛 Proposed fix
-except Exception:
+except Exception as update_error:
     logger.error(
-        f"[execute_chain_job] Failed to update chain status: {e} | "
-        f"chain_id={chain_uuid}",
+        f"[execute_chain_job] Failed to update chain status: {update_error} | "
+        f"chain_id={chain_uuid}",
         exc_info=True,
     )

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/jobs.py` around lines 683 - 688, The inner except in
execute_chain_job catches exceptions without binding them, but the logger.error
message references the outer exception variable e; change the inner except to
capture the exception (e.g. except Exception as update_err:) and update the
logger.error call in the status-update block to log update_err (and keep
exc_info=True) so the message correctly reports the failure to update chain
status for chain_uuid and uses the correct exception object; reference the
execute_chain_job function, the chain_uuid variable, and the logger.error call
to locate the change.
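The shadowing pitfall above can be reproduced in isolation; `run_with_bug` and `run_fixed` below are illustrative, with a plain list standing in for the logger:

```python
log: list[str] = []


def run_with_bug() -> None:
    try:
        raise ValueError("chain execution failed")
    except Exception as e:
        try:
            raise RuntimeError("could not update chain status")
        except Exception:  # bug: no `as` binding, so {e} is still the OUTER error
            log.append(f"update failed: {e}")


def run_fixed() -> None:
    try:
        raise ValueError("chain execution failed")
    except Exception as e:
        try:
            raise RuntimeError("could not update chain status")
        except Exception as update_error:  # fix: bind the inner exception
            log.append(f"update failed: {update_error}")


run_with_bug()
run_fixed()
assert log[0] == "update failed: chain execution failed"        # misleading
assert log[1] == "update failed: could not update chain status"  # correct
```

The buggy variant silently logs the wrong failure, which is exactly what makes the production bug hard to debug.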
…t and eliminate code duplication
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/app/services/llm/chain/executor.py (1)
63-91: Unnecessary session acquired on the error path.
`with Session(engine) as session:` at Line 63 wraps both branches. When `result.success` is `False`, the session object is created but never used, then `_handle_error` (Line 91) opens its own session at Line 103. Consider dispatching the error case before entering the session context.

♻️ Proposed refactor

 def _teardown(self, result: BlockResult) -> dict:
     """Finalize chain record, send callback, and update job status."""
+    if not result.success:
+        return self._handle_error(result.error)
+
     with Session(engine) as session:
-        if result.success:
-            final = LLMChainResponse(
-                ...
-            )
-            ...
-            return callback_response.model_dump()
-        else:
-            return self._handle_error(result.error)
+        final = LLMChainResponse(...)
+        ...
+        return callback_response.model_dump()

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/chain/executor.py` around lines 63 - 91, The session is opened unconditionally but only used on the success path; change executor logic to check result.success before entering the Session(engine) context so the error path returns early via self._handle_error(result.error) without creating a session. Concretely, move or add the early guard (if not result.success: return self._handle_error(result.error)) before the with Session(engine) as session: block, and keep the existing success logic that constructs LLMChainResponse, calls APIResponse.success_response, send_callback, JobCrud(session).update, and update_llm_chain_status inside the session context.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 480-487: The functions execute_job and execute_chain_job currently
omit type annotations for the task_instance (and task_id where noted) which
triggers static-analysis and ARG001 unused-parameter warnings; edit the
signatures for execute_job and execute_chain_job to annotate task_instance with
an appropriate type (e.g., celery.app.task.Task or typing.Any) and add a return
type hint, and if the parameter must remain unused keep the annotation but add a
trailing comment "# noqa: ARG001" to suppress the unused-argument warning;
update only the function signatures for locateable symbols execute_job and
execute_chain_job and ensure imports (Task or Any) are added as needed.
---
Duplicate comments:
In `@backend/app/services/llm/chain/executor.py`:
- Around line 73-88: The callback is sent before persisting success state; move
the DB updates (calls to JobCrud(session).update and update_llm_chain_status) to
execute and commit first, then call send_callback (only if
self._request.callback_url) afterwards; wrap send_callback in its own try/except
so any exception from send_callback (or network issues) is caught and logged but
does not prevent the job/chain from remaining marked SUCCESS/COMPLETED
(referenced symbols: send_callback, self._request.callback_url,
callback_response, JobCrud.update, update_llm_chain_status).
- Around line 103-121: The send_callback call in _handle_error can raise and
prevent the DB updates, leaving ChainStatus and JobStatus stuck; move the
update_llm_chain_status(session, chain_id=..., status=ChainStatus.FAILED, ...)
and JobCrud(session).update(job_id=self._context.job_id,
job_update=JobUpdate(...)) to execute before calling send_callback, and then
call send_callback afterwards inside its own try/except so callback failures are
logged/ignored without preventing the DB writes; locate the relevant block using
send_callback, update_llm_chain_status, and JobCrud.update to reorder and add
the error handling.
In `@backend/app/services/llm/jobs.py`:
- Around line 529-531: The callback_url from the request is a Pydantic Url
object (not a str) so update every call site that passes request.callback_url to
send_callback and handle_job_error to pass a string instead; specifically change
the send_callback call in execute_job (callback_url=request.callback_url) and
all handle_job_error calls in execute_job and execute_chain_job (currently
callback_url=request.callback_url) to use callback_url=str(request.callback_url)
so the functions receive a plain str.
- Around line 658-663: The inner bare "except Exception:" in execute_chain_job
incorrectly references the outer exception variable `e`; change the inner
handler to capture its own exception (e.g., "except Exception as update_err:")
and update the logger.error call in that block to include the new variable
(e.g., `update_err`) in the message and keep `exc_info=True` so the actual
status-update failure is logged; locate this in jobs.py around the logger.error
inside execute_chain_job.
- Around line 424-426: The current cleanup condition compares resolved_input
(str) to query.input (TextInput | AudioInput) which is always True, so update
the logic to only attempt temp-file cleanup for inputs that can produce a temp
file (i.e., AudioInput). Replace the condition using the cross-type comparison
with an explicit type check against AudioInput (use the query.input model class
name) and only call cleanup_temp_file(resolved_input) when resolved_input is
truthy and query.input is an AudioInput; remove the resolved_input !=
query.input comparison. This ensures cleanup_temp_file is only invoked for
audio-derived temp files and leaves TextInput untouched.
---
Nitpick comments:
In `@backend/app/services/llm/chain/executor.py`:
- Around line 63-91: The session is opened unconditionally but only used on the
success path; change executor logic to check result.success before entering the
Session(engine) context so the error path returns early via
self._handle_error(result.error) without creating a session. Concretely, move or
add the early guard (if not result.success: return
self._handle_error(result.error)) before the with Session(engine) as session:
block, and keep the existing success logic that constructs LLMChainResponse,
calls APIResponse.success_response, send_callback, JobCrud(session).update, and
update_llm_chain_status inside the session context.
def execute_job(
    request_data: dict,
    project_id: int,
    organization_id: int,
    job_id: str,
    task_id: str,
    task_instance,
) -> dict:
task_instance (and task_id in execute_chain_job) missing type annotations — coding guideline violation.
Both execute_job (Line 486) and execute_chain_job (Line 569) leave task_instance without a type hint. The static analysis tools also flag these as unused parameters; if the Celery task signature requires them, add a type annotation (e.g., Task from celery) or Any, and suppress with # noqa: ARG001 if intentionally unused.
🛠️ Proposed fix
def execute_job(
request_data: dict,
project_id: int,
organization_id: int,
job_id: str,
task_id: str,
- task_instance,
+ task_instance: Any, # required by Celery task signature
) -> dict:

def execute_chain_job(
request_data: dict,
project_id: int,
organization_id: int,
job_id: str,
task_id: str,
- task_instance,
+ task_instance: Any, # required by Celery task signature
) -> dict:
As per coding guidelines, "Always add type hints to all function parameters and return values in Python code."
Also applies to: 563-570
🧰 Tools
🪛 Ruff (0.15.1)
[warning] 486-486: Unused function argument: task_instance
(ARG001)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/jobs.py` around lines 480 - 487, The functions
execute_job and execute_chain_job currently omit type annotations for the
task_instance (and task_id where noted) which triggers static-analysis and
ARG001 unused-parameter warnings; edit the signatures for execute_job and
execute_chain_job to annotate task_instance with an appropriate type (e.g.,
celery.app.task.Task or typing.Any) and add a return type hint, and if the
parameter must remain unused keep the annotation but add a trailing comment "#
noqa: ARG001" to suppress the unused-argument warning; update only the function
signatures for locateable symbols execute_job and execute_chain_job and ensure
imports (Task or Any) are added as needed.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/models/llm/request.py (1)
1-17: ⚠️ Potential issue | 🟡 Minor
Remove duplicate imports.
Lines 1-9 and 10-17 contain overlapping imports from the same modules (enum, typing, uuid, sqlmodel, pydantic, datetime, app.core.util). This duplication will cause import conflicts or shadowing.
🐛 Proposed fix — consolidate imports
-from enum import Enum
-from typing import Annotated, Any, Literal, Union
-
-from uuid import UUID, uuid4
-from sqlmodel import Field, SQLModel
-from pydantic import Discriminator, model_validator, HttpUrl
-from datetime import datetime
-from app.core.util import now
-
-import sqlalchemy as sa
-from typing import Annotated, Any, Literal, Union
-from uuid import UUID, uuid4
-from pydantic import model_validator, HttpUrl
-from datetime import datetime
-from sqlalchemy.dialects.postgresql import JSONB
-from sqlmodel import Field, SQLModel, Index, text
-from app.core.util import now
+from enum import StrEnum
+from typing import Annotated, Any, Literal, Union
+from uuid import UUID, uuid4
+from datetime import datetime
+
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import JSONB
+from sqlmodel import Field, SQLModel, Index, text
+from pydantic import Discriminator, model_validator, HttpUrl
+
+from app.core.util import now
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/models/llm/request.py` around lines 1 - 17, The top of request.py contains duplicated import blocks (e.g., repeated imports of Enum, Annotated/Any/Literal/Union, UUID/uuid4, SQLModel/Field/Index/text, model_validator/HttpUrl, datetime, now, and JSONB) causing shadowing; consolidate into a single import section by removing the redundant second block and keeping one complete set that includes JSONB, Index, text, and any symbols used by classes like the request model or validators (e.g., model_validator, HttpUrl, UUID, uuid4, Field, SQLModel, now).
backend/app/services/llm/jobs.py (1)
1-1: ⚠️ Potential issue | 🟡 Minor
Fix formatting with black.
The CI pipeline failed because the file needs reformatting. Run `black backend/app/services/llm/jobs.py` to fix.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` at line 1, The file contains formatting issues (see the top-level "import logging" line) causing CI to fail; run the project's formatter (black) on the module to reformat it (e.g., run black for the file), commit the reformatted file so the import and file whitespace match the project's style.
♻️ Duplicate comments (3)
backend/app/services/llm/jobs.py (3)
739-743: ⚠️ Potential issue | 🟡 Minor
Convert request.callback_url (HttpUrl) to str before passing to handle_job_error.
In Pydantic v2, HttpUrl is not a str subclass. The function handle_job_error expects callback_url: str | None.
🐛 Proposed fix
- return handle_job_error(job_uuid, request.callback_url, callback_response)
+ return handle_job_error(
+     job_uuid,
+     str(request.callback_url) if request.callback_url else None,
+     callback_response,
+ )
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 739 - 743, The callback_url being passed to handle_job_error is a Pydantic HttpUrl (request.callback_url) which is not a str in Pydantic v2; update the call that returns on unexpected error to pass str(request.callback_url) (or None if callback is None) so handle_job_error receives a plain str | None—i.e., modify the return that currently calls handle_job_error(job_uuid, request.callback_url, callback_response) to convert request.callback_url to a string before passing it, keeping the rest (job_uuid and callback_response from APIResponse.failure_response) unchanged.
663-674: ⚠️ Potential issue | 🟠 Major
execute_chain_job never sets the job to JobStatus.PROCESSING.
Unlike execute_job (lines 575-579), which immediately marks the job as PROCESSING, execute_chain_job skips this step. This breaks monitoring that relies on status transitions.
🐛 Proposed fix
  try:
      with Session(engine) as session:
+         JobCrud(session=session).update(
+             job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
+         )
+
          chain_record = create_llm_chain(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 663 - 674, execute_chain_job never marks the job as JobStatus.PROCESSING which breaks status monitoring; inside execute_chain_job (after opening Session(engine) and before creating/starting the chain) update the job record for job_uuid to JobStatus.PROCESSING using the same approach as execute_job (i.e., within the Session call persist the status change and commit/flush so the DB reflects the transition) — ensure you reference the job by job_uuid and use the JobStatus.PROCESSING enum and the same session instance used around create_llm_chain so the status update is durable before further processing.
732-737: ⚠️ Potential issue | 🟡 Minor
Inner except block logs the outer exception e instead of the status-update exception.
The bare except Exception: at line 732 doesn't bind the caught exception, so {e} in the log message still refers to the outer chain-execution error.
🐛 Proposed fix
- except Exception:
+ except Exception as update_error:
      logger.error(
-         f"[execute_chain_job] Failed to update chain status: {e} | "
+         f"[execute_chain_job] Failed to update chain status: {update_error} | "
          f"chain_id={chain_uuid}",
          exc_info=True,
      )
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 732 - 737, In execute_chain_job's inner except that attempts to log a failure updating chain status, change the bare "except Exception:" to bind the exception (e.g., "except Exception as update_err:") and use that bound name in the logger.error f-string (replace "{e}" with "{update_err}") so the log shows the actual status-update exception; keep exc_info=True to preserve traceback and retain chain_uuid context in the message.
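A minimal runnable illustration of why binding the inner exception matters; the function names here are illustrative stand-ins, not the project's:

```python
import logging

logger = logging.getLogger(__name__)


def update_chain_status() -> None:
    raise RuntimeError("db unavailable")


def run_chain() -> str:
    try:
        raise ValueError("chain execution failed")
    except Exception as e:  # outer chain-execution error
        try:
            update_chain_status()
        except Exception as update_err:
            # Without "as update_err", an f-string with {e} would still
            # report the outer ValueError, hiding the real update failure
            logger.error(f"Failed to update chain status: {update_err}", exc_info=True)
            return str(update_err)
    return ""


assert run_chain() == "db unavailable"
```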
🧹 Nitpick comments (2)
backend/app/models/llm/request.py (1)
580-586: Use StrEnum instead of inheriting from both str and Enum.
Python 3.11+ provides enum.StrEnum, which is the idiomatic approach for string enums. This aligns with the codebase's requirement for Python 3.11+.
♻️ Proposed fix
-class ChainStatus(str, Enum):
+class ChainStatus(StrEnum):
     """Status of an LLM chain execution."""

     PENDING = "pending"
     RUNNING = "running"
     FAILED = "failed"
     COMPLETED = "completed"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/models/llm/request.py` around lines 580 - 586, Replace the current string-enum pattern for ChainStatus (class ChainStatus(str, Enum)) with the Python 3.11+ idiomatic StrEnum: import StrEnum from enum and declare class ChainStatus(StrEnum) with the same members (PENDING, RUNNING, FAILED, COMPLETED); this removes the manual str/Enum inheritance while preserving values and behavior.backend/app/crud/llm.py (1)
49-49: Update docstring to document the new chain_id parameter.
The function signature now includes chain_id: UUID | None = None, but the Args section in the docstring doesn't mention it.
📝 Proposed fix
  """
  Create a new LLM call record in the database.

  Args:
      session: Database session
      request: The LLM call request containing query and config
      job_id: Reference to the parent job
+     chain_id: Reference to the parent chain (None for standalone calls)
      project_id: Project this LLM call belongs to
      organization_id: Organization this LLM call belongs to
      resolved_config: The resolved configuration blob (either from stored config or ad-hoc)

  Returns:
      LlmCall: The created LLM call record
  """
Also applies to: 55-67
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/crud/llm.py` at line 49, The docstring for the function whose signature includes "chain_id: UUID | None = None" is missing documentation for the new chain_id parameter; update the Args section of that function's docstring (and the same section for the similar function block around lines 55-67) to add an entry for "chain_id" describing its type (UUID | None), purpose (optional identifier to associate the operation with a chain), and default behavior when None. Reference the function signature "chain_id: UUID | None = None" and ensure the docstring style matches the existing Args formatting.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/models/llm/request.py`:
- Around line 650-657: The Field declaration for block_sequences sets a Python
default of an empty list via default_factory=list while the underlying sa_column
JSONB is nullable=True, creating a mismatch; pick one approach and make changes
accordingly: either set sa_column nullable=False so the DB always stores an
array when block_sequences (Field) defaults to [], or change the Field to
default=None (remove default_factory) and keep nullable=True so Python-side type
matches DB NULL semantics; update the Field/type annotation for block_sequences
and the sa.Column nullable flag consistently (referencing block_sequences, Field
and the sa.Column(JSONB, nullable=...) entry).
In `@backend/app/services/llm/jobs.py`:
- Around line 486-488: The call to resolved_input_context is passing a
QueryParams object (query) but the context manager
resolved_input_context(query_input: TextInput | AudioInput) expects a TextInput
or AudioInput; update the call site in the job handler so you pass the actual
TextInput/AudioInput payload (e.g., query.input or a converted
TextInput/AudioInput constructed from QueryParams) instead of the QueryParams
wrapper, and ensure any necessary conversion/validation is done beforehand so
resolved_input_context and its isinstance checks work correctly (update the
variable passed to resolved_input_context, not the context manager signature).
- Around line 588-597: The call to execute_llm_call passes job_id as a str while
execute_llm_call(job_id: UUID) expects a UUID; convert or validate the string
before calling. In the caller where job_id is a string (the function that
receives job_id as a parameter), coerce it to a UUID instance (e.g., via
uuid.UUID(job_id)) or validate and raise a clear error if invalid, and pass that
UUID into execute_llm_call; ensure uuid is imported and add a short
comment/validation near the job_id usage to document the conversion.
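The coercion the prompt describes is a one-liner; in this sketch, execute_llm_call is a stub standing in for the real function, kept only to show the type boundary:

```python
import uuid


def execute_llm_call(job_id: uuid.UUID) -> str:
    # Stub: the real function declares job_id: UUID, not str
    assert isinstance(job_id, uuid.UUID)
    return f"executing job {job_id}"


job_id = "12345678-1234-5678-1234-567812345678"
job_uuid = uuid.UUID(job_id)  # raises ValueError for malformed ids
assert execute_llm_call(job_uuid) == f"executing job {job_id}"
```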
- Line 634: The call to handle_job_error is using the wrong types: pass job_uuid
(UUID) instead of job_id (str) and convert request.callback_url to a plain
string or None before calling; update the call site that currently does return
handle_job_error(job_id, request.callback_url, callback_response) to use
handle_job_error(job_uuid, str(request.callback_url) if request.callback_url is
not None else None, callback_response) so the function receives a UUID and a str
| None as expected.
- Around line 477-479: Remove the redundant standalone call to
resolve_input(query.input) and its error check because
resolved_input_context(query) already calls resolve_input internally; instead
rely on resolved_input_context to handle resolution and errors (keeping the same
BlockResult(error=..., llm_call_id=llm_call_id) behavior when the context
reports an error), and remove unused variables (resolved_input, resolve_error)
so the function only uses the resolved_input provided by resolved_input_context.
---
Outside diff comments:
In `@backend/app/models/llm/request.py`:
- Around line 1-17: The top of request.py contains duplicated import blocks
(e.g., repeated imports of Enum, Annotated/Any/Literal/Union, UUID/uuid4,
SQLModel/Field/Index/text, model_validator/HttpUrl, datetime, now, and JSONB)
causing shadowing; consolidate into a single import section by removing the
redundant second block and keeping one complete set that includes JSONB, Index,
text, and any symbols used by classes like the request model or validators
(e.g., model_validator, HttpUrl, UUID, uuid4, Field, SQLModel, now).
In `@backend/app/services/llm/jobs.py`:
- Line 1: The file contains formatting issues (see the top-level "import
logging" line) causing CI to fail; run the project's formatter (black) on the
module to reformat it (e.g., run black for the file), commit the reformatted
file so the import and file whitespace match the project's style.
---
Duplicate comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 739-743: The callback_url being passed to handle_job_error is a
Pydantic HttpUrl (request.callback_url) which is not a str in Pydantic v2;
update the call that returns on unexpected error to pass
str(request.callback_url) (or None if callback is None) so handle_job_error
receives a plain str | None—i.e., modify the return that currently calls
handle_job_error(job_uuid, request.callback_url, callback_response) to convert
request.callback_url to a string before passing it, keeping the rest (job_uuid
and callback_response from APIResponse.failure_response) unchanged.
- Around line 663-674: execute_chain_job never marks the job as
JobStatus.PROCESSING which breaks status monitoring; inside execute_chain_job
(after opening Session(engine) and before creating/starting the chain) update
the job record for job_uuid to JobStatus.PROCESSING using the same approach as
execute_job (i.e., within the Session call persist the status change and
commit/flush so the DB reflects the transition) — ensure you reference the job
by job_uuid and use the JobStatus.PROCESSING enum and the same session instance
used around create_llm_chain so the status update is durable before further
processing.
- Around line 732-737: In execute_chain_job's inner except that attempts to log
a failure updating chain status, change the bare "except Exception:" to bind the
exception (e.g., "except Exception as update_err:") and use that bound name in
the logger.error f-string (replace "{e}" with "{update_err}") so the log shows
the actual status-update exception; keep exc_info=True to preserve traceback and
retain chain_uuid context in the message.
---
Nitpick comments:
In `@backend/app/crud/llm.py`:
- Line 49: The docstring for the function whose signature includes "chain_id:
UUID | None = None" is missing documentation for the new chain_id parameter;
update the Args section of that function's docstring (and the same section for
the similar function block around lines 55-67) to add an entry for "chain_id"
describing its type (UUID | None), purpose (optional identifier to associate the
operation with a chain), and default behavior when None. Reference the function
signature "chain_id: UUID | None = None" and ensure the docstring style matches
the existing Args formatting.
In `@backend/app/models/llm/request.py`:
- Around line 580-586: Replace the current string-enum pattern for ChainStatus
(class ChainStatus(str, Enum)) with the Python 3.11+ idiomatic StrEnum: import
StrEnum from enum and declare class ChainStatus(StrEnum) with the same members
(PENDING, RUNNING, FAILED, COMPLETED); this removes the manual str/Enum
inheritance while preserving values and behavior.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
backend/app/api/routes/llm_chain.py
backend/app/crud/llm.py
backend/app/models/llm/request.py
backend/app/models/llm/response.py
backend/app/services/llm/jobs.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/api/routes/llm_chain.py
block_sequences: list[str] | None = Field(
    default_factory=list,
    sa_column=sa.Column(
        JSONB,
        nullable=True,
        comment="Ordered list of llm_call UUIDs as blocks complete",
    ),
)
Potential type mismatch: default_factory=list with nullable=True column.
block_sequences uses default_factory=list (Python-side default: empty list) but the JSONB column is nullable=True. This could lead to inconsistency where the model expects a list but the DB may contain NULL. Consider whether the default should be None or the column should be nullable=False.
💡 Option A: Make column non-nullable (if empty list is always valid)
block_sequences: list[str] | None = Field(
default_factory=list,
sa_column=sa.Column(
JSONB,
- nullable=True,
+ nullable=False,
+ server_default="[]",
comment="Ordered list of llm_call UUIDs as blocks complete",
),
    )
💡 Option B: Use None as default (if NULL is meaningful)
- block_sequences: list[str] | None = Field(
- default_factory=list,
+ block_sequences: list[str] | None = Field(
+ default=None,
sa_column=sa.Column(
JSONB,
nullable=True,
comment="Ordered list of llm_call UUIDs as blocks complete",
),
    )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Option A applied:
block_sequences: list[str] | None = Field(
    default_factory=list,
    sa_column=sa.Column(
        JSONB,
        nullable=False,
        server_default="[]",
        comment="Ordered list of llm_call UUIDs as blocks complete",
    ),
)

Option B applied:
block_sequences: list[str] | None = Field(
    default=None,
    sa_column=sa.Column(
        JSONB,
        nullable=True,
        comment="Ordered list of llm_call UUIDs as blocks complete",
    ),
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/models/llm/request.py` around lines 650 - 657, The Field
declaration for block_sequences sets a Python default of an empty list via
default_factory=list while the underlying sa_column JSONB is nullable=True,
creating a mismatch; pick one approach and make changes accordingly: either set
sa_column nullable=False so the DB always stores an array when block_sequences
(Field) defaults to [], or change the Field to default=None (remove
default_factory) and keep nullable=True so Python-side type matches DB NULL
semantics; update the Field/type annotation for block_sequences and the
sa.Column nullable flag consistently (referencing block_sequences, Field and the
sa.Column(JSONB, nullable=...) entry).
backend/app/services/llm/jobs.py
Outdated
        exc_info=True,
    )
    return handle_job_error(job_uuid, callback_url_str, callback_response)
return handle_job_error(job_id, request.callback_url, callback_response)
Type mismatch: handle_job_error called with str instead of UUID, and HttpUrl instead of str | None.
Line 634 has two type issues:
- job_id is str but handle_job_error expects UUID (should use job_uuid)
- request.callback_url is HttpUrl | None but handle_job_error expects str | None
🐛 Proposed fix
- return handle_job_error(job_id, request.callback_url, callback_response)
+ return handle_job_error(
+ job_uuid,
+ str(request.callback_url) if request.callback_url else None,
+ callback_response,
+ )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/jobs.py` at line 634, The call to handle_job_error
is using the wrong types: pass job_uuid (UUID) instead of job_id (str) and
convert request.callback_url to a plain string or None before calling; update
the call site that currently does return handle_job_error(job_id,
request.callback_url, callback_response) to use handle_job_error(job_uuid,
str(request.callback_url) if request.callback_url is not None else None,
callback_response) so the function receives a UUID and a str | None as expected.
Prajna1999 left a comment
In the TTS PR jobs.py was refactored to different functions. Make sure they work with llm_chain as well. Otherwise LGTM!
Actionable comments posted: 1
♻️ Duplicate comments (8)
backend/app/services/llm/jobs.py (8)
585-594: ⚠️ Potential issue | 🟠 Major
Type mismatch: job_id is str but execute_llm_call expects UUID.
Line 588 passes job_id (a str parameter) to execute_llm_call, but the function signature at line 374 declares job_id: UUID. Use the already-converted job_uuid instead.
🐛 Proposed fix
  result = execute_llm_call(
      config=request.config,
      query=request.query,
-     job_id=job_id,
+     job_id=job_uuid,
      project_id=project_id,
      organization_id=organization_id,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 585 - 594, execute_llm_call is being called with job_id (a str) but its signature declares job_id: UUID; replace the string job_id with the already-converted UUID variable job_uuid when calling execute_llm_call so the types match; update the call site where execute_llm_call(...) currently passes job_id to pass job_uuid instead (ensure any downstream uses expecting the original string remain unchanged).
550-557: 🛠️ Refactor suggestion | 🟠 Major
Add type annotation for task_instance parameter.
The task_instance parameter lacks a type annotation. Per coding guidelines, all function parameters require type hints. Since it's required by Celery but unused, annotate it as Any.
🛠️ Proposed fix
  def execute_job(
      request_data: dict,
      project_id: int,
      organization_id: int,
      job_id: str,
      task_id: str,
-     task_instance,
+     task_instance: Any,  # noqa: ARG001 - required by Celery task signature
  ) -> dict:
As per coding guidelines, "Always add type hints to all function parameters and return values in Python code."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 550 - 557, The function execute_job has an untyped parameter task_instance; add a type annotation of Any to it and import Any from typing (or typing_extensions) so the signature reads task_instance: Any and satisfies the project's typing rules; ensure the import statement (e.g., from typing import Any) is added/merged with existing imports and keep the return type as dict.
736-740: ⚠️ Potential issue | 🟡 Minor
Convert request.callback_url (Pydantic v2 HttpUrl) to str before passing to handle_job_error.
In Pydantic v2, HttpUrl is not a str subclass and requires explicit conversion. The function handle_job_error expects callback_url: str | None.
🐛 Proposed fix
  callback_response = APIResponse.failure_response(
      error="Unexpected error occurred",
      metadata=request.request_metadata,
  )
- return handle_job_error(job_uuid, request.callback_url, callback_response)
+ return handle_job_error(
+     job_uuid,
+     str(request.callback_url) if request.callback_url else None,
+     callback_response,
+ )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 736 - 740, The code passes request.callback_url (a Pydantic v2 HttpUrl) directly to handle_job_error which expects callback_url: str | None; explicitly convert the URL to a string when calling handle_job_error (e.g., use str(request.callback_url) or None if callback_url is None) so that the signature of handle_job_error receives a plain str; update the call that returns handle_job_error(job_uuid, request.callback_url, callback_response) to pass a stringified callback URL instead.
729-734: ⚠️ Potential issue | 🟡 Minor
Inner except block logs the outer exception e instead of the status-update exception.
The except Exception: at line 729 does not bind the caught exception, so {e} in the log message still refers to the outer chain-execution error — not the failure to update chain status.
🐛 Proposed fix
- except Exception:
+ except Exception as update_error:
      logger.error(
-         f"[execute_chain_job] Failed to update chain status: {e} | "
+         f"[execute_chain_job] Failed to update chain status: {update_error} | "
          f"chain_id={chain_uuid}",
          exc_info=True,
      )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 729 - 734, The inner except in execute_chain_job that wraps the status-update call mistakenly logs the outer exception variable `e`; change the handler to capture the actual exception (e.g., `except Exception as err:`) and log `err` (or include both `err` and `e` for context) in the logger.error call so the log reflects the failure to update chain status rather than the original chain-execution error; update the logging message near the status-update call to reference the correct exception variable.
631-631: ⚠️ Potential issue | 🟠 Major
Type mismatches: job_id should be job_uuid, and request.callback_url needs str conversion.
Line 631 has two type issues:
- job_id is str but handle_job_error expects UUID — use job_uuid
- request.callback_url is HttpUrl | None but the function expects str | None — use callback_url_str
🐛 Proposed fix
- return handle_job_error(job_id, request.callback_url, callback_response)
+ return handle_job_error(job_uuid, callback_url_str, callback_response)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` at line 631, The call to handle_job_error is using the wrong types: replace the first argument job_id (str) with job_uuid (UUID) and pass a str or None for the callback URL instead of request.callback_url (HttpUrl | None); convert request.callback_url to a string (use the existing callback_url_str or do str(request.callback_url) when not None) so the call becomes handle_job_error(job_uuid, callback_url_str, callback_response) keeping the same callback_response argument.
486-496: ⚠️ Potential issue | 🟠 Major
Type mismatch: resolved_input_context expects TextInput | AudioInput, not QueryParams.
The context manager resolved_input_context(query_input: TextInput | AudioInput) at line 152 expects a TextInput or AudioInput, but line 488 passes query, which is a QueryParams object. This causes the isinstance(query_input, AudioInput) check in the cleanup logic to always be False, preventing proper temp file cleanup for audio inputs.
🐛 Proposed fix
  try:
-     with resolved_input_context(query) as resolved_input:
+     with resolved_input_context(query.input) as resolved_input:
          response, error = decorated_execute(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 486 - 496, The issue is that resolved_input_context expects a TextInput or AudioInput but the code passes the entire QueryParams object (query) to it; change the context manager call to pass the actual input field (e.g., query.input or whatever property on QueryParams holds the TextInput|AudioInput) so resolved_input_context receives a TextInput|AudioInput, then use resolved_input as before when calling decorated_execute (keep decorated_execute, BlockResult, and llm_call_id logic unchanged); ensure any Variable/attribute name you reference matches the QueryParams schema so audio temp-file cleanup in resolved_input_context's isinstance(AudioInput) branch works correctly.
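A toy reproduction of the bug, with all types simplified stand-ins for the project's models: passing the QueryParams wrapper would make the isinstance check in the cleanup path always False, while passing query.input lets cleanup fire for audio-derived temp files.

```python
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, Union


@dataclass
class TextInput:
    text: str


@dataclass
class AudioInput:
    path: str


@dataclass
class QueryParams:
    input: Union[TextInput, AudioInput]


cleaned: list[str] = []


@contextmanager
def resolved_input_context(query_input: Union[TextInput, AudioInput]) -> Iterator[str]:
    # Text passes through; audio is "transcribed" into a temp file path
    resolved = query_input.text if isinstance(query_input, TextInput) else "/tmp/transcript.txt"
    try:
        yield resolved
    finally:
        # Cleanup only fires for audio-derived temp files
        if isinstance(query_input, AudioInput):
            cleaned.append(resolved)


query = QueryParams(input=AudioInput(path="clip.mp3"))
with resolved_input_context(query.input) as resolved:  # not the QueryParams wrapper
    pass
assert cleaned == ["/tmp/transcript.txt"]
```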
634-641: 🛠️ Refactor suggestion | 🟠 Major
Add type annotation for task_instance parameter.
Same as execute_job, the task_instance parameter needs a type annotation per coding guidelines.
🛠️ Proposed fix
  def execute_chain_job(
      request_data: dict,
      project_id: int,
      organization_id: int,
      job_id: str,
      task_id: str,
-     task_instance,
+     task_instance: Any,  # noqa: ARG001 - required by Celery task signature
  ) -> dict:
As per coding guidelines, "Always add type hints to all function parameters and return values in Python code."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 634 - 641, The function execute_chain_job is missing a type annotation for the task_instance parameter; update its signature to add the same task_instance type used in execute_job (e.g., the TaskInstance class or typing alias used there), import that type if necessary, and keep the return type as dict to satisfy the codebase typing guidelines.
660-671: ⚠️ Potential issue | 🟠 Major
execute_chain_job never sets the job to JobStatus.PROCESSING — inconsistency with execute_job.
execute_job immediately marks the job as PROCESSING (lines 574-576) when the Celery task starts. execute_chain_job skips this step entirely, leaving the job in its initial state until completion or failure. This breaks monitoring/alerting that relies on status transitions.
🐛 Proposed fix
  try:
      with Session(engine) as session:
+         JobCrud(session=session).update(
+             job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
+         )
+
          chain_record = create_llm_chain(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 660 - 671, execute_chain_job never sets the job status to JobStatus.PROCESSING on start, unlike execute_job; add a status transition to PROCESSING at the beginning of execute_chain_job (e.g., immediately after opening the DB Session and before calling create_llm_chain). Use the same helper/update function or pattern used by execute_job (e.g., update_job_status or set_job_status with JobStatus.PROCESSING and job_uuid) so the job record is updated consistently before performing create_llm_chain and further processing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 606-609: The call to JobCrud.update is passing job_id as a str but
the method expects a UUID; replace the string with the already-converted UUID
(use job_uuid) or explicitly convert job_id to a UUID before calling
JobCrud(session=session).update(job_id=...), so update the call inside the with
Session(engine) block to pass the UUID (job_uuid) to JobCrud.update along with
JobUpdate(status=JobStatus.SUCCESS).
---
Duplicate comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 585-594: execute_llm_call is being called with job_id (a str) but
its signature declares job_id: UUID; replace the string job_id with the
already-converted UUID variable job_uuid when calling execute_llm_call so the
types match; update the call site where execute_llm_call(...) currently passes
job_id to pass job_uuid instead (ensure any downstream uses expecting the
original string remain unchanged).
- Around line 550-557: The function execute_job has an untyped parameter
task_instance; add a type annotation of Any to it and import Any from typing (or
typing_extensions) so the signature reads task_instance: Any and satisfies the
project's typing rules; ensure the import statement (e.g., from typing import
Any) is added/merged with existing imports and keep the return type as dict.
- Around line 736-740: The code passes request.callback_url (a Pydantic v2
HttpUrl) directly to handle_job_error which expects callback_url: str | None;
explicitly convert the URL to a string when calling handle_job_error (e.g., use
str(request.callback_url) or None if callback_url is None) so that the signature
of handle_job_error receives a plain str; update the call that returns
handle_job_error(job_uuid, request.callback_url, callback_response) to pass a
stringified callback URL instead.
- Around line 729-734: The inner except in execute_chain_job that wraps the
status-update call mistakenly logs the outer exception variable `e`; change the
handler to capture the actual exception (e.g., `except Exception as err:`) and
log `err` (or include both `err` and `e` for context) in the logger.error call
so the log reflects the failure to update chain status rather than the original
chain-execution error; update the logging message near the status-update call to
reference the correct exception variable.
- Line 631: The call to handle_job_error is using the wrong types: replace the
first argument job_id (str) with job_uuid (UUID) and pass a str or None for the
callback URL instead of request.callback_url (HttpUrl | None); convert
request.callback_url to a string (use the existing callback_url_str or do
str(request.callback_url) when not None) so the call becomes
handle_job_error(job_uuid, callback_url_str, callback_response) keeping the same
callback_response argument.
- Around line 486-496: The issue is that resolved_input_context expects a
TextInput or AudioInput but the code passes the entire QueryParams object
(query) to it; change the context manager call to pass the actual input field
(e.g., query.input or whatever property on QueryParams holds the
TextInput|AudioInput) so resolved_input_context receives a TextInput|AudioInput,
then use resolved_input as before when calling decorated_execute (keep
decorated_execute, BlockResult, and llm_call_id logic unchanged); ensure any
Variable/attribute name you reference matches the QueryParams schema so audio
temp-file cleanup in resolved_input_context's isinstance(AudioInput) branch
works correctly.
- Around line 634-641: The function execute_chain_job is missing a type
annotation for the task_instance parameter; update its signature to add the same
task_instance type used in execute_job (e.g., the TaskInstance class or typing
alias used there), import that type if necessary, and keep the return type as
dict to satisfy the codebase typing guidelines.
- Around line 660-671: execute_chain_job never sets the job status to
JobStatus.PROCESSING on start, unlike execute_job; add a status transition to
PROCESSING at the beginning of execute_chain_job (e.g., immediately after
opening the DB Session and before calling create_llm_chain). Use the same
helper/update function or pattern used by execute_job (e.g., update_job_status
or set_job_status with JobStatus.PROCESSING and job_uuid) so the job record is
updated consistently before performing create_llm_chain and further processing.
…ove callback logic in ChainContext
Codecov Report

❌ Patch coverage is
📢 Thoughts on this report? Let us know!
Actionable comments posted: 6
♻️ Duplicate comments (5)
backend/app/services/llm/jobs.py (3)
553-560: ⚠️ Potential issue | 🟡 Minor

Make Celery placeholder args explicit (typed + intentionally unused).

`task_instance` is untyped/unused, and `task_id` is unused in `execute_chain_job`. Make intent explicit to avoid recurring ARG001 noise.

Proposed fix

```diff
 def execute_job(
@@
-    task_instance,
+    task_instance: Any,
 ) -> dict:
@@
+    _ = task_instance
@@
 def execute_chain_job(
@@
-    task_id: str,
-    task_instance,
+    task_id: str,
+    task_instance: Any,
 ) -> dict:
@@
+    _ = task_instance
     logger.info(
         f"[execute_chain_job] Starting chain execution | "
-        f"job_id={job_uuid}, total_blocks={len(request.blocks)}"
+        f"job_id={job_uuid}, task_id={task_id}, total_blocks={len(request.blocks)}"
     )
```

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".
Also applies to: 637-644
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 553 - 560, The function execute_job has untyped/unused Celery parameters: task_instance is untyped and task_id is unused (and execute_chain_job also leaves task_id unused); add explicit type hints and mark intentionally unused parameters (e.g., task_id: str = None or task_id: str | None and task_instance: Any) and/or rename them to _task_id and _task_instance or annotate with typing.Any and a leading underscore to silence ARG001, and update the return type if missing; apply the same change to execute_chain_job (lines referenced) so all Celery handler signatures are typed and clearly intentionally unused.
732-735: ⚠️ Potential issue | 🟡 Minor

Capture and log the inner status-update exception explicitly.

The inner `except` currently logs the outer `e`, which hides the real update failure reason.

Proposed fix

```diff
-        except Exception:
+        except Exception as update_error:
             logger.error(
-                f"[execute_chain_job] Failed to update chain status: {e} | "
+                f"[execute_chain_job] Failed to update chain status: {update_error} | "
                 f"chain_id={chain_uuid}",
                 exc_info=True,
             )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 732 - 735, The except block inside execute_chain_job is logging the outer exception variable e instead of the exception caught by the inner except, which hides the real failure; change the inner except to capture its exception (e.g., except Exception as update_exc:) and log update_exc (and preferably add exc_info=True) in the logger.error call that currently references e so the actual status-update error and stack trace are recorded.
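The shadowing hazard is easy to reproduce in isolation; this standalone sketch (no project code involved) shows why the inner handler must bind its own name instead of reusing the outer `e`:

```python
captured: list[str] = []


def update_status() -> None:
    raise RuntimeError("db connection lost")  # the *real* update failure


def run_chain() -> None:
    raise ValueError("chain execution failed")  # the original error


try:
    run_chain()
except Exception as e:
    try:
        update_status()
    except Exception as update_error:
        # Binding a distinct name preserves the update failure; logging `e`
        # here would report "chain execution failed" instead.
        captured.append(f"status update failed: {update_error}")
    captured.append(f"chain failed: {e}")
```

With a distinct name, each log line carries the exception that actually occurred at that point.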
588-592: ⚠️ Potential issue | 🟠 Major

Use `job_uuid` consistently for UUID-typed calls in `execute_job`.

Line 591, Line 611, and Line 634 pass `job_id` (str) where a UUID is expected. This can break DB updates and error handling paths.

Proposed fix

```diff
         result = execute_llm_call(
             config=request.config,
             query=request.query,
-            job_id=job_id,
+            job_id=job_uuid,
             project_id=project_id,
             organization_id=organization_id,
             request_metadata=request.request_metadata,
             langfuse_credentials=langfuse_credentials,
             include_provider_raw_response=request.include_provider_raw_response,
         )
@@
         with Session(engine) as session:
             JobCrud(session=session).update(
-                job_id=job_id, job_update=JobUpdate(status=JobStatus.SUCCESS)
+                job_id=job_uuid, job_update=JobUpdate(status=JobStatus.SUCCESS)
             )
@@
-        return handle_job_error(job_id, request.callback_url, callback_response)
+        return handle_job_error(job_uuid, callback_url_str, callback_response)
```

Also applies to: 609-612, 634-634
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 588 - 592, The call sites in execute_job pass a string job_id where a UUID is expected; update all calls to execute_llm_call and any other UUID-typed parameters to use the UUID variable job_uuid instead of job_id (e.g., the execute_llm_call invocation and the other calls around execute_job that currently pass job_id on lines near the execute_llm_call, the block around the job result handling, and the DB update/error-handling calls). Locate execute_llm_call and the surrounding execute_job logic, replace job_id with job_uuid for UUID-typed arguments, and ensure any downstream functions still expect a UUID (adjust parameter names or conversions only if necessary to preserve types).

backend/app/services/llm/chain/chain.py (1)
39-44: ⚠️ Potential issue | 🟡 Minor

Update `ChainContext` field types to match actual call sites.

`callback_url` is declared `str` but is instantiated with `None` in the chain-job flow; `request_metadata` is also unparameterized (`dict | None`). Tighten both types for consistency and static safety.

Proposed fix

```diff
 @dataclass
 class ChainContext:
@@
-    callback_url: str
+    callback_url: str | None
@@
-    request_metadata: dict | None = None
+    request_metadata: dict[str, Any] | None = None
```

As per coding guidelines, "Use Python 3.11+ with type hints throughout the codebase".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/chain/chain.py` around lines 39 - 44, Change the ChainContext field annotations so they reflect actual usage: update callback_url to allow None (e.g., callback_url: str | None) and parameterize request_metadata with value type (e.g., request_metadata: dict[str, Any] | None). Locate the ChainContext dataclass in chain.py and modify those two type hints accordingly (ensure Any is imported/available) so static checkers match runtime call sites where None may be passed.

backend/app/models/llm/request.py (1)
687-694: ⚠️ Potential issue | 🟡 Minor

Align `block_sequences` nullability with its default semantics.

`default_factory=list` (Line 688) conflicts with `nullable=True` + `list[str] | None`. This can deserialize to `None` from the DB while downstream logic expects an appendable list.

Proposed fix (non-null list semantics)

```diff
-    block_sequences: list[str] | None = Field(
+    block_sequences: list[str] = Field(
         default_factory=list,
         sa_column=sa.Column(
             JSONB,
-            nullable=True,
+            nullable=False,
+            server_default=sa.text("'[]'::jsonb"),
             comment="Ordered list of llm_call UUIDs as blocks complete",
         ),
     )
```
Verify each finding against the current code and only fix it if needed. In `@backend/app/models/llm/request.py` around lines 687 - 694, The Field block_sequences should enforce non-null list semantics: change its type from list[str] | None to list[str] and update the sa.Column nullable flag from True to False so the DB never returns None (keep default_factory=list). Locate the block_sequences Field declaration and set its type to list[str], and change sa.Column(..., nullable=False, ...) to ensure consumers can safely append without null checks.
🧹 Nitpick comments (1)
backend/app/tests/services/llm/test_jobs.py (1)
1170-1268: Prefer factory-style fixtures for new test data builders.

The added fixtures return fixed payloads/objects directly; in this test path, factory fixtures would improve reuse and scenario variation while matching repo test standards.
♻️ Example factory fixture shape
```diff
+from typing import Any, Callable

 @pytest.fixture
-def chain_request_data(self) -> dict[str, Any]:
-    return {
-        "query": {"input": "Test query"},
-        "blocks": [...],
-    }
+def chain_request_data_factory(self) -> Callable[[str], dict[str, Any]]:
+    def _factory(input_text: str = "Test query") -> dict[str, Any]:
+        return {
+            "query": {"input": input_text},
+            "blocks": [...],
+        }
+    return _factory
```

As per coding guidelines, "backend/app/tests/**/*.py: Use factory pattern for test fixtures in backend/app/tests/".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/tests/services/llm/test_jobs.py` around lines 1170 - 1268, The fixtures chain_request, chain_request_data, and mock_llm_response return static payloads—change them to factory-style fixtures so tests can parametrize/instantiate varied data per test; refactor LLMChainRequest-producing fixture (chain_request), the dict-producing fixture (chain_request_data), and the LLMCallResponse fixture (mock_llm_response) into callables that accept override kwargs or return a builder function (e.g., chain_request_factory, chain_request_data_factory, mock_llm_response_factory) so tests can call them with different inputs, and update tests to call the factory to create instances rather than using fixed objects.
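Stripped of pytest, the factory shape is just a closure over defaults; this plain-Python sketch (names are illustrative, not from the test file) shows the override mechanism the fixture refactor would give tests:

```python
from typing import Any, Callable


def make_chain_request_data_factory() -> Callable[..., dict[str, Any]]:
    """Return a builder that produces chain-request payloads with overrides."""

    def _factory(
        input_text: str = "Test query", model: str = "gpt-4"
    ) -> dict[str, Any]:
        return {
            "query": {"input": input_text},
            "blocks": [
                {
                    "config": {
                        "blob": {
                            "completion": {
                                "provider": "openai-native",
                                "type": "text",
                                "params": {"model": model},
                            }
                        }
                    }
                }
            ],
        }

    return _factory


factory = make_chain_request_data_factory()
default_payload = factory()  # baseline payload
custom_payload = factory(input_text="Other query", model="gpt-4o")  # per-test variation
```

Each test calls the factory with only the fields it cares about, so scenario variation no longer requires copy-pasting the whole payload.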
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/llm/chain/chain.py`:
- Around line 184-186: The recursive handoff using "return
self._next.execute(next_query)" can overflow the Python call stack on long
chains; change LLMChain.execute to an iterative loop: start with current = self
and query = initial_query, run the current block's execution logic to produce
result, compute next_query = result_to_query(result), then set current =
current._next and query = next_query and continue until current is None, finally
return the last result; replace the recursive call site that uses
self._next.execute with this loop-based traversal while keeping result_to_query
and self._next as the linkage.
- Around line 97-104: The log prefix in the exception and success paths of
_send_intermediate_callback uses the generic "[ChainContext]" tag; update both
logger.info and logger.warning messages to use the function-qualified prefix
"[_send_intermediate_callback]" per convention, keeping the rest of the message
and variables (block_index, self.total_blocks, self.job_id, and the exception
variable e) unchanged so the messages become
logger.info(f"[_send_intermediate_callback] ...") and
logger.warning(f"[_send_intermediate_callback] ...").
In `@backend/app/tests/services/llm/test_chain_executor.py`:
- Line 171: The tests assign unused locals `result` and `mock_job_crud` (e.g.,
the `result = executor.run()` call and any `mock_job_crud = ...` assignments)
which trigger F841; either remove those assignments or use the values in
assertions. Locate the `executor.run()` invocation(s) and the `mock_job_crud`
variable in the test functions and (a) delete the `result =` prefix and call
`executor.run()` directly if the return value is irrelevant, or (b) replace the
unused `mock_job_crud` assignment with an assertion that inspects its expected
interactions/state; ensure no unused variables remain.
In `@backend/app/tests/services/llm/test_chain.py`:
- Line 291: The test contains an unused binding `result = block1.execute(query)`
which triggers lint F841; remove the unused variable by replacing that line with
a direct call `block1.execute(query)` (or assign to `_` if you prefer), ensuring
the test still executes the call to `block1.execute` without creating the unused
`result` variable.
In `@backend/app/tests/services/llm/test_jobs.py`:
- Line 1360: The test assigns an unused local variable `result =
self._execute_chain_job(chain_request_data)` which triggers F841; either remove
the assignment and call `self._execute_chain_job(chain_request_data)` directly,
or replace it with an assertion that validates the returned payload (e.g.,
`assert result == expected_payload` or assert on specific fields) to match the
pattern used in other tests; locate the call to `_execute_chain_job` and update
it accordingly.
- Around line 1167-1367: Add explicit type hints to the fixtures and test
methods in TestStartChainJob and TestExecuteChainJob: annotate the chain_request
fixture to return LLMChainRequest, annotate test_start_chain_job_success and
test_start_chain_job_celery_failure parameters (e.g., db: Session,
chain_request: LLMChainRequest) and their return types (-> None); annotate
chain_request_data fixture to return Dict[str, Any], mock_llm_response to return
LLMCallResponse, and _execute_chain_job signature to accept request_data:
Dict[str, Any] and return Dict[str, Any] (or Any if more appropriate); annotate
test_success_flow, test_exception_during_chain_creation, and
test_chain_status_updated_to_failed_on_error parameters (e.g., db/session mocks)
and return types (-> None). Import typing types (Dict, Any) and the concrete
types (LLMChainRequest, LLMCallResponse, Session) at the top if not already
present.
---
Duplicate comments:
In `@backend/app/models/llm/request.py`:
- Around line 687-694: The Field block_sequences should enforce non-null list
semantics: change its type from list[str] | None to list[str] and update the
sa.Column nullable flag from True to False so the DB never returns None (keep
default_factory=list). Locate the block_sequences Field declaration and set its
type to list[str], and change sa.Column(..., nullable=False, ...) to ensure
consumers can safely append without null checks.
In `@backend/app/services/llm/chain/chain.py`:
- Around line 39-44: Change the ChainContext field annotations so they reflect
actual usage: update callback_url to allow None (e.g., callback_url: str | None)
and parameterize request_metadata with value type (e.g., request_metadata:
dict[str, Any] | None). Locate the ChainContext dataclass in chain.py and modify
those two type hints accordingly (ensure Any is imported/available) so static
checkers match runtime call sites where None may be passed.
In `@backend/app/services/llm/jobs.py`:
- Around line 553-560: The function execute_job has untyped/unused Celery
parameters: task_instance is untyped and task_id is unused (and
execute_chain_job also leaves task_id unused); add explicit type hints and mark
intentionally unused parameters (e.g., task_id: str = None or task_id: str |
None and task_instance: Any) and/or rename them to _task_id and _task_instance
or annotate with typing.Any and a leading underscore to silence ARG001, and
update the return type if missing; apply the same change to execute_chain_job
(lines referenced) so all Celery handler signatures are typed and clearly
intentionally unused.
- Around line 732-735: The except block inside execute_chain_job is logging the
outer exception variable e instead of the exception caught by the inner except,
which hides the real failure; change the inner except to capture its exception
(e.g., except Exception as update_exc:) and log update_exc (and preferably add
exc_info=True) in the logger.error call that currently references e so the
actual status-update error and stack trace are recorded.
- Around line 588-592: The call sites in execute_job pass a string job_id where
a UUID is expected; update all calls to execute_llm_call and any other
UUID-typed parameters to use the UUID variable job_uuid instead of job_id (e.g.,
the execute_llm_call invocation and the other calls around execute_job that
currently pass job_id on lines near the execute_llm_call, the block around the
job result handling, and the DB update/error-handling calls). Locate
execute_llm_call and the surrounding execute_job logic, replace job_id with
job_uuid for UUID-typed arguments, and ensure any downstream functions still
expect a UUID (adjust parameter names or conversions only if necessary to
preserve types).
---
Nitpick comments:
In `@backend/app/tests/services/llm/test_jobs.py`:
- Around line 1170-1268: The fixtures chain_request, chain_request_data, and
mock_llm_response return static payloads—change them to factory-style fixtures
so tests can parametrize/instantiate varied data per test; refactor
LLMChainRequest-producing fixture (chain_request), the dict-producing fixture
(chain_request_data), and the LLMCallResponse fixture (mock_llm_response) into
callables that accept override kwargs or return a builder function (e.g.,
chain_request_factory, chain_request_data_factory, mock_llm_response_factory) so
tests can call them with different inputs, and update tests to call the factory
to create instances rather than using fixed objects.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- backend/app/crud/llm.py
- backend/app/models/llm/__init__.py
- backend/app/models/llm/request.py
- backend/app/services/llm/chain/chain.py
- backend/app/services/llm/chain/types.py
- backend/app/services/llm/jobs.py
- backend/app/tests/crud/test_llm_chain.py
- backend/app/tests/services/llm/test_chain.py
- backend/app/tests/services/llm/test_chain_executor.py
- backend/app/tests/services/llm/test_jobs.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/services/llm/chain/types.py
```python
            logger.info(
                f"[ChainContext] Sent intermediate callback | "
                f"block={block_index + 1}/{self.total_blocks}, job_id={self.job_id}"
            )
        except Exception as e:
            logger.warning(
                f"[ChainContext] Failed to send intermediate callback: {e} | "
                f"block={block_index + 1}/{self.total_blocks}, job_id={self.job_id}"
```
Use function-qualified log prefixes in _send_intermediate_callback.
Current prefix [ChainContext] is too generic; the project convention requires function name prefixes.
Proposed fix
```diff
-        logger.info(
-            f"[ChainContext] Sent intermediate callback | "
+        logger.info(
+            f"[ChainContext._send_intermediate_callback] Sent intermediate callback | "
             f"block={block_index + 1}/{self.total_blocks}, job_id={self.job_id}"
         )
@@
-        logger.warning(
-            f"[ChainContext] Failed to send intermediate callback: {e} | "
+        logger.warning(
+            f"[ChainContext._send_intermediate_callback] Failed to send intermediate callback: {e} | "
             f"block={block_index + 1}/{self.total_blocks}, job_id={self.job_id}"
         )
```

As per coding guidelines, "Prefix all log messages with the function name in square brackets: logger.info(f\"[function_name] Message {mask_string(sensitive_value)}\")".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/chain/chain.py` around lines 97 - 104, The log
prefix in the exception and success paths of _send_intermediate_callback uses
the generic "[ChainContext]" tag; update both logger.info and logger.warning
messages to use the function-qualified prefix "[_send_intermediate_callback]"
per convention, keeping the rest of the message and variables (block_index,
self.total_blocks, self.job_id, and the exception variable e) unchanged so the
messages become logger.info(f"[_send_intermediate_callback] ...") and
logger.warning(f"[_send_intermediate_callback] ...").
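One way to make the prefix convention hard to get wrong (a sketch, not project code) is a tiny helper that derives the prefix from the calling function's own name via the call stack:

```python
import inspect


def log_prefix() -> str:
    """Return '[caller_name]' for whichever function calls this helper."""
    frame = inspect.currentframe()
    assert frame is not None and frame.f_back is not None
    return f"[{frame.f_back.f_code.co_name}]"


def _send_intermediate_callback() -> str:
    # The prefix always matches the enclosing function, even after renames.
    return f"{log_prefix()} Sent intermediate callback"


message = _send_intermediate_callback()
```

Hand-written prefixes drift when functions are renamed; a derived prefix cannot.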
```python
        if self._next:
            next_query = result_to_query(result)
            return self._next.execute(next_query)
```
Recursive block handoff can fail on long chains.
return self._next.execute(next_query) creates one stack frame per block. Since block count is request-driven, sufficiently long chains can raise RecursionError. Prefer iterative traversal in LLMChain.execute().
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/services/llm/chain/chain.py` around lines 184 - 186, The
recursive handoff using "return self._next.execute(next_query)" can overflow the
Python call stack on long chains; change LLMChain.execute to an iterative loop:
start with current = self and query = initial_query, run the current block's
execution logic to produce result, compute next_query = result_to_query(result),
then set current = current._next and query = next_query and continue until
current is None, finally return the last result; replace the recursive call site
that uses self._next.execute with this loop-based traversal while keeping
result_to_query and self._next as the linkage.
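The loop-based traversal the prompt describes can be sketched with toy blocks; `ChainBlock` and `execute_chain` here are stand-ins for the PR's classes, kept minimal to show only the stack-depth property:

```python
from typing import Callable


class ChainBlock:
    """Toy linked block: transform the query, then hand off to _next."""

    def __init__(self, transform: Callable[[str], str]) -> None:
        self.transform = transform
        self._next: "ChainBlock | None" = None

    def then(self, block: "ChainBlock") -> "ChainBlock":
        self._next = block
        return block


def execute_chain(first: ChainBlock, query: str) -> str:
    # Iterative traversal: constant stack depth regardless of chain length,
    # unlike `return self._next.execute(next_query)` recursion.
    current: ChainBlock | None = first
    result = query
    while current is not None:
        result = current.transform(result)  # this block's "LLM call"
        current = current._next  # hand the result to the next block
    return result


head = ChainBlock(str.upper)
head.then(ChainBlock(lambda s: s + "!"))
output = execute_chain(head, "hello")
```

Because the loop reuses one frame, a request with thousands of blocks stays well clear of Python's default recursion limit (~1000 frames).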
```python
        ):
            mock_session.return_value.__enter__.return_value = MagicMock()

            result = executor.run()
```
Clean up unused locals in these tests (F841).
Both result and mock_job_crud are assigned but never read.
Proposed fix
```diff
-        result = executor.run()
+        executor.run()
@@
-            patch("app.services.llm.chain.executor.JobCrud") as mock_job_crud,
+            patch("app.services.llm.chain.executor.JobCrud"),
```

Also applies to: 203-203
🧰 Tools
🪛 Ruff (0.15.2)
[error] 171-171: Local variable result is assigned to but never used
Remove assignment to unused variable result
(F841)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/tests/services/llm/test_chain_executor.py` at line 171, The tests
assign unused locals `result` and `mock_job_crud` (e.g., the `result =
executor.run()` call and any `mock_job_crud = ...` assignments) which trigger
F841; either remove those assignments or use the values in assertions. Locate
the `executor.run()` invocation(s) and the `mock_job_crud` variable in the test
functions and (a) delete the `result =` prefix and call `executor.run()`
directly if the return value is irrelevant, or (b) replace the unused
`mock_job_crud` assignment with an assertion that inspects its expected
interactions/state; ensure no unused variables remain.
```python
                response=text_response, usage=text_response.usage
            )

            result = block1.execute(query)
```
Remove the unused result binding in this test.
The call result isn’t used; this triggers F841 and adds noise.
Proposed fix
```diff
-        result = block1.execute(query)
+        block1.execute(query)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
        block1.execute(query)
```
🧰 Tools
🪛 Ruff (0.15.2)
[error] 291-291: Local variable result is assigned to but never used
Remove assignment to unused variable result
(F841)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/tests/services/llm/test_chain.py` at line 291, The test contains
an unused binding `result = block1.execute(query)` which triggers lint F841;
remove the unused variable by replacing that line with a direct call
`block1.execute(query)` (or assign to `_` if you prefer), ensuring the test
still executes the call to `block1.execute` without creating the unused `result`
variable.
```python
class TestStartChainJob:
    """Test cases for the start_chain_job function."""

    @pytest.fixture
    def chain_request(self):
        return LLMChainRequest(
            query=QueryParams(input="Test query"),
            blocks=[
                ChainBlockModel(
                    config=LLMCallConfig(
                        blob=ConfigBlob(
                            completion=NativeCompletionConfig(
                                provider="openai-native",
                                type="text",
                                params={"model": "gpt-4"},
                            )
                        )
                    )
                )
            ],
        )

    def test_start_chain_job_success(self, db: Session, chain_request):
        project = get_project(db)

        with (
            patch("app.services.llm.jobs.start_high_priority_job") as mock_schedule,
            patch("app.services.llm.jobs.JobCrud") as mock_job_crud_class,
        ):
            mock_schedule.return_value = "fake-task-id"
            mock_job = MagicMock()
            mock_job.id = uuid4()
            mock_job.job_type = JobType.LLM_CHAIN
            mock_job.status = JobStatus.PENDING
            mock_job_crud_class.return_value.create.return_value = mock_job

            job_id = start_chain_job(
                db, chain_request, project.id, project.organization_id
            )

            assert job_id == mock_job.id
            mock_schedule.assert_called_once()
            _, kwargs = mock_schedule.call_args
            assert kwargs["function_path"] == "app.services.llm.jobs.execute_chain_job"

    def test_start_chain_job_celery_failure(self, db: Session, chain_request):
        project = get_project(db)

        with (
            patch("app.services.llm.jobs.start_high_priority_job") as mock_schedule,
            patch("app.services.llm.jobs.JobCrud") as mock_job_crud_class,
        ):
            mock_schedule.side_effect = Exception("Celery connection failed")
            mock_job = MagicMock()
            mock_job.id = uuid4()
            mock_job_crud_class.return_value.create.return_value = mock_job

            with pytest.raises(HTTPException) as exc_info:
                start_chain_job(db, chain_request, project.id, project.organization_id)

            assert exc_info.value.status_code == 500
            assert "Internal server error while executing LLM chain job" in str(
                exc_info.value.detail
            )


class TestExecuteChainJob:
    """Test suite for execute_chain_job."""

    @pytest.fixture
    def chain_request_data(self):
        return {
            "query": {"input": "Test query"},
            "blocks": [
                {
                    "config": {
                        "blob": {
                            "completion": {
                                "provider": "openai-native",
                                "type": "text",
                                "params": {"model": "gpt-4"},
                            }
                        }
                    },
                }
            ],
        }

    @pytest.fixture
    def mock_llm_response(self):
        return LLMCallResponse(
            response=LLMResponse(
                provider_response_id="resp-123",
                conversation_id=None,
                model="gpt-4",
                provider="openai",
                output=TextOutput(content=TextContent(value="Test response")),
            ),
            usage=Usage(input_tokens=10, output_tokens=20, total_tokens=30),
            provider_raw_response=None,
        )

    def _execute_chain_job(self, request_data):
        return execute_chain_job(
            request_data=request_data,
            project_id=1,
            organization_id=1,
            job_id=str(uuid4()),
            task_id="task-123",
            task_instance=None,
        )

    def test_success_flow(self, chain_request_data, mock_llm_response):
        from app.services.llm.chain.types import BlockResult

        with (
            patch("app.services.llm.jobs.Session") as mock_session,
            patch("app.services.llm.jobs.create_llm_chain") as mock_create_chain,
            patch("app.services.llm.jobs.get_provider_credential") as mock_creds,
            patch("app.services.llm.chain.executor.Session") as mock_executor_session,
            patch("app.services.llm.chain.executor.send_callback"),
            patch("app.services.llm.chain.executor.update_llm_chain_status"),
            patch("app.services.llm.chain.chain.execute_llm_call") as mock_execute_llm,
            patch("app.services.llm.chain.chain.Session"),
        ):
            mock_session.return_value.__enter__.return_value = MagicMock()
            mock_session.return_value.__exit__.return_value = None
            mock_executor_session.return_value.__enter__.return_value = MagicMock()
            mock_executor_session.return_value.__exit__.return_value = None

            mock_chain_record = MagicMock()
            mock_chain_record.id = uuid4()
            mock_create_chain.return_value = mock_chain_record
            mock_creds.return_value = None

            mock_execute_llm.return_value = BlockResult(
                response=mock_llm_response,
                llm_call_id=uuid4(),
                usage=mock_llm_response.usage,
            )

            result = self._execute_chain_job(chain_request_data)

            assert result["success"] is True

    def test_exception_during_chain_creation(self, chain_request_data):
        with (
            patch("app.services.llm.jobs.Session") as mock_session,
            patch(
                "app.services.llm.jobs.create_llm_chain",
                side_effect=Exception("DB error"),
            ),
            patch("app.services.llm.jobs.handle_job_error") as mock_handle_error,
        ):
            mock_session.return_value.__enter__.return_value = MagicMock()
            mock_session.return_value.__exit__.return_value = None
            mock_handle_error.return_value = {
                "success": False,
                "error": "Unexpected error occurred",
            }

            result = self._execute_chain_job(chain_request_data)

            assert result["success"] is False

    def test_chain_status_updated_to_failed_on_error(self, chain_request_data):
        chain_id = uuid4()

        with (
            patch("app.services.llm.jobs.Session") as mock_session,
            patch("app.services.llm.jobs.create_llm_chain") as mock_create_chain,
            patch("app.services.llm.jobs.get_provider_credential") as mock_creds,
            patch(
                "app.services.llm.jobs.update_llm_chain_status"
            ) as mock_update_status,
            patch("app.services.llm.jobs.handle_job_error") as mock_handle_error,
            patch(
                "app.services.llm.chain.chain.LLMChain",
                side_effect=Exception("Chain init error"),
            ),
        ):
            mock_session.return_value.__enter__.return_value = MagicMock()
            mock_session.return_value.__exit__.return_value = None

            mock_chain_record = MagicMock()
            mock_chain_record.id = chain_id
            mock_create_chain.return_value = mock_chain_record
            mock_creds.return_value = None
            mock_handle_error.return_value = {
                "success": False,
                "error": "Unexpected error occurred",
            }

            result = self._execute_chain_job(chain_request_data)

            mock_update_status.assert_called_once()
            _, kwargs = mock_update_status.call_args
            assert kwargs["chain_id"] == chain_id
            assert kwargs["status"].value == "failed"
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Verify defs in this file that still miss explicit return annotations.
rg -n '^\s*def\s+\w+\(.*\):\s*$' backend/app/tests/services/llm/test_jobs.py | rg -v -- '->'
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 2299
Add type hints to chain test methods and fixtures.
The functions in TestStartChainJob and TestExecuteChainJob (lines 1171–1332) lack type annotations. Per coding guidelines, all Python functions must include type hints for parameters and return values.
♻️ Suggested typing pattern

```diff
+from typing import Any

 class TestStartChainJob:
     @pytest.fixture
-    def chain_request(self):
+    def chain_request(self) -> LLMChainRequest:
         return LLMChainRequest(...)

-    def test_start_chain_job_success(self, db: Session, chain_request):
+    def test_start_chain_job_success(
+        self, db: Session, chain_request: LLMChainRequest
+    ) -> None:
         ...

 class TestExecuteChainJob:
     @pytest.fixture
-    def chain_request_data(self):
+    def chain_request_data(self) -> dict[str, Any]:
         return {...}

-    def _execute_chain_job(self, request_data):
+    def _execute_chain_job(self, request_data: dict[str, Any]) -> dict[str, Any]:
         return execute_chain_job(...)
```

🧰 Tools
🪛 Ruff (0.15.2)
[error] 1360-1360: Local variable result is assigned to but never used
Remove assignment to unused variable result
(F841)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/tests/services/llm/test_jobs.py` around lines 1167 - 1367, Add
explicit type hints to the fixtures and test methods in TestStartChainJob and
TestExecuteChainJob: annotate the chain_request fixture to return
LLMChainRequest, annotate test_start_chain_job_success and
test_start_chain_job_celery_failure parameters (e.g., db: Session,
chain_request: LLMChainRequest) and their return types (-> None); annotate
chain_request_data fixture to return Dict[str, Any], mock_llm_response to return
LLMCallResponse, and _execute_chain_job signature to accept request_data:
Dict[str, Any] and return Dict[str, Any] (or Any if more appropriate); annotate
test_success_flow, test_exception_during_chain_creation, and
test_chain_status_updated_to_failed_on_error parameters (e.g., db/session mocks)
and return types (-> None). Import typing types (Dict, Any) and the concrete
types (LLMChainRequest, LLMCallResponse, Session) at the top if not already
present.
```python
                "error": "Unexpected error occurred",
            }

            result = self._execute_chain_job(chain_request_data)
```
🧩 Analysis chain

🏁 Script executed:

```bash
# First, check if the file exists and get the line count
wc -l backend/app/tests/services/llm/test_jobs.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 123

🏁 Script executed:

```bash
# Read the code around line 1360 with context
sed -n '1350,1370p' backend/app/tests/services/llm/test_jobs.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 781

🏁 Script executed:

```bash
# Search for all occurrences of 'result' in the file to understand the pattern
rg -n 'result' backend/app/tests/services/llm/test_jobs.py | head -30
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 1871

🏁 Script executed:

```bash
# Get more context around line 1360 to see the test function name
sed -n '1330,1375p' backend/app/tests/services/llm/test_jobs.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 1882

🏁 Script executed:

```bash
# Check if there are any assertions on result variable after line 1360
sed -n '1360,1380p' backend/app/tests/services/llm/test_jobs.py
```

Repository: ProjectTech4DevAI/kaapi-backend
Length of output: 825
Remove the unused local `result` variable or assert on its value.

At line 1360, `result` is assigned from `self._execute_chain_job(chain_request_data)` but never used. The variable triggers F841 (unused local variable). Either remove the assignment or add assertions to validate the returned payload, consistent with other similar test methods in this file.
🧰 Tools
🪛 Ruff (0.15.2)
[error] 1360-1360: Local variable result is assigned to but never used
Remove assignment to unused variable result
(F841)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/app/tests/services/llm/test_jobs.py` at line 1360, The test assigns
an unused local variable `result = self._execute_chain_job(chain_request_data)`
which triggers F841; either remove the assignment and call
`self._execute_chain_job(chain_request_data)` directly, or replace it with an
assertion that validates the returned payload (e.g., `assert result ==
expected_payload` or assert on specific fields) to match the pattern used in
other tests; locate the call to `_execute_chain_job` and update it accordingly.
♻️ Duplicate comments (1)

backend/app/services/llm/jobs.py (1)

732-737: ⚠️ Potential issue | 🟡 Minor

Inner `except` block logs the outer exception `e` instead of the status-update exception.

The bare `except Exception:` at line 732 does not bind the caught exception, so `{e}` in the log message (line 734) still refers to the outer chain-execution error, not the failure to update chain status. This is misleading during debugging.

Proposed fix

```diff
-        except Exception:
+        except Exception as update_error:
             logger.error(
-                f"[execute_chain_job] Failed to update chain status: {e} | "
+                f"[execute_chain_job] Failed to update chain status: {update_error} | "
                 f"chain_id={chain_uuid}",
                 exc_info=True,
             )
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@backend/app/services/llm/jobs.py` around lines 732-737, the inner except in execute_chain_job is currently bare and logs the outer exception variable `e`, which is misleading; change the handler to bind the caught exception (e.g., `except Exception as update_err:`) and use that bound name in the `logger.error` call (include `update_err` in the message or f-string and keep `exc_info=True`) so the log reflects the actual status-update failure for `chain_uuid` and not the outer execution error.
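For illustration, a runnable sketch of why the rebinding matters (all names hypothetical; this is not the project's actual code):

```python
import logging

logger = logging.getLogger(__name__)

def update_llm_chain_status(chain_uuid: str) -> None:
    # Simulate the status update itself failing.
    raise RuntimeError("status table unavailable")

def run_chain(chain_uuid: str) -> tuple[str, str]:
    try:
        raise ValueError("chain execution failed")  # the outer error
    except Exception as e:
        try:
            update_llm_chain_status(chain_uuid)
        except Exception as update_error:
            # Binding the inner exception separately is what makes this log
            # accurate: `e` is still the ValueError from the chain run, while
            # `update_error` is the RuntimeError from the status update.
            logger.error(
                f"Failed to update chain status: {update_error} | chain_id={chain_uuid}",
                exc_info=True,
            )
            return (str(e), str(update_error))
    return ("", "")
```

With a bare `except Exception:`, only `e` would be in scope inside the inner handler, so the log would misattribute the status-update failure to the chain-execution error.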
Summary
Target issue is #542
Explain the motivation for making this change. What existing problem does the pull request solve?
This PR adds the foundation for `llm_chain`, a workflow manager that orchestrates sequential `llm_call` executions as a single request. It reuses the existing `llm-call` as its core primitive, so every existing `llm-call` capability (providers, configs, guardrails, prompt templates) works automatically in chains with zero code duplication.

Checklist
Before submitting a pull request, please ensure that you mark these tasks.

- Run `fastapi run --reload app/main.py` or `docker compose up` in the repository root and test.

Summary by CodeRabbit
New Features
Database
Documentation
Tests
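As an illustration of the orchestration model described in the summary, here is a heavily simplified sketch of sequential block execution; all names are hypothetical stand-ins, and the real implementation lives in the chain executor added by this PR:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class Block:
    """One step in a chain; `call` stands in for an llm_call execution."""
    name: str
    call: Callable[[str], str]

def run_chain(blocks: list[Block], prompt: str) -> str:
    """Run blocks sequentially, feeding each block's output into the next input."""
    current = prompt
    for block in blocks:
        current = block.call(current)
    return current

# Two toy blocks standing in for provider-backed llm_call steps.
chain = [Block("upper", str.upper), Block("exclaim", lambda s: s + "!")]
print(run_chain(chain, "hello"))  # → HELLO!
```

The real executor additionally persists per-block status, usage, and callbacks around each step, but the data flow (each block's output becoming the next block's input) is the same.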