mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-14 03:30:19 +00:00
Compare commits
65 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5e49361125 | |||
| c65cf8c2a0 | |||
| 981c57d6f9 | |||
| 87eba86514 | |||
| 09e6f02e45 | |||
| a2ea314cd8 | |||
| e2be720726 | |||
| 8b6905ec7e | |||
| e4830943cf | |||
| 5dfb6fc288 | |||
| 94ea3f6a34 | |||
| b8339abc76 | |||
| c037609b8b | |||
| b8b37cffa3 | |||
| e8e4b2e822 | |||
| c02e5720c2 | |||
| 3fb732da66 | |||
| fdb529d598 | |||
| 4c5c8795ef | |||
| 8b342c600f | |||
| 723c4d6daa | |||
| 5b02563659 | |||
| a5f16c1184 | |||
| 7641cda958 | |||
| 9ef76475c2 | |||
| 1ed93b6a24 | |||
| 004027ffdd | |||
| 0434953053 | |||
| 4b7ef28e46 | |||
| 437b4c8968 | |||
| fdb4f37053 | |||
| 1260756246 | |||
| cb769534d3 | |||
| de1a5c88e4 | |||
| 5b4b50e050 | |||
| 1d619ad507 | |||
| 3ce3b5388f | |||
| 991c2f79e8 | |||
| c329696dc2 | |||
| 4b5e001934 | |||
| aa46864df6 | |||
| af3f0ea171 | |||
| 84a7b5c7c8 | |||
| e01cadb779 | |||
| ce5833cb67 | |||
| 5b534f45d1 | |||
| e692448346 | |||
| 3fe5b62cbe | |||
| 6c6c8698d3 | |||
| cdbef3f72e | |||
| 71030c6e21 | |||
| adf49db7c4 | |||
| 69cefce3d9 | |||
| b295265f25 | |||
| 1a4005c7b9 | |||
| 64d3bdb978 | |||
| 434532ce36 | |||
| 23120f26e4 | |||
| 77dbe4b7b3 | |||
| 1ccc86e589 | |||
| ccb33fa48c | |||
| 082481c35d | |||
| 441e0c5b7c | |||
| 0a6b02ccb5 | |||
| 248a4dcab5 |
@@ -51,7 +51,7 @@ export interface GlobalServerConfig {
|
||||
|
||||
### 3. Assemble Server Config (if new domain)
|
||||
|
||||
In `src/server/globalConfig/index.ts`:
|
||||
In `apps/server/src/globalConfig/index.ts`:
|
||||
|
||||
```typescript
|
||||
import { <domain>Env } from '@/envs/<domain>';
|
||||
@@ -97,7 +97,7 @@ AI_IMAGE_DEFAULT_IMAGE_NUM: z.coerce.number().min(1).max(20).optional(),
|
||||
// packages/types/src/serverConfig.ts
|
||||
image?: PartialDeep<UserImageConfig>;
|
||||
|
||||
// src/server/globalConfig/index.ts
|
||||
// apps/server/src/globalConfig/index.ts
|
||||
image: cleanObject({ defaultImageNum: imageEnv.AI_IMAGE_DEFAULT_IMAGE_NUM }),
|
||||
|
||||
// src/store/user/slices/common/action.ts
|
||||
|
||||
@@ -50,14 +50,14 @@ execAgent({ hooks })
|
||||
|
||||
## Key Files
|
||||
|
||||
| File | Role |
|
||||
| ---------------------------------------------------------- | ------------------------------------------------------ |
|
||||
| `packages/agent-runtime/src/types/hooks.ts` | Type definitions (AgentHookType, all event interfaces) |
|
||||
| `src/server/services/agentRuntime/hooks/types.ts` | Server-side types (AgentHook, re-exports) |
|
||||
| `src/server/services/agentRuntime/hooks/HookDispatcher.ts` | Registration, dispatch, dispatchBeforeToolCall |
|
||||
| `src/server/modules/AgentRuntime/RuntimeExecutors.ts` | Tool/Compact/HumanIntervention hook dispatch |
|
||||
| `src/server/services/agentRuntime/AgentRuntimeService.ts` | Step hooks + HumanIntervention resume/reject |
|
||||
| `src/server/services/aiAgent/index.ts` | CallAgent hook dispatch |
|
||||
| File | Role |
|
||||
| --------------------------------------------------------------- | ------------------------------------------------------ |
|
||||
| `packages/agent-runtime/src/types/hooks.ts` | Type definitions (AgentHookType, all event interfaces) |
|
||||
| `apps/server/src/services/agentRuntime/hooks/types.ts` | Server-side types (AgentHook, re-exports) |
|
||||
| `apps/server/src/services/agentRuntime/hooks/HookDispatcher.ts` | Registration, dispatch, dispatchBeforeToolCall |
|
||||
| `apps/server/src/modules/AgentRuntime/RuntimeExecutors.ts` | Tool/Compact/HumanIntervention hook dispatch |
|
||||
| `apps/server/src/services/agentRuntime/AgentRuntimeService.ts` | Step hooks + HumanIntervention resume/reject |
|
||||
| `apps/server/src/services/aiAgent/index.ts` | CallAgent hook dispatch |
|
||||
|
||||
## Registration Flow
|
||||
|
||||
|
||||
@@ -26,9 +26,9 @@ Agent Signal has one consistent shape:
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/services/agentSignal/index.ts`
|
||||
- `src/server/workflows/agentSignal/index.ts`
|
||||
- `src/server/workflows/agentSignal/run.ts`
|
||||
- `apps/server/src/services/agentSignal/index.ts`
|
||||
- `apps/server/src/workflows/agentSignal/index.ts`
|
||||
- `apps/server/src/workflows/agentSignal/run.ts`
|
||||
|
||||
## Core Model
|
||||
|
||||
@@ -48,11 +48,11 @@ Keep the boundaries strict:
|
||||
## Implementation Workflow
|
||||
|
||||
1. Decide whether the use case is synchronous or quiet background work.
|
||||
2. Define or reuse a source type in `src/server/services/agentSignal/sourceTypes.ts`.
|
||||
3. Define or reuse signal and action types in `src/server/services/agentSignal/policies/types.ts`.
|
||||
2. Define or reuse a source type in `apps/server/src/services/agentSignal/sourceTypes.ts`.
|
||||
3. Define or reuse signal and action types in `apps/server/src/services/agentSignal/policies/types.ts`.
|
||||
4. Implement handlers with `defineSourceHandler`, `defineSignalHandler`, or `defineActionHandler`.
|
||||
5. Bundle handlers with `defineAgentSignalHandlers(...)`.
|
||||
6. Register the policy in `src/server/services/agentSignal/policies/index.ts` and pass it into the runtime factory if needed.
|
||||
6. Register the policy in `apps/server/src/services/agentSignal/policies/index.ts` and pass it into the runtime factory if needed.
|
||||
7. Add or update ingress code that emits or enqueues the source event.
|
||||
8. Add observability and tests before considering the flow complete.
|
||||
|
||||
@@ -63,19 +63,19 @@ Keep the boundaries strict:
|
||||
`packages/agent-signal/src/base/builders.ts`
|
||||
`packages/agent-signal/src/base/types.ts`
|
||||
- Server-owned runtime and middleware:
|
||||
`src/server/services/agentSignal/runtime/AgentSignalRuntime.ts`
|
||||
`src/server/services/agentSignal/runtime/AgentSignalScheduler.ts`
|
||||
`src/server/services/agentSignal/runtime/middleware.ts`
|
||||
`src/server/services/agentSignal/runtime/context.ts`
|
||||
`apps/server/src/services/agentSignal/runtime/AgentSignalRuntime.ts`
|
||||
`apps/server/src/services/agentSignal/runtime/AgentSignalScheduler.ts`
|
||||
`apps/server/src/services/agentSignal/runtime/middleware.ts`
|
||||
`apps/server/src/services/agentSignal/runtime/context.ts`
|
||||
- Existing policy example:
|
||||
`src/server/services/agentSignal/policies/analyzeIntent/index.ts`
|
||||
`src/server/services/agentSignal/policies/analyzeIntent/feedbackSatisfaction.ts`
|
||||
`src/server/services/agentSignal/policies/analyzeIntent/feedbackDomain.ts`
|
||||
`src/server/services/agentSignal/policies/analyzeIntent/feedbackAction.ts`
|
||||
`src/server/services/agentSignal/policies/analyzeIntent/actions/userMemory.ts`
|
||||
`apps/server/src/services/agentSignal/policies/analyzeIntent/index.ts`
|
||||
`apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackSatisfaction.ts`
|
||||
`apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackDomain.ts`
|
||||
`apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackAction.ts`
|
||||
`apps/server/src/services/agentSignal/policies/analyzeIntent/actions/userMemory.ts`
|
||||
- Observability:
|
||||
`src/server/services/agentSignal/observability/projector.ts`
|
||||
`src/server/services/agentSignal/observability/traceEvents.ts`
|
||||
`apps/server/src/services/agentSignal/observability/projector.ts`
|
||||
`apps/server/src/services/agentSignal/observability/traceEvents.ts`
|
||||
`packages/observability-otel/src/modules/agent-signal/index.ts`
|
||||
|
||||
## Implementation Rules
|
||||
@@ -86,7 +86,7 @@ Keep the boundaries strict:
|
||||
- Use stable ids and idempotency keys when the same source can arrive more than once.
|
||||
- Preserve scope discipline. The runtime uses `scopeKey` to serialize related background work.
|
||||
- Prefer the dedicated shared package types and builders from `@lobechat/agent-signal` for normalized nodes and result contracts.
|
||||
- Add focused tests near the touched runtime, policy, or store module. Existing tests under `src/server/services/agentSignal/**/__tests__` are the reference pattern.
|
||||
- Add focused tests near the touched runtime, policy, or store module. Existing tests under `apps/server/src/services/agentSignal/**/__tests__` are the reference pattern.
|
||||
|
||||
## References
|
||||
|
||||
|
||||
@@ -32,9 +32,9 @@ source node
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/services/agentSignal/index.ts`
|
||||
- `src/server/services/agentSignal/sources/index.ts`
|
||||
- `src/server/services/agentSignal/runtime/AgentSignalScheduler.ts`
|
||||
- `apps/server/src/services/agentSignal/index.ts`
|
||||
- `apps/server/src/services/agentSignal/sources/index.ts`
|
||||
- `apps/server/src/services/agentSignal/runtime/AgentSignalScheduler.ts`
|
||||
|
||||
## Package Boundaries
|
||||
|
||||
@@ -56,7 +56,7 @@ Read:
|
||||
- `packages/agent-signal/src/types/events.ts`
|
||||
- `packages/agent-signal/src/types/builtin.ts`
|
||||
|
||||
### `src/server/services/agentSignal`
|
||||
### `apps/server/src/services/agentSignal`
|
||||
|
||||
Treat this as the server-owned implementation layer.
|
||||
|
||||
@@ -89,11 +89,11 @@ Examples:
|
||||
|
||||
Define source payloads in:
|
||||
|
||||
- `src/server/services/agentSignal/sourceTypes.ts`
|
||||
- `apps/server/src/services/agentSignal/sourceTypes.ts`
|
||||
|
||||
Build normalized sources in:
|
||||
|
||||
- `src/server/services/agentSignal/sources/buildSource.ts`
|
||||
- `apps/server/src/services/agentSignal/sources/buildSource.ts`
|
||||
- `packages/agent-signal/src/base/builders.ts`
|
||||
|
||||
### Signal
|
||||
@@ -109,7 +109,7 @@ Examples from `analyzeIntent`:
|
||||
|
||||
Define server-owned signal types in:
|
||||
|
||||
- `src/server/services/agentSignal/policies/types.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/types.ts`
|
||||
|
||||
### Action
|
||||
|
||||
@@ -157,9 +157,9 @@ When a user asks for "the procedure", document the flow above and point to the e
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/services/agentSignal/sources/index.ts`
|
||||
- `src/server/services/agentSignal/runtime/context.ts`
|
||||
- `src/server/services/agentSignal/constants.ts`
|
||||
- `apps/server/src/services/agentSignal/sources/index.ts`
|
||||
- `apps/server/src/services/agentSignal/runtime/context.ts`
|
||||
- `apps/server/src/services/agentSignal/constants.ts`
|
||||
|
||||
Use `enqueueAgentSignalSourceEvent(...)` when the work should stay quiet and out-of-band. That path:
|
||||
|
||||
@@ -172,8 +172,8 @@ This is the preferred path when the UI request should finish immediately and the
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/workflows/agentSignal/index.ts`
|
||||
- `src/server/workflows/agentSignal/run.ts`
|
||||
- `apps/server/src/workflows/agentSignal/index.ts`
|
||||
- `apps/server/src/workflows/agentSignal/run.ts`
|
||||
|
||||
## Existing Example: `analyzeIntent`
|
||||
|
||||
@@ -192,8 +192,8 @@ agent.user.message
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/index.ts`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/feedbackSatisfaction.ts`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/feedbackDomain.ts`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/feedbackAction.ts`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/actions/userMemory.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/index.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackSatisfaction.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackDomain.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackAction.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/actions/userMemory.ts`
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
## Fluent Registration API
|
||||
|
||||
Use the middleware helpers in `src/server/services/agentSignal/runtime/middleware.ts`.
|
||||
Use the middleware helpers in `apps/server/src/services/agentSignal/runtime/middleware.ts`.
|
||||
|
||||
They provide:
|
||||
|
||||
@@ -32,7 +32,7 @@ The context gives you:
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/services/agentSignal/runtime/context.ts`
|
||||
- `apps/server/src/services/agentSignal/runtime/context.ts`
|
||||
|
||||
## Return Contracts
|
||||
|
||||
@@ -48,7 +48,7 @@ Return one of these shapes:
|
||||
Read:
|
||||
|
||||
- `packages/agent-signal/src/base/types.ts`
|
||||
- `src/server/services/agentSignal/runtime/AgentSignalScheduler.ts`
|
||||
- `apps/server/src/services/agentSignal/runtime/AgentSignalScheduler.ts`
|
||||
|
||||
## Policy Composition Pattern
|
||||
|
||||
@@ -72,8 +72,8 @@ That bundle is later passed into the runtime via:
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/services/agentSignal/policies/index.ts`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/index.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/index.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/index.ts`
|
||||
|
||||
## Source Handler Pattern
|
||||
|
||||
@@ -81,7 +81,7 @@ Use a source handler when you are interpreting a producer event into semantic si
|
||||
|
||||
Reference:
|
||||
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/feedbackSatisfaction.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackSatisfaction.ts`
|
||||
|
||||
Pattern:
|
||||
|
||||
@@ -114,8 +114,8 @@ Use a signal handler when one semantic state should branch into more semantic st
|
||||
|
||||
References:
|
||||
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/feedbackDomain.ts`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/feedbackAction.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackDomain.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/feedbackAction.ts`
|
||||
|
||||
Pattern:
|
||||
|
||||
@@ -148,7 +148,7 @@ Use an action handler when the runtime should do actual work.
|
||||
|
||||
Reference:
|
||||
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/actions/userMemory.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/actions/userMemory.ts`
|
||||
|
||||
Pattern:
|
||||
|
||||
@@ -186,9 +186,9 @@ Keep these rules:
|
||||
Use this split:
|
||||
|
||||
- external event payloads:
|
||||
`src/server/services/agentSignal/sourceTypes.ts`
|
||||
`apps/server/src/services/agentSignal/sourceTypes.ts`
|
||||
- policy-owned signal and action payloads:
|
||||
`src/server/services/agentSignal/policies/types.ts`
|
||||
`apps/server/src/services/agentSignal/policies/types.ts`
|
||||
- normalized shared node contracts:
|
||||
`packages/agent-signal/src/base/types.ts`
|
||||
|
||||
@@ -216,10 +216,10 @@ Prefer focused tests near the touched code.
|
||||
|
||||
Useful references:
|
||||
|
||||
- `src/server/services/agentSignal/runtime/__tests__/AgentSignalRuntime.test.ts`
|
||||
- `src/server/services/agentSignal/__tests__/index.integration.test.ts`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/__tests__/*`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/actions/__tests__/*`
|
||||
- `apps/server/src/services/agentSignal/runtime/__tests__/AgentSignalRuntime.test.ts`
|
||||
- `apps/server/src/services/agentSignal/__tests__/index.integration.test.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/__tests__/*`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/actions/__tests__/*`
|
||||
|
||||
Test at the smallest level that proves the behavior:
|
||||
|
||||
|
||||
@@ -24,9 +24,9 @@ After runtime execution, the service projects one compact observability model fr
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/services/agentSignal/observability/projector.ts`
|
||||
- `src/server/services/agentSignal/observability/traceEvents.ts`
|
||||
- `src/server/services/agentSignal/observability/store.ts`
|
||||
- `apps/server/src/services/agentSignal/observability/projector.ts`
|
||||
- `apps/server/src/services/agentSignal/observability/traceEvents.ts`
|
||||
- `apps/server/src/services/agentSignal/observability/store.ts`
|
||||
|
||||
Projection outputs:
|
||||
|
||||
@@ -58,7 +58,7 @@ Workflow-triggered runs do not naturally pass through the normal foreground runt
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/workflows/agentSignal/run.ts`
|
||||
- `apps/server/src/workflows/agentSignal/run.ts`
|
||||
|
||||
Use that path when:
|
||||
|
||||
@@ -77,8 +77,8 @@ Check:
|
||||
|
||||
Read:
|
||||
|
||||
- `src/server/services/agentSignal/index.ts`
|
||||
- `src/server/services/agentSignal/sources/index.ts`
|
||||
- `apps/server/src/services/agentSignal/index.ts`
|
||||
- `apps/server/src/services/agentSignal/sources/index.ts`
|
||||
|
||||
### The signal exists but no action runs
|
||||
|
||||
@@ -98,8 +98,8 @@ Check:
|
||||
|
||||
Reference:
|
||||
|
||||
- `src/server/services/agentSignal/policies/actionIdempotency.ts`
|
||||
- `src/server/services/agentSignal/policies/analyzeIntent/actions/userMemory.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/actionIdempotency.ts`
|
||||
- `apps/server/src/services/agentSignal/policies/analyzeIntent/actions/userMemory.ts`
|
||||
|
||||
### Background runs are hard to discover
|
||||
|
||||
|
||||
@@ -216,6 +216,6 @@ When using `--messages`, the output shows three sections (if context engine data
|
||||
|
||||
## Integration Points
|
||||
|
||||
- **Recording**: `src/server/services/agentRuntime/AgentRuntimeService.ts` — in the `executeStep()` method, after building `stepPresentationData`, writes partial snapshot in dev mode
|
||||
- **Context engine capture**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, calls `ctx.tracingContextEngine(input, output)`. `AgentRuntimeService.executeStep` buffers it per step and passes it to `traceRecorder.appendStep` as the typed `contextEngine` field (kept off the `events` array to stay out of Redis state).
|
||||
- **Recording**: `apps/server/src/services/agentRuntime/AgentRuntimeService.ts` — in the `executeStep()` method, after building `stepPresentationData`, writes partial snapshot in dev mode
|
||||
- **Context engine capture**: `apps/server/src/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, calls `ctx.tracingContextEngine(input, output)`. `AgentRuntimeService.executeStep` buffers it per step and passes it to `traceRecorder.appendStep` as the typed `contextEngine` field (kept off the `events` array to stay out of Redis state).
|
||||
- **Store**: `FileSnapshotStore` reads/writes to `.agent-tracing/` relative to `process.cwd()`
|
||||
|
||||
@@ -271,7 +271,7 @@ Lists in the same file you may need to touch:
|
||||
|
||||
- `defaultToolIds` — added to the agent's tool list by default
|
||||
- `alwaysOnToolIds` — forced on regardless of user selection (use sparingly)
|
||||
- `runtimeManagedToolIds` — enable state controlled by runtime, not user UI; **must mirror the rules map** in `src/server/modules/Mecha/AgentToolsEngine/index.ts` and `src/helpers/toolEngineering/index.ts`
|
||||
- `runtimeManagedToolIds` — enable state controlled by runtime, not user UI; **must mirror the rules map** in `apps/server/src/modules/Mecha/AgentToolsEngine/index.ts` and `src/helpers/toolEngineering/index.ts`
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ lsof -ti:3011 | xargs kill
|
||||
pnpm run dev:next
|
||||
```
|
||||
|
||||
**Important:** Server-side code changes in the submodule (`lobehub/src/server/`, `lobehub/packages/`) require a server restart. Next.js hot-reload may not pick up changes in submodule packages.
|
||||
**Important:** Server-side code changes in the submodule (`lobehub/apps/server/src/`, `lobehub/src/server/`, `lobehub/packages/`) require a server restart. Next.js hot-reload may not pick up changes in submodule packages.
|
||||
|
||||
### Step 2: Check CLI Authentication
|
||||
|
||||
@@ -150,14 +150,15 @@ $CLI provider test <provider-id>
|
||||
|
||||
### When Server Restart is Needed
|
||||
|
||||
| Change Location | Restart? |
|
||||
| ----------------------------------------- | -------- |
|
||||
| `lobehub/src/server/` (routers, services) | Yes |
|
||||
| `lobehub/packages/database/` (models) | Yes |
|
||||
| `lobehub/packages/types/` | Yes |
|
||||
| `lobehub/packages/prompts/` | Yes |
|
||||
| `lobehub/apps/cli/` (CLI code) | No |
|
||||
| `src/` (cloud overrides) | Yes |
|
||||
| Change Location | Restart? |
|
||||
| ------------------------------------------------------- | -------- |
|
||||
| `lobehub/apps/server/src/` (routers, services, modules) | Yes |
|
||||
| `lobehub/src/server/` (agent-hono, workflows-hono) | Yes |
|
||||
| `lobehub/packages/database/` (models) | Yes |
|
||||
| `lobehub/packages/types/` | Yes |
|
||||
| `lobehub/packages/prompts/` | Yes |
|
||||
| `lobehub/apps/cli/` (CLI code) | No |
|
||||
| `src/` (cloud overrides) | Yes |
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
|
||||
@@ -111,7 +111,7 @@ Generate video from text prompt. This is an async operation.
|
||||
**Source**: `apps/cli/src/commands/generate/video.ts`
|
||||
|
||||
```bash
|
||||
lh gen video "A cat playing piano" -m <model> -p <provider> [options]
|
||||
lh gen video "A cat playing piano" -m < model > -p < provider > [options]
|
||||
```
|
||||
|
||||
| Option | Description | Required |
|
||||
@@ -259,13 +259,13 @@ Image and video generation use an async task pattern:
|
||||
UUID from the `async_tasks` table, not `gen_xxx`
|
||||
- Returns `{ status, error, generation }` (generation includes asset URLs on success)
|
||||
- Before querying, calls `checkTimeoutTasks` which marks tasks as `error` if they have been
|
||||
`pending` or `processing` for more than ~5 minutes (`ASYNC_TASK_TIMEOUT = 298s`)
|
||||
`pending` or `processing` for more than \~5 minutes (`ASYNC_TASK_TIMEOUT = 298s`)
|
||||
|
||||
**Server routes**:
|
||||
|
||||
- `src/server/routers/lambda/image/index.ts` — image creation (uses `authedProcedure` + `serverDatabase`)
|
||||
- `src/server/routers/lambda/video/index.ts` — video creation (uses `authedProcedure` + `serverDatabase`)
|
||||
- `src/server/routers/lambda/generation.ts` — status checking
|
||||
- `apps/server/src/routers/lambda/image/index.ts` — image creation (uses `authedProcedure` + `serverDatabase`)
|
||||
- `apps/server/src/routers/lambda/video/index.ts` — video creation (uses `authedProcedure` + `serverDatabase`)
|
||||
- `apps/server/src/routers/lambda/generation.ts` — status checking
|
||||
- `packages/database/src/models/asyncTask.ts` — `AsyncTaskModel` including `checkTimeoutTasks`
|
||||
|
||||
**Note**: Image/video routes do NOT use the `keyVaults` middleware — they read API keys from the database via `initModelRuntimeFromDB` or `createAsyncCaller`.
|
||||
|
||||
@@ -6,6 +6,66 @@ user-invocable: false
|
||||
|
||||
# Database Migrations Guide
|
||||
|
||||
## Development-stage schema changes
|
||||
|
||||
Schema changes churn during feature development. When the schema changes before the migration has shipped, do not hand-edit the existing migration SQL to chase the new schema shape. Delete the draft migration artifacts added by this branch (SQL file, matching snapshot, and matching journal entry), then run the generator again and re-apply the normal migration review steps below.
|
||||
|
||||
For example, if this branch's draft migration is `0110_add_verify_tables_and_ai_infra_id`:
|
||||
|
||||
```bash
|
||||
# 1. Delete the draft SQL and its snapshot
|
||||
rm packages/database/migrations/0110_add_verify_tables_and_ai_infra_id.sql
|
||||
rm packages/database/migrations/meta/0110_snapshot.json
|
||||
|
||||
# 2. Remove the matching 0110 entry from the journal's "entries" array
|
||||
# packages/database/migrations/meta/_journal.json
|
||||
|
||||
# 3. Regenerate from the current schema
|
||||
bun run db:generate
|
||||
```
|
||||
|
||||
This keeps the generated SQL, snapshot, and journal aligned with the actual schema. Manual SQL edits are reserved for review-time hardening such as idempotent clauses, custom extension SQL, and meaningful filename/tag updates.
|
||||
|
||||
Before release, if a feature branch accumulated multiple development-only migrations, consolidate them into one migration when possible. Production does not need to replay every intermediate draft shape, and fewer migrations reduce deploy-time risk.
|
||||
|
||||
For example, if this branch added `0110`, `0111`, and `0112`, delete all three drafts and regenerate a single migration:
|
||||
|
||||
```bash
|
||||
# 1. Delete every draft SQL and snapshot this branch added
|
||||
rm packages/database/migrations/011{0,1,2}_*.sql
|
||||
rm packages/database/migrations/meta/011{0,1,2}_snapshot.json
|
||||
|
||||
# 2. Remove the 0110/0111/0112 entries from the journal's "entries" array
|
||||
# packages/database/migrations/meta/_journal.json
|
||||
|
||||
# 3. Regenerate one migration covering the full schema delta
|
||||
bun run db:generate
|
||||
```
|
||||
|
||||
Do not make a migration compatible with earlier development-only versions of the same branch. While the migration has not shipped, there is no production history to preserve. Fix local/dev databases directly with whatever SQL is simplest (drop the draft table, rename a column, delete draft rows), then regenerate the branch migration from the current schema.
|
||||
|
||||
For example, if an earlier draft on this branch created `signup_attempt_id` and you have since renamed it to `user_signup_log_id`, do not add a compatibility `ALTER ... RENAME` to the migration. Just fix the dev DB directly (see the `access-pg` skill for the `bun -e` + `pg` pattern), then regenerate:
|
||||
|
||||
```bash
|
||||
# Fix the dev DB to match the new schema (simplest SQL wins)
|
||||
set -a && source .env && set +a && bun -e '
|
||||
import pg from "pg";
|
||||
const client = new pg.Client({ connectionString: process.env.DATABASE_URL });
|
||||
await client.connect();
|
||||
await client.query("ALTER TABLE user_signup_logs DROP COLUMN signup_attempt_id");
|
||||
await client.end();
|
||||
'
|
||||
|
||||
# Regenerate so the migration reflects only the final shape
|
||||
bun run db:generate
|
||||
```
|
||||
|
||||
After a migration has reached production or the target default branch, treat it as immutable: add a follow-up migration instead of rewriting it.
|
||||
|
||||
## Rebase conflicts
|
||||
|
||||
When a rebase conflicts in migration files, keep the upstream/default-branch migrations and remove all migrations introduced by the current feature branch. Complete the rebase, then regenerate this branch's migration from the rebased schema. This avoids merging two independent snapshots or hand-splicing journal entries.
|
||||
|
||||
## Step 1: Generate Migrations
|
||||
|
||||
```bash
|
||||
|
||||
@@ -57,7 +57,7 @@ process.env.DEBUG = 'lobe-*';
|
||||
## Example
|
||||
|
||||
```typescript
|
||||
// src/server/routers/edge/market/index.ts
|
||||
// apps/server/src/routers/edge/market/index.ts
|
||||
import debug from 'debug';
|
||||
|
||||
const log = debug('lobe-edge-router:market');
|
||||
|
||||
+152
-60
@@ -6,6 +6,14 @@ user-invocable: false
|
||||
|
||||
# Drizzle ORM Schema Style Guide
|
||||
|
||||
> **Adding a Model or Repository?** Ship a sibling test in the same PR — every new
|
||||
> file under `packages/database/src/models/**` or `src/repositories/**` needs a
|
||||
> matching `__tests__/<name>.test.ts`. See the **testing** skill
|
||||
> (`.agents/skills/testing/references/db-model-test.md`) for the `getTestDB()`
|
||||
> integration pattern, user-isolation tests, the BM25 `describe.skipIf(!isServerDB)`
|
||||
> guard, and schema gotchas. CI's coverage patch gate won't reliably catch a brand-new
|
||||
> untested file, so this is on you.
|
||||
|
||||
## Configuration
|
||||
|
||||
- Config: `drizzle.config.ts`
|
||||
@@ -25,16 +33,42 @@ Location: `packages/database/src/schemas/_helpers.ts`
|
||||
|
||||
- **Tables**: Plural snake_case (`users`, `session_groups`)
|
||||
- **Columns**: snake_case (`user_id`, `created_at`)
|
||||
- **New tables**: Check nearby existing tables before naming a new one. Preserve
|
||||
the established noun family and suffix. For example, if the user-scoped table
|
||||
is `user_xxx_logs`, the workspace-scoped counterpart should be
|
||||
`workspace_xxx_logs`, not `workspace_xxx_records` or another new synonym.
|
||||
|
||||
```typescript
|
||||
// ✅ Good: follows the existing user/workspace table family.
|
||||
export const userSignupLogs = pgTable('user_signup_logs', { ... });
|
||||
export const workspaceSignupLogs = pgTable('workspace_signup_logs', { ... });
|
||||
|
||||
// ❌ Bad: introduces a new suffix for the same concept.
|
||||
export const workspaceSignupRecords = pgTable('workspace_signup_records', { ... });
|
||||
```
|
||||
|
||||
## Column Definitions
|
||||
|
||||
### Primary Keys
|
||||
|
||||
Do not use auto-incrementing primary keys (`serial`, `bigserial`, generated
|
||||
identity columns). They create sequence-state problems during cross-database
|
||||
migrations, restores, and data copy jobs. Prefer text IDs from application
|
||||
generators (`idGenerator`, `createNanoId`) or `uuid` for internal tables.
|
||||
|
||||
Keep `$defaultFn(...)` when a table normally owns ID generation. Callers can
|
||||
still pass an explicit `id`; the default only runs when the insert omits it. Do
|
||||
not remove the default just because one flow needs to supply a request-scoped ID.
|
||||
|
||||
```typescript
|
||||
// ✅ Good: app-generated text ID; explicit inserts can still override it.
|
||||
id: text('id')
|
||||
.primaryKey()
|
||||
.$defaultFn(() => idGenerator('agents'))
|
||||
.notNull(),
|
||||
|
||||
// ❌ Bad: sequence state is fragile across DB migrations and restores.
|
||||
id: serial('id').primaryKey(),
|
||||
```
|
||||
|
||||
ID prefixes make entity types distinguishable. For internal tables, use `uuid`.
|
||||
@@ -53,6 +87,80 @@ userId: text('user_id')
|
||||
...timestamps, // Spread from _helpers.ts
|
||||
```
|
||||
|
||||
### Optional and Undefined Values
|
||||
|
||||
Do not introduce artificial sentinel strings for missing values, such as
|
||||
`unknown`, unless the domain already has that explicit state and existing code
|
||||
uses it consistently. Prefer nullable columns, optional TypeScript fields, or a
|
||||
separate concrete status enum when the value is genuinely absent.
|
||||
|
||||
```typescript
|
||||
// ✅ Good: absent until the final stage writes a real decision.
|
||||
export type UserSignupLogFinalDecision = 'allow' | 'block' | 'error';
|
||||
|
||||
finalDecision: varchar('final_decision', { length: 32 }).$type<UserSignupLogFinalDecision>(),
|
||||
|
||||
// ❌ Bad: invents a new state that callers now need to handle everywhere.
|
||||
export type UserSignupLogFinalDecision = 'allow' | 'block' | 'error' | 'unknown';
|
||||
|
||||
finalDecision: varchar('final_decision', { length: 32 })
|
||||
.$type<UserSignupLogFinalDecision>()
|
||||
.notNull()
|
||||
.default('unknown');
|
||||
```
|
||||
|
||||
### Field Descriptions
|
||||
|
||||
For columns whose meaning is not obvious from the name alone, add JSDoc on the
|
||||
schema field. Include a concrete example when it clarifies the stored value or
|
||||
the lifecycle moment that writes it. This is especially important for external
|
||||
IDs, lifecycle statuses, denormalized snapshots, JSONB signals, and fields whose
|
||||
name could mean either a request ID or a persisted row ID.
|
||||
|
||||
```typescript
|
||||
// ✅ Good: explain the table's business object first, then only document
|
||||
// non-obvious lifecycle or risk-control fields.
|
||||
/**
|
||||
* User signup logs - one row per signup flow, collecting stage-level
|
||||
* risk-control decisions before and after the auth provider creates a user.
|
||||
*/
|
||||
export const userSignupLogs = pgTable('user_signup_logs', {
|
||||
/** Final signup outcome reason, for example user_created, llm_block, or guard_error */
|
||||
finalReason: text('final_reason'),
|
||||
|
||||
/** Aggregated risk level derived from stage decisions, for example block -> high */
|
||||
riskLevel: varchar('risk_level', { length: 16 }).$type<UserSignupLogRiskLevel>(),
|
||||
|
||||
/** Ordered stage-level decisions and metadata grouped by signup review stage */
|
||||
stageResults: jsonb('stage_results').$type<UserSignupLogStageResults>(),
|
||||
});
|
||||
|
||||
// ❌ Bad: comments restate obvious column names without adding domain meaning.
|
||||
/** User email */
|
||||
email: text('email'),
|
||||
```
|
||||
|
||||
### JSONB Types
|
||||
|
||||
Avoid `Record<string, unknown>` or similarly loose JSONB types for schema
|
||||
columns. Define a concrete interface that describes the expected JSON shape, even
|
||||
when most properties are optional. This keeps callers, migrations, and review
|
||||
queries aligned on the same data contract.
|
||||
|
||||
```typescript
|
||||
interface UserSignupLogMetadata {
|
||||
payloadPath?: string;
|
||||
requestPath?: string;
|
||||
}
|
||||
|
||||
metadata: jsonb('metadata').$type<UserSignupLogMetadata>(),
|
||||
```
|
||||
|
||||
```typescript
|
||||
// ❌ Bad: hides the contract and makes downstream access untyped.
|
||||
metadata: jsonb('metadata').$type<Record<string, unknown>>(),
|
||||
```
|
||||
|
||||
### Indexes
|
||||
|
||||
```typescript
|
||||
@@ -176,66 +284,52 @@ const rows = await this.db
|
||||
|
||||
### Raw SQL and Advanced Queries
|
||||
|
||||
Prefer Drizzle builders whenever the query can be expressed clearly with `select`,
|
||||
`insert().select()`, `update().from()`, joins, CTEs, `groupBy`, and typed selected
|
||||
columns. This keeps table and column references tied to schema definitions, so
|
||||
schema changes are more likely to surface as TypeScript errors.
|
||||
Prefer Drizzle builders whenever the query reads clearly with `select`,
|
||||
`insert().select()`, `update().from()`, joins, CTEs, and `groupBy` — this keeps
|
||||
table/column references tied to schema, so changes surface as TypeScript errors.
|
||||
Within a builder, expression-level `sql<T>` is fine for features lacking a helper
|
||||
(JSON path, casts, aggregates, `CASE`, `NOW()`). Row locks are clauses, not
|
||||
expressions — use `.for('update')`, never raw `FOR UPDATE`.
|
||||
|
||||
Expression-level `sql<T>` is fine inside a Drizzle builder for PostgreSQL features
|
||||
that do not have a dedicated helper, such as JSON path extraction, casts, aggregate
|
||||
expressions, `CASE`, `NOW()`, or advisory locks. Row locks are query clauses, not
|
||||
expressions; use the select builder's `.for('update')` instead of raw
|
||||
`FOR UPDATE` SQL fragments.
|
||||
Use `COALESCE` only when null-handling is part of required DB semantics (nullable
|
||||
JSONB append/merge, "keep first non-null"). Don't scatter
|
||||
`COALESCE(excluded.col, current.col)` across ordinary upsert scalars just to avoid
|
||||
an update object — build `set` from defined values only, and hide any remaining
|
||||
SQL behind named helpers (`appendJsonbArray`, `mergeJsonbObject`, `keepFirstValue`)
|
||||
so the method reads as business intent, not SQL plumbing.
|
||||
|
||||
```typescript
|
||||
// ✅ Scalars included only when present; SQL hidden behind a named helper.
|
||||
const updateValues = compactUndefined({
|
||||
email: record.email ?? undefined,
|
||||
ip: record.ip ?? undefined,
|
||||
});
|
||||
await db.insert(userSignupLogs).values(values).onConflictDoUpdate({
|
||||
set: { ...updateValues, stageResults: appendStageResult(stage, result), updatedAt: now },
|
||||
target: userSignupLogs.id,
|
||||
});
|
||||
|
||||
// ❌ Every scalar becomes SQL plumbing.
|
||||
set: {
|
||||
email: sql`COALESCE(excluded.email, ${userSignupLogs.email})`,
|
||||
ip: sql`COALESCE(excluded.ip, ${userSignupLogs.ip})`,
|
||||
}
|
||||
```
|
||||
|
||||
When refactoring raw SQL:
|
||||
|
||||
- Preserve the original query shape for latency-sensitive paths. If raw SQL is one
|
||||
database roundtrip, do not replace it with multiple depth-based queries just to
|
||||
remove `execute`.
|
||||
- Use `$with(...)` plus `insert().select()` / `update().from()` for multi-step
|
||||
single-roundtrip writes when Drizzle can express the data flow.
|
||||
- Avoid generic `execute<MyRow>(sql...)` as the main safety mechanism. It types the
|
||||
returned rows, but it does not keep selected columns in sync with schema changes.
|
||||
- If the only clean implementation is a PostgreSQL feature that Drizzle cannot
|
||||
express well, keep the raw SQL and tighten it instead: use schema references in
|
||||
interpolations, explicit user scope, a narrow row interface, and regression tests.
|
||||
- Preserve query shape on latency-sensitive paths. If raw SQL is one roundtrip,
|
||||
don't split it into multiple depth-based queries just to drop `execute`.
|
||||
- Use `$with(...)` + `insert().select()` / `update().from()` for multi-step
|
||||
single-roundtrip writes Drizzle can express.
|
||||
- Don't rely on `execute<MyRow>(sql...)` for safety — it types rows but doesn't keep
|
||||
selected columns in sync with schema changes.
|
||||
- If only a PostgreSQL feature Drizzle can't express works, keep the raw SQL and
|
||||
tighten it: schema refs in interpolations, explicit user scope, a narrow row
|
||||
interface, and regression tests.
|
||||
|
||||
Recursive CTEs are a special case: current Drizzle usage in this repo does not have
|
||||
a clean `WITH RECURSIVE` builder pattern. Keep recursive CTE raw SQL when replacing
|
||||
it would add extra database roundtrips or materially worsen performance.
|
||||
|
||||
Example: convert an aggregate query when Drizzle can preserve one roundtrip:
|
||||
|
||||
```typescript
|
||||
// ✅ Good: builder owns table and column references; sql<T> stays expression-level.
|
||||
const rows = await trx
|
||||
.select({
|
||||
model: messages.model,
|
||||
provider: messages.provider,
|
||||
totalCost: sql<string | null>`sum((${messages.metadata}->'usage'->>'cost')::numeric)`.as(
|
||||
'totalCost',
|
||||
),
|
||||
})
|
||||
.from(messages)
|
||||
.where(
|
||||
and(
|
||||
eq(messages.topicId, topicId),
|
||||
eq(messages.userId, userId),
|
||||
eq(messages.role, 'assistant'),
|
||||
sql`${messages.metadata} ? 'usage'`,
|
||||
),
|
||||
)
|
||||
.groupBy(messages.provider, messages.model);
|
||||
```
|
||||
|
||||
Example: use the select lock builder for row locks:
|
||||
|
||||
```typescript
|
||||
const [user] = await trx.select().from(users).where(eq(users.id, userId)).for('update');
|
||||
```
|
||||
|
||||
Example: keep a recursive CTE raw when replacing it would add depth-based DB
|
||||
roundtrips:
|
||||
Recursive CTEs are the canonical "keep raw" case — there's no clean `WITH RECURSIVE`
|
||||
builder, and a rewrite would add depth-based roundtrips:
|
||||
|
||||
```typescript
|
||||
interface TaskTreeRow {
|
||||
@@ -243,15 +337,13 @@ interface TaskTreeRow {
|
||||
parent_task_id: string | null;
|
||||
}
|
||||
|
||||
// execute<T> is acceptable here only because Drizzle has no clean WITH RECURSIVE
|
||||
// builder; a builder rewrite would add depth-based roundtrips. Keep schema refs in
|
||||
// the interpolations and scope every leg to the user.
|
||||
// execute<T> acceptable: no clean WITH RECURSIVE builder. Keep schema refs in the
|
||||
// interpolations and scope every leg to the user.
|
||||
const { rows } = await db.execute<TaskTreeRow>(sql`
|
||||
WITH RECURSIVE task_tree AS (
|
||||
SELECT ${tasks.id}, ${tasks.parentTaskId}
|
||||
FROM ${tasks}
|
||||
WHERE ${tasks.id} = ${rootTaskId}
|
||||
AND ${tasks.createdByUserId} = ${userId}
|
||||
WHERE ${tasks.id} = ${rootTaskId} AND ${tasks.createdByUserId} = ${userId}
|
||||
UNION ALL
|
||||
SELECT ${tasks.id}, ${tasks.parentTaskId}
|
||||
FROM ${tasks}
|
||||
|
||||
@@ -56,7 +56,8 @@ git submodules.
|
||||
├── apps/
|
||||
│ ├── cli/ # LobeHub CLI
|
||||
│ ├── desktop/ # Electron desktop app
|
||||
│ └── device-gateway/ # Device gateway service
|
||||
│ ├── device-gateway/ # Device gateway service
|
||||
│ └── server/ # Next.js-backed server: featureFlags, globalConfig, modules, routers, services, utils, workflows (`@/server/*` alias)
|
||||
├── docs/ # changelog, development, self-hosting, usage
|
||||
├── locales/ # en-US, zh-CN, ...
|
||||
├── packages/ # ~80 @lobechat/* workspace packages — `ls` for the full set. Key ones:
|
||||
@@ -85,32 +86,32 @@ git submodules.
|
||||
├── business/ # Open-source stubs (client/server) — cloud repo provides real impls
|
||||
├── features/ # Domain business components
|
||||
├── store/ # ~30 zustand stores — `ls` for the full set
|
||||
├── server/ # featureFlags, globalConfig, modules, routers, services, workflows, agent-hono
|
||||
├── server/ # standalone-Hono server pieces only: agent-hono, workflows-hono (main backend lives in `apps/server`)
|
||||
└── ... # components, hooks, layout, libs, locales, services, types, utils
|
||||
```
|
||||
|
||||
## Architecture Map
|
||||
|
||||
| Layer | Location |
|
||||
| ---------------- | --------------------------------------------------- |
|
||||
| UI Components | `src/components`, `src/features` |
|
||||
| SPA Pages | `src/routes/` |
|
||||
| React Router | `src/spa/router/` |
|
||||
| Global Providers | `src/layout` |
|
||||
| Zustand Stores | `src/store` |
|
||||
| Client Services | `src/services/` |
|
||||
| REST API | `src/app/(backend)/webapi` |
|
||||
| tRPC Routers | `src/server/routers/{async\|lambda\|mobile\|tools}` |
|
||||
| Server Services | `src/server/services` (can access DB) |
|
||||
| Server Modules | `src/server/modules` (no DB access) |
|
||||
| Feature Flags | `src/server/featureFlags` |
|
||||
| Global Config | `src/server/globalConfig` |
|
||||
| DB Schema | `packages/database/src/schemas` |
|
||||
| DB Model | `packages/database/src/models` |
|
||||
| DB Repository | `packages/database/src/repositories` |
|
||||
| Third-party | `src/libs` (analytics, oidc, etc.) |
|
||||
| Builtin Tools | `packages/builtin-tool-*`, `packages/builtin-tools` |
|
||||
| Open-source stub | `src/business/*`, `packages/business/*` (this repo) |
|
||||
| Layer | Location |
|
||||
| ---------------- | -------------------------------------------------------- |
|
||||
| UI Components | `src/components`, `src/features` |
|
||||
| SPA Pages | `src/routes/` |
|
||||
| React Router | `src/spa/router/` |
|
||||
| Global Providers | `src/layout` |
|
||||
| Zustand Stores | `src/store` |
|
||||
| Client Services | `src/services/` |
|
||||
| REST API | `src/app/(backend)/webapi` |
|
||||
| tRPC Routers | `apps/server/src/routers/{async\|lambda\|mobile\|tools}` |
|
||||
| Server Services | `apps/server/src/services` (can access DB) |
|
||||
| Server Modules | `apps/server/src/modules` (no DB access) |
|
||||
| Feature Flags | `apps/server/src/featureFlags` |
|
||||
| Global Config | `apps/server/src/globalConfig` |
|
||||
| DB Schema | `packages/database/src/schemas` |
|
||||
| DB Model | `packages/database/src/models` |
|
||||
| DB Repository | `packages/database/src/repositories` |
|
||||
| Third-party | `src/libs` (analytics, oidc, etc.) |
|
||||
| Builtin Tools | `packages/builtin-tool-*`, `packages/builtin-tools` |
|
||||
| Open-source stub | `src/business/*`, `packages/business/*` (this repo) |
|
||||
|
||||
## Data Flow
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ user-invocable: false
|
||||
|
||||
- Bug fixes must include tests covering the fixed scenario
|
||||
- New logic (services, store actions, utilities) should have test coverage
|
||||
- **New database Model/Repository** (`packages/database/src/models/**`, `src/repositories/**`) must ship a sibling `__tests__/<name>.test.ts` — incl. user-isolation tests; BM25 search guarded by `describe.skipIf(!isServerDB)` (see `/testing` → `db-model-test.md`)
|
||||
- Existing tests still cover the changed behavior?
|
||||
- Prefer `vi.spyOn` over `vi.mock` (see `/testing` skill)
|
||||
|
||||
|
||||
@@ -14,15 +14,21 @@ user-invocable: false
|
||||
# Run specific test file
|
||||
bunx vitest run --silent='passed-only' '[file-path]'
|
||||
|
||||
# Database package (client)
|
||||
# Database package (client-db, PGlite — default, skips BM25/pg_search)
|
||||
cd packages/database && bunx vitest run --silent='passed-only' '[file]'
|
||||
|
||||
# Database package (server)
|
||||
# Database package (server-db, Postgres — BM25/pgvector parity, what CI measures coverage in)
|
||||
cd packages/database && TEST_SERVER_DB=1 bunx vitest run --silent='passed-only' '[file]'
|
||||
```
|
||||
|
||||
**Never run** `bun run test` - it runs all 3000+ tests (\~10 minutes).
|
||||
|
||||
> **Database models/repositories:** every new file under `packages/database/src/models/**`
|
||||
> or `src/repositories/**` ships with a sibling `__tests__/<name>.test.ts` in the same PR.
|
||||
> Use the real DB via `getTestDB()` (integration style), guard BM25/full-text-search blocks
|
||||
> with `describe.skipIf(!isServerDB)`, and always test user-isolation. See
|
||||
> `references/db-model-test.md` for setup, schema gotchas, and the client-vs-server-db split.
|
||||
|
||||
## Test Categories
|
||||
|
||||
| Category | Location | Config |
|
||||
|
||||
@@ -1,95 +1,74 @@
|
||||
# Database Model Testing Guide
|
||||
|
||||
Test `packages/database` Model layer.
|
||||
Test the `packages/database` Model and Repository layers.
|
||||
|
||||
## Dual Environment Verification (Required)
|
||||
> **Rule: every new Model or Repository ships with a sibling test in the same PR.**
|
||||
> A new file under `src/models/**` or `src/repositories/**` must have a matching
|
||||
> `__tests__/<name>.test.ts`. Coverage runs in server-db mode in CI and the patch
|
||||
> gate will not always catch a brand-new untested file (a small new file barely
|
||||
> moves the project total) — so this is a convention, not something CI guarantees.
|
||||
> Start from the template: `packages/database/src/models/__tests__/_test_template.ts`.
|
||||
|
||||
## Two test environments: client-db vs server-db
|
||||
|
||||
`getTestDB()` (`src/core/getTestDB.ts`) returns different engines based on the
|
||||
`TEST_SERVER_DB` env var:
|
||||
|
||||
| Mode | Engine | When | Notes |
|
||||
| ----------------------- | ----------------------------------- | ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| **client-db** (default) | PGlite (in-memory) | `bunx vitest run` | Migration runner **skips any SQL containing `pg_search` / `bm25`** — the ParadeDB BM25 `@@@` operator does not exist here. |
|
||||
| **server-db** | node-postgres → `DATABASE_TEST_URL` | `TEST_SERVER_DB=1` | CI uses the `paradedb/paradedb` image (has `pg_search`). **Coverage is measured in this mode** (`test:coverage` → `vitest.config.server.mts`, uploaded to Codecov). |
|
||||
|
||||
```bash
|
||||
# 1. Client environment (fast)
|
||||
cd packages/database && TEST_SERVER_DB=0 bunx vitest run --silent='passed-only' '[file]'
|
||||
# 1. Client environment (fast, default — what most local runs use)
|
||||
cd packages/database && bunx vitest run --silent='passed-only' '[file]'
|
||||
|
||||
# 2. Server environment (compatibility)
|
||||
# 2. Server environment (BM25 / pg_search / pgvector parity, needs DATABASE_TEST_URL)
|
||||
cd packages/database && TEST_SERVER_DB=1 bunx vitest run --silent='passed-only' '[file]'
|
||||
```
|
||||
|
||||
## User Permission Check - Security First 🔒
|
||||
Implication: client-db coverage **under-counts** any code that needs BM25 (e.g.
|
||||
`repositories/search/index.ts` reads near-0% locally but is fully covered in CI).
|
||||
Don't chase those lines locally — confirm via CI/Codecov.
|
||||
|
||||
**Critical security requirement**: All user data operations must include permission checks.
|
||||
## BM25 / full-text search → `describe.skipIf(!isServerDB)`
|
||||
|
||||
Any method using the BM25 `@@@` operator or `sanitizeBm25` (keyword search:
|
||||
`queryByKeyword`, `searchAgents`, userMemory lexical search, …) **throws under
|
||||
PGlite** (often swallowed by a `catch` that returns `[]`, so the test silently
|
||||
fails with empty results). Guard those blocks so they only run in server-db:
|
||||
|
||||
```typescript
|
||||
// ❌ DANGEROUS: Missing permission check
|
||||
update = async (id: string, data: Partial<MyModel>) => {
|
||||
return this.db
|
||||
.update(myTable)
|
||||
.set(data)
|
||||
.where(eq(myTable.id, id)) // Only checks ID
|
||||
.returning();
|
||||
};
|
||||
|
||||
// ✅ SECURE: Permission check included
|
||||
update = async (id: string, data: Partial<MyModel>) => {
|
||||
return this.db
|
||||
.update(myTable)
|
||||
.set(data)
|
||||
.where(
|
||||
and(
|
||||
eq(myTable.id, id),
|
||||
eq(myTable.userId, this.userId), // ✅ Permission check
|
||||
),
|
||||
)
|
||||
.returning();
|
||||
};
|
||||
```
|
||||
|
||||
## Test File Structure
|
||||
|
||||
```typescript
|
||||
// @vitest-environment node
|
||||
describe('MyModel', () => {
|
||||
describe('create', () => {
|
||||
/* ... */
|
||||
});
|
||||
describe('queryAll', () => {
|
||||
/* ... */
|
||||
});
|
||||
describe('update', () => {
|
||||
it('should update own records');
|
||||
it('should NOT update other users records'); // 🔒 Security
|
||||
});
|
||||
describe('delete', () => {
|
||||
it('should delete own records');
|
||||
it('should NOT delete other users records'); // 🔒 Security
|
||||
});
|
||||
describe('user isolation', () => {
|
||||
it('should enforce user data isolation'); // 🔒 Core security
|
||||
});
|
||||
// BM25 search requires the pg_search extension (ParadeDB), not available in PGlite
|
||||
const isServerDB = process.env.TEST_SERVER_DB === '1';
|
||||
describe.skipIf(!isServerDB)('queryByKeyword', () => {
|
||||
/* ... */
|
||||
});
|
||||
```
|
||||
|
||||
## Security Test Example
|
||||
Convention already used in `session.test.ts`, `topic.query.test.ts`,
|
||||
`message.query.test.ts`, `home/index.test.ts`, `repositories/search/index.test.ts`.
|
||||
|
||||
## Setup boilerplate
|
||||
|
||||
Top-of-file pattern (see `_test_template.ts` for the full version). Use real DB
|
||||
integration via `getTestDB()` — **not a mocked `vi.fn()` db**; the integration
|
||||
style exercises real SQL and gives far deeper coverage.
|
||||
|
||||
```typescript
|
||||
it('should not update records of other users', async () => {
|
||||
const [otherUserRecord] = await serverDB
|
||||
.insert(myTable)
|
||||
.values({ userId: 'other-user', data: 'original' })
|
||||
.returning();
|
||||
import { eq } from 'drizzle-orm';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
|
||||
const result = await myModel.update(otherUserRecord.id, { data: 'hacked' });
|
||||
import { getTestDB } from '../../core/getTestDB';
|
||||
import { users } from '../../schemas';
|
||||
import type { LobeChatDatabase } from '../../type';
|
||||
import { MyModel } from '../myModel';
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
const unchanged = await serverDB.query.myTable.findFirst({
|
||||
where: eq(myTable.id, otherUserRecord.id),
|
||||
});
|
||||
expect(unchanged?.data).toBe('original');
|
||||
});
|
||||
```
|
||||
const serverDB: LobeChatDatabase = await getTestDB(); // top-level await is fine
|
||||
|
||||
## Data Management
|
||||
|
||||
```typescript
|
||||
const userId = 'test-user';
|
||||
const userId = 'my-model-test-user';
|
||||
const otherUserId = 'other-user';
|
||||
const myModel = new MyModel(serverDB, userId);
|
||||
|
||||
beforeEach(async () => {
|
||||
await serverDB.delete(users);
|
||||
@@ -97,40 +76,99 @@ beforeEach(async () => {
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await serverDB.delete(users);
|
||||
await serverDB.delete(users); // cascades to user-scoped rows
|
||||
});
|
||||
```
|
||||
|
||||
## Foreign Key Handling
|
||||
Some tests need the Node environment (pgvector, server-only deps) — add
|
||||
`// @vitest-environment node` as the first line when required.
|
||||
|
||||
## User permission check — security first 🔒
|
||||
|
||||
**Every user-data operation must be ownership-scoped.** Always add a test proving
|
||||
another user cannot read/update/delete the row.
|
||||
|
||||
```typescript
|
||||
// ❌ Wrong: Invalid foreign key
|
||||
// ✅ SECURE: ownership in the WHERE clause
|
||||
update = async (id: string, data: Partial<MyModel>) =>
|
||||
this.db
|
||||
.update(myTable)
|
||||
.set(data)
|
||||
.where(and(eq(myTable.id, id), eq(myTable.userId, this.userId)))
|
||||
.returning();
|
||||
```
|
||||
|
||||
```typescript
|
||||
it('should NOT update another user's record', async () => {
|
||||
const otherModel = new MyModel(serverDB, otherUserId);
|
||||
const [row] = await otherModel.create({ data: 'original' });
|
||||
|
||||
await myModel.update(row.id, { data: 'hacked' });
|
||||
|
||||
const unchanged = await serverDB.query.myTable.findFirst({
|
||||
where: eq(myTable.id, row.id),
|
||||
});
|
||||
expect(unchanged?.data).toBe('original');
|
||||
});
|
||||
```
|
||||
|
||||
## What to cover
|
||||
|
||||
Aim each model/repository as close to 100% as practical (excluding BM25):
|
||||
|
||||
- Every public method
|
||||
- Both branches of conditionals; empty-list / `if (!x) return []` early returns
|
||||
- Error fallbacks (e.g. decrypt/JSON-parse failure → `null`)
|
||||
- Filters, pagination, ordering branches
|
||||
- Ownership / user isolation, and workspace scoping if the model takes a `workspaceId`
|
||||
|
||||
## Schema gotchas (real traps that fail inserts or types)
|
||||
|
||||
- **`workspaces`** requires `{ id, name, slug, primaryOwnerId }` and has **no
|
||||
`userId` column** — `insert(workspaces).values({ id, name, slug, primaryOwnerId })`.
|
||||
- **uuid columns**: a "not found" test must pass a _valid_ UUID
|
||||
(`'00000000-0000-0000-0000-000000000000'`); a random string raises a `22P02`
|
||||
DB error instead of returning `undefined`/`null`.
|
||||
- **Enum / `$type` columns** are type-checked: e.g. `files.source` is a
|
||||
`FileSource` enum (`image_generation` | `page-editor` | `video_generation`),
|
||||
not free text — passing `'upload'` is a type error.
|
||||
- Read the table's schema in `src/schemas/` for `notNull` columns **without
|
||||
defaults**; you must supply those on insert.
|
||||
|
||||
## Foreign key handling
|
||||
|
||||
```typescript
|
||||
// ❌ Wrong: invalid foreign key
|
||||
const testData = { asyncTaskId: 'invalid-uuid', fileId: 'non-existent' };
|
||||
|
||||
// ✅ Correct: Use null
|
||||
// ✅ Use null …
|
||||
const testData = { asyncTaskId: null, fileId: null };
|
||||
|
||||
// ✅ Or: Create referenced record first
|
||||
beforeEach(async () => {
|
||||
const [asyncTask] = await serverDB
|
||||
.insert(asyncTasks)
|
||||
.values({ id: 'valid-id', status: 'pending' })
|
||||
.returning();
|
||||
testData.asyncTaskId = asyncTask.id;
|
||||
});
|
||||
// ✅ … or create the referenced row first
|
||||
const [asyncTask] = await serverDB.insert(asyncTasks).values({ status: 'pending' }).returning();
|
||||
testData.asyncTaskId = asyncTask.id;
|
||||
```
|
||||
|
||||
## Predictable Sorting
|
||||
## Predictable sorting
|
||||
|
||||
```typescript
|
||||
// ✅ Use explicit timestamps
|
||||
const oldDate = new Date('2024-01-01T10:00:00Z');
|
||||
const newDate = new Date('2024-01-02T10:00:00Z');
|
||||
// ✅ Use explicit timestamps — never rely on insert order
|
||||
await serverDB.insert(table).values([
|
||||
{ ...data1, createdAt: oldDate },
|
||||
{ ...data2, createdAt: newDate },
|
||||
{ ...data1, createdAt: new Date('2024-01-01T10:00:00Z') },
|
||||
{ ...data2, createdAt: new Date('2024-01-02T10:00:00Z') },
|
||||
]);
|
||||
|
||||
// ❌ Don't rely on insert order
|
||||
await serverDB.insert(table).values([data1, data2]); // Unpredictable
|
||||
```
|
||||
|
||||
## Checking coverage of one file
|
||||
|
||||
```bash
|
||||
# Per-file coverage; read the "Uncovered Line #s" column to find gaps
|
||||
cd packages/database
|
||||
bunx vitest run --coverage --silent='passed-only' '[test-file]' 2>&1 | grep '[sourceFile].ts'
|
||||
```
|
||||
|
||||
## Before finishing
|
||||
|
||||
1. Tests pass: `bunx vitest run --silent='passed-only' '[file]'`
|
||||
2. Types pass: `bun run type-check` (vitest uses esbuild and does **not**
|
||||
type-check — a green test run can still have type errors).
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
---
|
||||
name: trpc-router
|
||||
description: 'TRPC router development guide. Use when creating or modifying src/server/routers, adding procedures, or implementing server-side API endpoints.'
|
||||
description: 'TRPC router development guide. Use when creating or modifying apps/server/src/routers, adding procedures, or implementing server-side API endpoints.'
|
||||
user-invocable: false
|
||||
---
|
||||
|
||||
@@ -8,9 +8,9 @@ user-invocable: false
|
||||
|
||||
## File Location
|
||||
|
||||
- Routers: `src/server/routers/lambda/<domain>.ts`
|
||||
- Helpers: `src/server/routers/lambda/_helpers/`
|
||||
- Schemas: `src/server/routers/lambda/_schema/`
|
||||
- Routers: `apps/server/src/routers/lambda/<domain>.ts`
|
||||
- Helpers: `apps/server/src/routers/lambda/_helpers/`
|
||||
- Schemas: `apps/server/src/routers/lambda/_schema/`
|
||||
|
||||
## Router Structure
|
||||
|
||||
|
||||
@@ -186,4 +186,4 @@ QSTASH_URL=https://custom-qstash.com
|
||||
- [Upstash Workflow Documentation](https://upstash.com/docs/workflow)
|
||||
- [QStash Documentation](https://upstash.com/docs/qstash)
|
||||
- [Example Workflows in Codebase](<../../src/app/(backend)/api/workflows/>)
|
||||
- [Workflow Classes](../../src/server/workflows/)
|
||||
- [Workflow Classes](../../apps/server/src/workflows/)
|
||||
|
||||
@@ -177,7 +177,7 @@ This allows cloud to override specific modules while using lobehub defaults.
|
||||
Place workflow class in cloud:
|
||||
|
||||
```text
|
||||
lobehub-cloud/src/server/workflows/featureName/index.ts
|
||||
lobehub-cloud/apps/server/src/workflows/featureName/index.ts
|
||||
```
|
||||
|
||||
### Shared Workflows
|
||||
@@ -185,7 +185,7 @@ lobehub-cloud/src/server/workflows/featureName/index.ts
|
||||
Place workflow class in lobehub, re-export in cloud if needed:
|
||||
|
||||
```text
|
||||
lobehub/src/server/workflows/featureName/index.ts
|
||||
lobehub/apps/server/src/workflows/featureName/index.ts
|
||||
```
|
||||
|
||||
---
|
||||
@@ -294,8 +294,8 @@ export { POST } from 'lobehub/src/app/(backend)/api/workflows/feature/*/route';
|
||||
**Step 4**: Move workflow class to lobehub
|
||||
|
||||
```bash
|
||||
mv lobehub-cloud/src/server/workflows/feature \
|
||||
lobehub/src/server/workflows/
|
||||
mv lobehub-cloud/apps/server/src/workflows/feature \
|
||||
lobehub/apps/server/src/workflows/
|
||||
```
|
||||
|
||||
**Step 5**: Update cloud imports
|
||||
@@ -305,7 +305,7 @@ mv lobehub-cloud/src/server/workflows/feature \
|
||||
import { Workflow } from '@/server/workflows/feature';
|
||||
|
||||
// To
|
||||
import { Workflow } from 'lobehub/src/server/workflows/feature';
|
||||
import { Workflow } from 'lobehub/apps/server/src/workflows/feature';
|
||||
```
|
||||
|
||||
---
|
||||
@@ -326,7 +326,7 @@ lobehub-cloud/
|
||||
│ ├── process-users/route.ts
|
||||
│ ├── paginate-users/route.ts
|
||||
│ └── generate-user/route.ts
|
||||
└── src/server/workflows/welcomePlaceholder/
|
||||
└── apps/server/src/workflows/welcomePlaceholder/
|
||||
└── index.ts
|
||||
```
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ Full code templates for the 3-layer architecture. Read this when actually writin
|
||||
|
||||
## Table of Contents
|
||||
|
||||
1. [Workflow Class](#workflow-class) — `src/server/workflows/{workflowName}/index.ts`
|
||||
1. [Workflow Class](#workflow-class) — `apps/server/src/workflows/{workflowName}/index.ts`
|
||||
2. [Layer 1: Entry Point](#layer-1-entry-point-process-) — `process-*` route
|
||||
3. [Layer 2: Pagination](#layer-2-pagination-paginate-) — `paginate-*` route
|
||||
4. [Layer 3: Execution](#layer-3-execution-execute--generate-) — `execute-*` / `generate-*` route
|
||||
@@ -13,7 +13,7 @@ Full code templates for the 3-layer architecture. Read this when actually writin
|
||||
|
||||
## Workflow Class
|
||||
|
||||
**Location:** `src/server/workflows/{workflowName}/index.ts`
|
||||
**Location:** `apps/server/src/workflows/{workflowName}/index.ts`
|
||||
|
||||
```typescript
|
||||
import { Client } from '@upstash/workflow';
|
||||
|
||||
@@ -32,7 +32,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
name: Test Packages
|
||||
env:
|
||||
PACKAGES: '@lobechat/file-loaders @lobechat/prompts @lobechat/model-runtime @lobechat/web-crawler @lobechat/electron-server-ipc @lobechat/utils @lobechat/python-interpreter @lobechat/context-engine @lobechat/agent-runtime @lobechat/conversation-flow @lobechat/ssrf-safe-fetch @lobechat/memory-user-memory @lobechat/types @lobechat/builtin-tool-lobe-agent model-bank @lobechat/agent-gateway-client @lobechat/agent-manager-runtime @lobechat/device-gateway-client @lobechat/device-identity @lobechat/eval-dataset-parser @lobechat/eval-rubric @lobechat/fetch-sse @lobechat/heterogeneous-agents'
|
||||
PACKAGES: '@lobechat/file-loaders @lobechat/prompts @lobechat/model-runtime @lobechat/web-crawler @lobechat/electron-server-ipc @lobechat/utils @lobechat/python-interpreter @lobechat/context-engine @lobechat/agent-runtime @lobechat/conversation-flow @lobechat/ssrf-safe-fetch @lobechat/memory-user-memory @lobechat/types @lobechat/trpc @lobechat/app-config @lobechat/locales @lobechat/env @lobechat/builtin-tool-lobe-agent model-bank @lobechat/agent-gateway-client @lobechat/agent-manager-runtime @lobechat/device-gateway-client @lobechat/device-identity @lobechat/eval-dataset-parser @lobechat/eval-rubric @lobechat/fetch-sse @lobechat/heterogeneous-agents'
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
@@ -19,7 +19,7 @@ lobehub/
|
||||
├── apps/
|
||||
│ ├── desktop/ # Electron desktop app
|
||||
│ ├── cli/ # LobeHub CLI
|
||||
│ └── device-gateway/ # Device gateway service
|
||||
│ └── server/ # Server service
|
||||
├── packages/ # Shared packages (@lobechat/*)
|
||||
│ ├── database/ # Database schemas, models, repositories
|
||||
│ ├── agent-runtime/ # Agent runtime
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
.\" Code generated by `npm run man:generate`; DO NOT EDIT.
|
||||
.\" Manual command details come from the Commander command tree.
|
||||
.TH LH 1 "" "@lobehub/cli 0.0.24" "User Commands"
|
||||
.TH LH 1 "" "@lobehub/cli 0.0.29" "User Commands"
|
||||
.SH NAME
|
||||
lh \- LobeHub CLI \- manage and connect to LobeHub services
|
||||
.SH SYNOPSIS
|
||||
@@ -113,6 +113,9 @@ Manage plugins
|
||||
.B user
|
||||
Manage user account and settings
|
||||
.TP
|
||||
.B verify
|
||||
Manage the Agent Run delivery checker (criteria, rubrics, plans, results)
|
||||
.TP
|
||||
.B whoami
|
||||
Display current user information
|
||||
.TP
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lobehub/cli",
|
||||
"version": "0.0.24",
|
||||
"version": "0.0.29",
|
||||
"type": "module",
|
||||
"bin": {
|
||||
"lh": "./dist/index.js",
|
||||
|
||||
@@ -3,6 +3,7 @@ import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
|
||||
import type {
|
||||
AgentRunRequestMessage,
|
||||
DeviceSystemInfo,
|
||||
SystemInfoRequestMessage,
|
||||
ToolCallRequestMessage,
|
||||
@@ -25,6 +26,7 @@ import {
|
||||
stopDaemon,
|
||||
writeStatus,
|
||||
} from '../daemon/manager';
|
||||
import { spawnHeteroAgentRun } from '../device/agentRun';
|
||||
import { registerDevice, resolveDeviceIdentity } from '../device/register';
|
||||
import { loadOrCreateConnectionId, loadSettings, normalizeUrl, saveSettings } from '../settings';
|
||||
import { executeToolCall } from '../tools';
|
||||
@@ -286,6 +288,38 @@ async function runConnect(options: ConnectOptions, isDaemonChild: boolean) {
|
||||
});
|
||||
});
|
||||
|
||||
// Handle gateway-dispatched agent runs (heterogeneous agents, e.g. Claude
|
||||
// Code). Mirrors the desktop app: spawn `lh hetero exec`, which owns the full
|
||||
// execution + server-ingest pipeline. Ack with the spawn outcome — `accepted`
|
||||
// once the child starts, `rejected` if it fails to spawn (e.g. bad cwd) — so
|
||||
// a failed dispatch surfaces as an error instead of a stuck assistant message.
|
||||
client.on('agent_run_request', async (request: AgentRunRequestMessage) => {
|
||||
info(
|
||||
`Received agent_run_request: operationId=${request.operationId} type=${request.agentType}`,
|
||||
);
|
||||
try {
|
||||
const ack = await spawnHeteroAgentRun(
|
||||
{
|
||||
agentType: request.agentType,
|
||||
cwd: request.cwd,
|
||||
jwt: request.jwt,
|
||||
operationId: request.operationId,
|
||||
prompt: request.prompt,
|
||||
resumeSessionId: request.resumeSessionId,
|
||||
serverUrl: auth.serverUrl,
|
||||
systemContext: request.systemContext,
|
||||
topicId: request.topicId,
|
||||
},
|
||||
{ error, info },
|
||||
);
|
||||
client.sendAgentRunAck({ operationId: request.operationId, ...ack });
|
||||
} catch (err) {
|
||||
const reason = err instanceof Error ? err.message : String(err);
|
||||
error(`agent_run_request failed: ${reason}`);
|
||||
client.sendAgentRunAck({ operationId: request.operationId, reason, status: 'rejected' });
|
||||
}
|
||||
});
|
||||
|
||||
client.on('connected', () => {
|
||||
updateStatus('connected');
|
||||
});
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import { mkdtemp, readdir, readFile } from 'node:fs/promises';
|
||||
import { tmpdir } from 'node:os';
|
||||
import path from 'node:path';
|
||||
import { PassThrough } from 'node:stream';
|
||||
|
||||
import { Command } from 'commander';
|
||||
@@ -645,4 +648,224 @@ describe('hetero exec command', () => {
|
||||
'finish',
|
||||
]);
|
||||
});
|
||||
|
||||
it('resets the per-message text accumulator at message boundaries (no cross-message duplication)', async () => {
|
||||
// The `replace` snapshot accumulator must not span
|
||||
// message boundaries. Two assistant messages separated by a
|
||||
// stream_end/stream_start boundary must each snapshot only their OWN
|
||||
// text — otherwise the second message re-emits the first's text verbatim.
|
||||
const textSnapshots: string[] = [];
|
||||
mockHeteroIngestMutate.mockImplementation(async ({ events }: any) => {
|
||||
for (const e of events) {
|
||||
if (e.type === 'stream_chunk' && e.data?.chunkType === 'text') {
|
||||
textSnapshots.push(e.data.content);
|
||||
}
|
||||
}
|
||||
return { ack: true };
|
||||
});
|
||||
|
||||
mockSpawnAgent.mockReturnValue(
|
||||
createFakeHandle({
|
||||
events: [
|
||||
{
|
||||
data: { chunkType: 'text', content: 'first message' },
|
||||
operationId: 'op-server',
|
||||
stepIndex: 0,
|
||||
timestamp: 1,
|
||||
type: 'stream_chunk',
|
||||
},
|
||||
{ data: {}, operationId: 'op-server', stepIndex: 0, timestamp: 2, type: 'stream_end' },
|
||||
{
|
||||
data: { newStep: true, provider: 'claude-code' },
|
||||
operationId: 'op-server',
|
||||
stepIndex: 1,
|
||||
timestamp: 3,
|
||||
type: 'stream_start',
|
||||
},
|
||||
{
|
||||
data: { chunkType: 'text', content: 'second message' },
|
||||
operationId: 'op-server',
|
||||
stepIndex: 1,
|
||||
timestamp: 4,
|
||||
type: 'stream_chunk',
|
||||
},
|
||||
{
|
||||
data: { reason: 'success' },
|
||||
operationId: 'op-server',
|
||||
stepIndex: 1,
|
||||
timestamp: 5,
|
||||
type: 'agent_runtime_end',
|
||||
},
|
||||
],
|
||||
exitCode: 0,
|
||||
}),
|
||||
);
|
||||
|
||||
await runCmd([
|
||||
'hetero',
|
||||
'exec',
|
||||
'--type',
|
||||
'claude-code',
|
||||
'--prompt',
|
||||
'hi',
|
||||
'--topic',
|
||||
'topic-1',
|
||||
'--operation-id',
|
||||
'op-server',
|
||||
'--render',
|
||||
'none',
|
||||
]);
|
||||
|
||||
// Second snapshot carries ONLY the second message — not "first messagesecond message".
|
||||
expect(textSnapshots).toEqual(['first message', 'second message']);
|
||||
});
|
||||
|
||||
it('forwards subagent text raw (no snapshot coalescing, no cross-scope pollution of main text)', async () => {
|
||||
// Subagent text is emitted as ONE full block per turn and the server's
|
||||
// subagent path *appends* it (no snapshot semantics). It must therefore
|
||||
// bypass the main-agent `replace`-snapshot coalescing: folding it into the
|
||||
// shared accumulator would (a) splice main text into the subagent message
|
||||
// and (b) make the server append a replace-snapshot → duplicated content.
|
||||
const ingested: any[] = [];
|
||||
mockHeteroIngestMutate.mockImplementation(async ({ events }: any) => {
|
||||
for (const e of events) ingested.push(e);
|
||||
return { ack: true };
|
||||
});
|
||||
|
||||
const subagent = { parentToolCallId: 'task-1', subagentMessageId: 'msg-sub-1' };
|
||||
|
||||
mockSpawnAgent.mockReturnValue(
|
||||
createFakeHandle({
|
||||
events: [
|
||||
// Main-agent streamed text delta (coalesced).
|
||||
{
|
||||
data: { chunkType: 'text', content: 'hello ' },
|
||||
operationId: 'op-server',
|
||||
stepIndex: 0,
|
||||
timestamp: 1,
|
||||
type: 'stream_chunk',
|
||||
},
|
||||
// Subagent full-block text — must pass through untouched.
|
||||
{
|
||||
data: { chunkType: 'text', content: 'I checked the files.', subagent },
|
||||
operationId: 'op-server',
|
||||
stepIndex: 0,
|
||||
timestamp: 2,
|
||||
type: 'stream_chunk',
|
||||
},
|
||||
{
|
||||
data: {
|
||||
chunkType: 'tools_calling',
|
||||
toolsCalling: [
|
||||
{
|
||||
apiName: 'Bash',
|
||||
arguments: '{"cmd":"ls"}',
|
||||
id: 'tc-1',
|
||||
identifier: 'bash',
|
||||
type: 'default',
|
||||
},
|
||||
],
|
||||
},
|
||||
operationId: 'op-server',
|
||||
stepIndex: 1,
|
||||
timestamp: 3,
|
||||
type: 'stream_chunk',
|
||||
},
|
||||
{
|
||||
data: { reason: 'success' },
|
||||
operationId: 'op-server',
|
||||
stepIndex: 1,
|
||||
timestamp: 4,
|
||||
type: 'agent_runtime_end',
|
||||
},
|
||||
],
|
||||
exitCode: 0,
|
||||
}),
|
||||
);
|
||||
|
||||
await runCmd([
|
||||
'hetero',
|
||||
'exec',
|
||||
'--type',
|
||||
'claude-code',
|
||||
'--prompt',
|
||||
'hi',
|
||||
'--topic',
|
||||
'topic-1',
|
||||
'--operation-id',
|
||||
'op-server',
|
||||
'--render',
|
||||
'none',
|
||||
]);
|
||||
|
||||
const textEvents = ingested.filter(
|
||||
(e) => e.type === 'stream_chunk' && e.data?.chunkType === 'text',
|
||||
);
|
||||
|
||||
// Subagent text forwarded verbatim: keeps its subagent tag, original
|
||||
// content, and is NOT converted into a replace snapshot.
|
||||
const subagentText = textEvents.find((e) => e.data?.subagent);
|
||||
expect(subagentText).toBeDefined();
|
||||
expect(subagentText.data.content).toBe('I checked the files.');
|
||||
expect(subagentText.data.snapshotMode).toBeUndefined();
|
||||
|
||||
// Main snapshot is untainted by the subagent block.
|
||||
const mainText = textEvents.find((e) => !e.data?.subagent);
|
||||
expect(mainText).toBeDefined();
|
||||
expect(mainText.data.content).toBe('hello ');
|
||||
expect(mainText.data.snapshotMode).toBe('replace');
|
||||
expect(mainText.data.content).not.toContain('I checked');
|
||||
});
|
||||
|
||||
it('--raw-dump writes a session folder with meta.json, wires onRawStdout, and tees stderr', async () => {
|
||||
const root = await mkdtemp(path.join(tmpdir(), 'hetero-rawdump-'));
|
||||
|
||||
mockSpawnAgent.mockReturnValue(
|
||||
createFakeHandle({
|
||||
events: [
|
||||
{
|
||||
data: { chunkType: 'text', content: 'hi' },
|
||||
operationId: 'op-raw',
|
||||
stepIndex: 0,
|
||||
timestamp: 1,
|
||||
type: 'stream_chunk',
|
||||
},
|
||||
],
|
||||
exitCode: 0,
|
||||
stderrChunks: ['warning: something happened\n'],
|
||||
}),
|
||||
);
|
||||
|
||||
await runCmd([
|
||||
'hetero',
|
||||
'exec',
|
||||
'--type',
|
||||
'claude-code',
|
||||
'--prompt',
|
||||
'hi',
|
||||
'--operation-id',
|
||||
'op-raw',
|
||||
'--render',
|
||||
'none',
|
||||
'--raw-dump',
|
||||
root,
|
||||
]);
|
||||
|
||||
// The raw stdout tee is handed to spawnAgent (the package captures the
|
||||
// pre-adapter bytes — exercised in spawnAgent.test.ts).
|
||||
expect(typeof mockSpawnAgent.mock.calls[0][0].onRawStdout).toBe('function');
|
||||
|
||||
// One session folder per exec, keyed by the operation id.
|
||||
const sessions = await readdir(root);
|
||||
expect(sessions).toHaveLength(1);
|
||||
expect(sessions[0]).toContain('op-raw');
|
||||
const sessionDir = path.join(root, sessions[0]!);
|
||||
|
||||
const meta = JSON.parse(await readFile(path.join(sessionDir, 'meta.json'), 'utf8'));
|
||||
expect(meta).toMatchObject({ agentType: 'claude-code', operationId: 'op-raw' });
|
||||
|
||||
// stderr is teed to the attempt's log file.
|
||||
const stderrDump = await readFile(path.join(sessionDir, 'attempt-1.stderr.log'), 'utf8');
|
||||
expect(stderrDump).toContain('warning: something happened');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { once } from 'node:events';
|
||||
import { readFile } from 'node:fs/promises';
|
||||
import { createWriteStream } from 'node:fs';
|
||||
import { mkdir, readFile, writeFile } from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
|
||||
import type {
|
||||
@@ -59,6 +60,12 @@ interface ExecOptions {
|
||||
inputJson?: string;
|
||||
operationId?: string;
|
||||
prompt?: string;
|
||||
/**
|
||||
* When set, persist the agent process's RAW stdout/stderr (pre-adapter
|
||||
* stream-json) under `<rawDump>/<timestamp>-<operationId>/` for debugging.
|
||||
* Independent of `--render` and the server ingest path.
|
||||
*/
|
||||
rawDump?: string;
|
||||
/**
|
||||
* Output rendering mode.
|
||||
* jsonl — emit each `AgentStreamEvent` as a JSONL line on stdout (default
|
||||
@@ -217,10 +224,25 @@ class SerialServerIngester {
|
||||
push(event: AgentStreamEvent): void {
|
||||
if (this.fatalError) return;
|
||||
|
||||
// Text-snapshot coalescing is a MAIN-AGENT-ONLY transport optimization:
|
||||
// it debounces the main agent's token-level text *deltas* into one
|
||||
// `replace` snapshot to cut ingest calls. Subagent text is explicitly
|
||||
// excluded (`!event.data?.subagent`) for two reasons:
|
||||
// 1. Subagent text is emitted as ONE full block per turn (see
|
||||
// claudeCode adapter `handleSubagentAssistant` — "the full block IS
|
||||
// the only emission"), so there is nothing to coalesce.
|
||||
// 2. `accumulatedText` is a single shared accumulator with no subagent
|
||||
// scope. Folding subagent blocks in would (a) splice main-agent text
|
||||
// into the subagent message via the shared buffer, and (b) emit a
|
||||
// `replace` snapshot that the server's subagent path *appends*
|
||||
// (`persistSubagentText` has no snapshot semantics) → duplicated /
|
||||
// cross-scope content. Forwarding the raw block straight through lets
|
||||
// the server append it exactly once, correctly.
|
||||
if (
|
||||
event.type === 'stream_chunk' &&
|
||||
event.data?.chunkType === 'text' &&
|
||||
typeof event.data?.content === 'string'
|
||||
typeof event.data?.content === 'string' &&
|
||||
!event.data?.subagent
|
||||
) {
|
||||
this.accumulatedText += event.data.content;
|
||||
this.pendingTextEvent = event;
|
||||
@@ -233,6 +255,17 @@ class SerialServerIngester {
|
||||
}
|
||||
|
||||
this.queuePendingTextSnapshot();
|
||||
// `accumulatedText` is a PER-MESSAGE accumulator: it coalesces the text
|
||||
// deltas of the current assistant message into one `replace` snapshot.
|
||||
// A new message boundary (`stream_start` / `stream_end`, emitted by the
|
||||
// adapter's `openMainMessage`) must reset it — otherwise it spans the
|
||||
// whole run and every later message's snapshot re-emits all prior
|
||||
// messages' text verbatim, which the server then persists into the new
|
||||
// DB message: cross-message text duplication. Reset
|
||||
// AFTER flushing the just-ended message's pending snapshot above.
|
||||
if (event.type === 'stream_start' || event.type === 'stream_end') {
|
||||
this.accumulatedText = '';
|
||||
}
|
||||
this.enqueue(async () => {
|
||||
await this.sink.ingest([event]);
|
||||
});
|
||||
@@ -280,6 +313,77 @@ class SerialServerIngester {
|
||||
}
|
||||
}
|
||||
|
||||
interface RawStreamDumpAttempt {
|
||||
/** Flush + close both file streams. Resolves once the bytes are on disk. */
|
||||
close: () => Promise<void>;
|
||||
writeStderr: (chunk: Buffer) => void;
|
||||
writeStdout: (chunk: Buffer) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Persists the agent process's RAW stdout/stderr — the untouched stream-json,
|
||||
* BEFORE the adapter — to disk for post-hoc debugging. The adapted/ingested
|
||||
* view can't tell a CC-side empty `tool_result` apart from an adapter
|
||||
* extraction bug; the raw dump can.
|
||||
*
|
||||
* Enabled via `lh hetero exec --raw-dump <dir>`. Each exec gets its own
|
||||
* `<dir>/<timestamp>-<operationId>/` session folder; each spawn attempt (the
|
||||
* resume retry is a second attempt) writes `<label>.stdout.jsonl` /
|
||||
* `<label>.stderr.log`. Fully best-effort: any dump failure is logged and
|
||||
* swallowed so it never affects the run or its exit code.
|
||||
*
|
||||
* Future: the server-side sandbox runner (`spawnHeteroSandbox`) and the
|
||||
* desktop device path (`spawnLhHeteroExec`) can pass `--raw-dump` pointing at
|
||||
* a collectable location to capture remote runs the same way.
|
||||
*/
|
||||
class RawStreamDump {
|
||||
private constructor(private readonly dir: string) {}
|
||||
|
||||
static async create(
|
||||
root: string,
|
||||
operationId: string,
|
||||
meta: Record<string, unknown>,
|
||||
): Promise<RawStreamDump | undefined> {
|
||||
try {
|
||||
const safeTs = new Date().toISOString().replaceAll(/[.:]/g, '-');
|
||||
const dir = path.join(path.resolve(root), `${safeTs}-${operationId}`);
|
||||
await mkdir(dir, { recursive: true });
|
||||
await writeFile(
|
||||
path.join(dir, 'meta.json'),
|
||||
`${JSON.stringify({ ...meta, operationId, startedAt: new Date().toISOString() }, null, 2)}\n`,
|
||||
);
|
||||
log.info(`Raw stream dump enabled → ${dir}`);
|
||||
return new RawStreamDump(dir);
|
||||
} catch (err) {
|
||||
log.warn(
|
||||
`Failed to initialize raw stream dump: ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
openAttempt(label: string): RawStreamDumpAttempt {
|
||||
const stdout = createWriteStream(path.join(this.dir, `${label}.stdout.jsonl`));
|
||||
const stderr = createWriteStream(path.join(this.dir, `${label}.stderr.log`));
|
||||
// A failed dump write must never crash the run — drop write errors.
|
||||
stdout.on('error', () => {});
|
||||
stderr.on('error', () => {});
|
||||
return {
|
||||
close: () =>
|
||||
Promise.all([
|
||||
new Promise<void>((resolve) => stdout.end(() => resolve())),
|
||||
new Promise<void>((resolve) => stderr.end(() => resolve())),
|
||||
]).then(() => undefined),
|
||||
writeStderr: (chunk: Buffer) => {
|
||||
stderr.write(chunk);
|
||||
},
|
||||
writeStdout: (chunk: Buffer) => {
|
||||
stdout.write(chunk);
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const exec = async (options: ExecOptions): Promise<void> => {
|
||||
if (!SUPPORTED_AGENT_TYPES.has(options.type)) {
|
||||
log.error(
|
||||
@@ -314,6 +418,17 @@ const exec = async (options: ExecOptions): Promise<void> => {
|
||||
|
||||
const operationId = options.operationId || randomUUID();
|
||||
|
||||
// Optional raw stream dump (pre-adapter stdout/stderr) for debugging.
|
||||
let rawDump: RawStreamDump | undefined;
|
||||
if (options.rawDump) {
|
||||
rawDump = await RawStreamDump.create(options.rawDump, operationId, {
|
||||
agentType: options.type,
|
||||
cwd: options.cwd || process.cwd(),
|
||||
resume: options.resume ?? null,
|
||||
topicId: options.topic ?? null,
|
||||
});
|
||||
}
|
||||
|
||||
// Determine JSONL output mode.
|
||||
// Explicit --render flag always wins. Otherwise: emit JSONL in standalone
|
||||
// mode; suppress in server-ingest mode (sink handles the data path).
|
||||
@@ -357,6 +472,7 @@ const exec = async (options: ExecOptions): Promise<void> => {
|
||||
const runOneAgent = async (
|
||||
spawnOpts: Parameters<typeof spawnAgent>[0],
|
||||
interceptResumeErrors: boolean,
|
||||
runLabel: string,
|
||||
): Promise<{
|
||||
code: number | null;
|
||||
ingestError: boolean;
|
||||
@@ -365,12 +481,17 @@ const exec = async (options: ExecOptions): Promise<void> => {
|
||||
signal: NodeJS.Signals | null;
|
||||
stderrContent: string;
|
||||
}> => {
|
||||
// One raw-dump file pair per spawn attempt (the resume retry is a second
|
||||
// attempt). The stdout tee runs inside `spawnAgent` before the adapter.
|
||||
const dumpAttempt = rawDump?.openAttempt(runLabel);
|
||||
|
||||
// `spawnAgent` is async and can reject DURING image normalization — fetch
|
||||
// failures, missing local --image paths, decode errors.
|
||||
let handle: Awaited<ReturnType<typeof spawnAgent>>;
|
||||
try {
|
||||
handle = await spawnAgent(spawnOpts);
|
||||
handle = await spawnAgent({ ...spawnOpts, onRawStdout: dumpAttempt?.writeStdout });
|
||||
} catch (err) {
|
||||
await dumpAttempt?.close();
|
||||
log.error('Failed to start agent:', err instanceof Error ? err.message : String(err));
|
||||
process.exit(1);
|
||||
}
|
||||
@@ -387,6 +508,7 @@ const exec = async (options: ExecOptions): Promise<void> => {
|
||||
if (stderrContent.length < STDERR_CAP) {
|
||||
stderrContent += chunk.toString();
|
||||
}
|
||||
dumpAttempt?.writeStderr(chunk);
|
||||
});
|
||||
handle.stderr.pipe(process.stderr);
|
||||
|
||||
@@ -460,6 +582,7 @@ const exec = async (options: ExecOptions): Promise<void> => {
|
||||
// best-effort
|
||||
}
|
||||
}
|
||||
await dumpAttempt?.close();
|
||||
process.exit(1);
|
||||
} finally {
|
||||
process.off('SIGINT', onSigint);
|
||||
@@ -468,6 +591,7 @@ const exec = async (options: ExecOptions): Promise<void> => {
|
||||
|
||||
const { code, signal } = await handle.exit;
|
||||
await stderrEnded;
|
||||
await dumpAttempt?.close();
|
||||
|
||||
// Fallback stderr detection: CC may exit non-zero without emitting a
|
||||
// result event (e.g. it writes to stderr and quits immediately).
|
||||
@@ -503,6 +627,7 @@ const exec = async (options: ExecOptions): Promise<void> => {
|
||||
resumeSessionId: options.resume,
|
||||
},
|
||||
interceptResume,
|
||||
'attempt-1',
|
||||
);
|
||||
|
||||
// ─── Auto-retry without --resume when the session cannot be used ─────────
|
||||
@@ -531,6 +656,7 @@ const exec = async (options: ExecOptions): Promise<void> => {
|
||||
// No resumeSessionId — start fresh
|
||||
},
|
||||
false, // no need to intercept resume errors on a fresh run
|
||||
'attempt-2-noresume',
|
||||
);
|
||||
}
|
||||
|
||||
@@ -618,5 +744,9 @@ export function registerHeteroCommand(program: Command) {
|
||||
'--render <mode>',
|
||||
'Output mode: jsonl (emit events as JSONL on stdout) | none (suppress stdout). Defaults to jsonl in standalone, none in server-ingest mode.',
|
||||
)
|
||||
.option(
|
||||
'--raw-dump <dir>',
|
||||
'Persist the agent process RAW stdout/stderr (pre-adapter stream-json) under <dir>/<timestamp>-<operationId>/ for debugging. Each spawn attempt writes its own .stdout.jsonl / .stderr.log. Best-effort; never affects the run.',
|
||||
)
|
||||
.action(exec);
|
||||
}
|
||||
|
||||
@@ -64,15 +64,18 @@ describe('skill command', () => {
|
||||
|
||||
describe('list', () => {
|
||||
it('should display skills in table format', async () => {
|
||||
mockTrpcClient.agentSkills.list.query.mockResolvedValue([
|
||||
{
|
||||
description: 'A skill',
|
||||
id: 's1',
|
||||
identifier: 'test-skill',
|
||||
name: 'Test Skill',
|
||||
source: 'user',
|
||||
},
|
||||
]);
|
||||
mockTrpcClient.agentSkills.list.query.mockResolvedValue({
|
||||
data: [
|
||||
{
|
||||
description: 'A skill',
|
||||
id: 's1',
|
||||
identifier: 'test-skill',
|
||||
name: 'Test Skill',
|
||||
source: 'user',
|
||||
},
|
||||
],
|
||||
total: 1,
|
||||
});
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'skill', 'list']);
|
||||
@@ -83,7 +86,7 @@ describe('skill command', () => {
|
||||
|
||||
it('should output JSON when --json flag is used', async () => {
|
||||
const items = [{ id: 's1', name: 'Test' }];
|
||||
mockTrpcClient.agentSkills.list.query.mockResolvedValue(items);
|
||||
mockTrpcClient.agentSkills.list.query.mockResolvedValue({ data: items, total: items.length });
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'skill', 'list', '--json']);
|
||||
@@ -92,7 +95,7 @@ describe('skill command', () => {
|
||||
});
|
||||
|
||||
it('should filter by source', async () => {
|
||||
mockTrpcClient.agentSkills.list.query.mockResolvedValue([]);
|
||||
mockTrpcClient.agentSkills.list.query.mockResolvedValue({ data: [], total: 0 });
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'skill', 'list', '--source', 'builtin']);
|
||||
@@ -111,7 +114,7 @@ describe('skill command', () => {
|
||||
});
|
||||
|
||||
it('should show message when no skills found', async () => {
|
||||
mockTrpcClient.agentSkills.list.query.mockResolvedValue([]);
|
||||
mockTrpcClient.agentSkills.list.query.mockResolvedValue({ data: [], total: 0 });
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'skill', 'list']);
|
||||
@@ -211,9 +214,10 @@ describe('skill command', () => {
|
||||
|
||||
describe('search', () => {
|
||||
it('should search skills', async () => {
|
||||
mockTrpcClient.agentSkills.search.query.mockResolvedValue([
|
||||
{ description: 'A skill', id: 's1', name: 'Found Skill' },
|
||||
]);
|
||||
mockTrpcClient.agentSkills.search.query.mockResolvedValue({
|
||||
data: [{ description: 'A skill', id: 's1', name: 'Found Skill' }],
|
||||
total: 1,
|
||||
});
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'skill', 'search', 'test']);
|
||||
@@ -223,7 +227,7 @@ describe('skill command', () => {
|
||||
});
|
||||
|
||||
it('should show message when no results', async () => {
|
||||
mockTrpcClient.agentSkills.search.query.mockResolvedValue([]);
|
||||
mockTrpcClient.agentSkills.search.query.mockResolvedValue({ data: [], total: 0 });
|
||||
|
||||
const program = createProgram();
|
||||
await program.parseAsync(['node', 'test', 'skill', 'search', 'nothing']);
|
||||
|
||||
@@ -47,7 +47,7 @@ export function registerSkillCommand(program: Command) {
|
||||
if (options.source) input.source = options.source as 'builtin' | 'market' | 'user';
|
||||
|
||||
const result = await client.agentSkills.list.query(input);
|
||||
const items = Array.isArray(result) ? result : [];
|
||||
const items = result?.data ?? [];
|
||||
|
||||
if (options.json !== undefined) {
|
||||
const fields = typeof options.json === 'string' ? options.json : undefined;
|
||||
@@ -206,7 +206,7 @@ export function registerSkillCommand(program: Command) {
|
||||
.action(async (query: string, options: { json?: string | boolean }) => {
|
||||
const client = await getTrpcClient();
|
||||
const result = await client.agentSkills.search.query({ query });
|
||||
const items = Array.isArray(result) ? result : [];
|
||||
const items = result?.data ?? [];
|
||||
|
||||
if (options.json !== undefined) {
|
||||
const fields = typeof options.json === 'string' ? options.json : undefined;
|
||||
|
||||
@@ -0,0 +1,125 @@
|
||||
import { EventEmitter } from 'node:events';
|
||||
|
||||
import { afterEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { spawnHeteroAgentRun } from './agentRun';
|
||||
|
||||
const { spawnMock } = vi.hoisted(() => ({ spawnMock: vi.fn() }));
|
||||
|
||||
vi.mock('node:child_process', () => ({ spawn: spawnMock }));
|
||||
|
||||
const makeFakeChild = () => {
|
||||
const child = new EventEmitter() as EventEmitter & {
|
||||
stdin: { end: ReturnType<typeof vi.fn>; write: ReturnType<typeof vi.fn> };
|
||||
};
|
||||
child.stdin = { end: vi.fn(), write: vi.fn() };
|
||||
return child;
|
||||
};
|
||||
|
||||
const baseParams = {
|
||||
agentType: 'claudeCode',
|
||||
jwt: 'jwt',
|
||||
operationId: 'op',
|
||||
prompt: 'hi',
|
||||
serverUrl: 'https://app.lobehub.com',
|
||||
topicId: 'tpc',
|
||||
};
|
||||
|
||||
describe('spawnHeteroAgentRun', () => {
|
||||
afterEach(() => {
|
||||
spawnMock.mockReset();
|
||||
});
|
||||
|
||||
it('spawns `lh hetero exec` in server-ingest mode via the current CLI entry', async () => {
|
||||
const child = makeFakeChild();
|
||||
spawnMock.mockReturnValue(child);
|
||||
|
||||
const ackPromise = spawnHeteroAgentRun({
|
||||
...baseParams,
|
||||
cwd: '/work/dir',
|
||||
jwt: 'jwt-token',
|
||||
operationId: 'op-1',
|
||||
topicId: 'tpc-1',
|
||||
});
|
||||
|
||||
expect(spawnMock).toHaveBeenCalledTimes(1);
|
||||
const [bin, args, opts] = spawnMock.mock.calls[0];
|
||||
|
||||
expect(bin).toBe(process.execPath);
|
||||
expect(args).toEqual([
|
||||
...process.execArgv,
|
||||
process.argv[1],
|
||||
'hetero',
|
||||
'exec',
|
||||
'--type',
|
||||
'claudeCode',
|
||||
'--operation-id',
|
||||
'op-1',
|
||||
'--topic',
|
||||
'tpc-1',
|
||||
'--render',
|
||||
'none',
|
||||
'--input-json',
|
||||
'-',
|
||||
'--cwd',
|
||||
'/work/dir',
|
||||
]);
|
||||
expect(opts).toMatchObject({
|
||||
cwd: '/work/dir',
|
||||
env: expect.objectContaining({
|
||||
LOBEHUB_JWT: 'jwt-token',
|
||||
LOBEHUB_SERVER: 'https://app.lobehub.com',
|
||||
}),
|
||||
});
|
||||
|
||||
// stdin is only written after the child actually spawns.
|
||||
expect(child.stdin.write).not.toHaveBeenCalled();
|
||||
child.emit('spawn');
|
||||
|
||||
await expect(ackPromise).resolves.toEqual({ status: 'accepted' });
|
||||
expect(child.stdin.write).toHaveBeenCalledWith(JSON.stringify('hi'));
|
||||
expect(child.stdin.end).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('rejects (no stuck run) when the child errors before spawning, e.g. bad cwd', async () => {
|
||||
const child = makeFakeChild();
|
||||
spawnMock.mockReturnValue(child);
|
||||
|
||||
const ackPromise = spawnHeteroAgentRun({ ...baseParams, cwd: '/missing' });
|
||||
child.emit('error', new Error('spawn ENOENT'));
|
||||
|
||||
await expect(ackPromise).resolves.toEqual({ reason: 'spawn ENOENT', status: 'rejected' });
|
||||
expect(child.stdin.write).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('appends --resume when resuming a session', () => {
|
||||
const child = makeFakeChild();
|
||||
spawnMock.mockReturnValue(child);
|
||||
|
||||
void spawnHeteroAgentRun({ ...baseParams, resumeSessionId: 'sess-9' });
|
||||
|
||||
const [, args] = spawnMock.mock.calls[0];
|
||||
expect(args).toContain('--resume');
|
||||
expect(args).toContain('sess-9');
|
||||
});
|
||||
|
||||
it('sends a content-block array to stdin when systemContext is provided', async () => {
|
||||
const child = makeFakeChild();
|
||||
spawnMock.mockReturnValue(child);
|
||||
|
||||
const ackPromise = spawnHeteroAgentRun({
|
||||
...baseParams,
|
||||
prompt: 'do it',
|
||||
systemContext: 'workspace rules',
|
||||
});
|
||||
child.emit('spawn');
|
||||
await ackPromise;
|
||||
|
||||
expect(child.stdin.write).toHaveBeenCalledWith(
|
||||
JSON.stringify([
|
||||
{ text: 'workspace rules', type: 'text' },
|
||||
{ text: 'do it', type: 'text' },
|
||||
]),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,130 @@
|
||||
import { spawn } from 'node:child_process';
|
||||
|
||||
export interface SpawnHeteroAgentRunParams {
|
||||
agentType: string;
|
||||
cwd?: string;
|
||||
jwt: string;
|
||||
operationId: string;
|
||||
prompt: string;
|
||||
resumeSessionId?: string;
|
||||
serverUrl: string;
|
||||
systemContext?: string;
|
||||
topicId: string;
|
||||
}
|
||||
|
||||
export interface AgentRunAckResult {
|
||||
reason?: string;
|
||||
status: 'accepted' | 'rejected';
|
||||
}
|
||||
|
||||
interface SpawnHeteroAgentRunLogger {
|
||||
error?: (msg: string) => void;
|
||||
info?: (msg: string) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawn `lh hetero exec` for a gateway-dispatched agent run. Mirrors the
|
||||
* desktop app's `spawnLhHeteroExec`: the spawned CLI owns the full pipeline
|
||||
* (spawn -> adapt -> BatchIngester -> server ingest), so the connect daemon
|
||||
* needs no local stream handling — it only kicks off the process.
|
||||
*
|
||||
* Re-invokes the current CLI entry (`process.execPath` + `process.argv[1]`)
|
||||
* instead of relying on `lh` being on `PATH`, so it also works inside the
|
||||
* detached `lh connect --daemon` child where `PATH` may be minimal.
|
||||
*
|
||||
* Resolves only once the child's outcome is known: `accepted` on the `spawn`
|
||||
* event, `rejected` on an early `error`. `spawn()` reports failures (missing or
|
||||
* inaccessible `cwd`, etc.) asynchronously via `error`, so acking eagerly would
|
||||
* report a false success and leave the run with no process to emit
|
||||
* `heteroFinish` — surfacing as a stuck assistant message. A rejected ack
|
||||
* instead flows back as a dispatch failure the user can see.
|
||||
*/
|
||||
export function spawnHeteroAgentRun(
|
||||
params: SpawnHeteroAgentRunParams,
|
||||
logger?: SpawnHeteroAgentRunLogger,
|
||||
): Promise<AgentRunAckResult> {
|
||||
const {
|
||||
agentType,
|
||||
cwd,
|
||||
jwt,
|
||||
operationId,
|
||||
prompt,
|
||||
resumeSessionId,
|
||||
serverUrl,
|
||||
systemContext,
|
||||
topicId,
|
||||
} = params;
|
||||
const workDir = cwd ?? process.cwd();
|
||||
|
||||
// Server-ingest mode (--topic + --operation-id): events are batch-POSTed to
|
||||
// the server, not rendered. `--input-json -` reads the prompt from stdin.
|
||||
const cliArgs = [
|
||||
process.argv[1],
|
||||
'hetero',
|
||||
'exec',
|
||||
'--type',
|
||||
agentType,
|
||||
'--operation-id',
|
||||
operationId,
|
||||
'--topic',
|
||||
topicId,
|
||||
'--render',
|
||||
'none',
|
||||
'--input-json',
|
||||
'-',
|
||||
'--cwd',
|
||||
workDir,
|
||||
...(resumeSessionId ? ['--resume', resumeSessionId] : []),
|
||||
];
|
||||
|
||||
// With systemContext, send a content-block array so the agent sees the
|
||||
// context block first, then the user's actual prompt — mirrors the desktop
|
||||
// path. `lh hetero exec` coerces both shapes via coerceJsonPrompt.
|
||||
const stdinPayload = systemContext
|
||||
? JSON.stringify([
|
||||
{ text: systemContext, type: 'text' },
|
||||
{ text: prompt, type: 'text' },
|
||||
])
|
||||
: JSON.stringify(prompt);
|
||||
|
||||
return new Promise<AgentRunAckResult>((resolve) => {
|
||||
let settled = false;
|
||||
const settle = (result: AgentRunAckResult) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
resolve(result);
|
||||
};
|
||||
|
||||
const child = spawn(process.execPath, [...process.execArgv, ...cliArgs], {
|
||||
cwd: workDir,
|
||||
env: {
|
||||
...process.env,
|
||||
LOBEHUB_JWT: jwt,
|
||||
LOBEHUB_SERVER: serverUrl,
|
||||
},
|
||||
stdio: ['pipe', 'inherit', 'inherit'],
|
||||
});
|
||||
|
||||
child.once('spawn', () => {
|
||||
// Only safe to write stdin once the process actually started.
|
||||
try {
|
||||
child.stdin?.write(stdinPayload);
|
||||
child.stdin?.end();
|
||||
} catch (err) {
|
||||
logger?.error?.(
|
||||
`hetero exec stdin write failed (op=${operationId}): ${(err as Error).message}`,
|
||||
);
|
||||
}
|
||||
settle({ status: 'accepted' });
|
||||
});
|
||||
|
||||
child.once('error', (err) => {
|
||||
logger?.error?.(`hetero exec spawn failed (op=${operationId}): ${err.message}`);
|
||||
settle({ reason: err.message, status: 'rejected' });
|
||||
});
|
||||
|
||||
child.on('exit', (code, signal) => {
|
||||
logger?.info?.(`hetero exec exited (op=${operationId}) code=${code} signal=${signal}`);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { fromIpcErrorEnvelope, isIpcErrorEnvelope, toIpcErrorEnvelope } from './ipcError';
|
||||
|
||||
describe('ipcError envelope', () => {
|
||||
it('round-trips an Error and preserves its cause + code', () => {
|
||||
const cause = Object.assign(new Error('getaddrinfo ENOTFOUND example.com'), {
|
||||
code: 'ENOTFOUND',
|
||||
});
|
||||
const error = new TypeError('fetch failed', { cause });
|
||||
|
||||
const envelope = toIpcErrorEnvelope(error);
|
||||
expect(isIpcErrorEnvelope(envelope)).toBe(true);
|
||||
|
||||
const revived = fromIpcErrorEnvelope(envelope);
|
||||
expect(revived).toBeInstanceOf(Error);
|
||||
expect(revived.name).toBe('TypeError');
|
||||
expect(revived.message).toBe('fetch failed');
|
||||
|
||||
const revivedCause = revived.cause as Error & { code?: unknown };
|
||||
expect(revivedCause).toBeInstanceOf(Error);
|
||||
expect(revivedCause.message).toBe('getaddrinfo ENOTFOUND example.com');
|
||||
expect(revivedCause.code).toBe('ENOTFOUND');
|
||||
});
|
||||
|
||||
it('is clone-safe: the envelope survives structuredClone (the IPC boundary)', () => {
|
||||
const error = new Error('boom', { cause: new Error('root') });
|
||||
const envelope = toIpcErrorEnvelope(error);
|
||||
|
||||
const cloned = structuredClone(envelope);
|
||||
const revived = fromIpcErrorEnvelope(cloned);
|
||||
|
||||
expect(revived.message).toBe('boom');
|
||||
expect((revived.cause as Error).message).toBe('root');
|
||||
});
|
||||
|
||||
it('handles non-Error thrown values', () => {
|
||||
const revived = fromIpcErrorEnvelope(toIpcErrorEnvelope('plain string failure'));
|
||||
expect(revived.message).toBe('plain string failure');
|
||||
});
|
||||
|
||||
it('caps a deep / cyclic cause chain instead of recursing forever', () => {
|
||||
const a = new Error('a');
|
||||
const b = new Error('b', { cause: a });
|
||||
(a as { cause?: unknown }).cause = b; // cycle
|
||||
|
||||
// Should not throw (stack overflow) — depth is bounded.
|
||||
expect(() => toIpcErrorEnvelope(b)).not.toThrow();
|
||||
});
|
||||
|
||||
it('isIpcErrorEnvelope rejects plain values and look-alikes', () => {
|
||||
expect(isIpcErrorEnvelope(null)).toBe(false);
|
||||
expect(isIpcErrorEnvelope(undefined)).toBe(false);
|
||||
expect(isIpcErrorEnvelope('error')).toBe(false);
|
||||
expect(isIpcErrorEnvelope({ data: 'ok' })).toBe(false);
|
||||
expect(isIpcErrorEnvelope({ __lobeIpcError__: false })).toBe(false);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,86 @@
|
||||
/**
|
||||
* IPC error envelope.
|
||||
*
|
||||
* Electron's `ipcRenderer.invoke` rebuilds a thrown handler error from a
|
||||
* *string* on the renderer side (roughly `new Error("Error invoking remote
|
||||
* method '<channel>': " + String(mainError))`), so the original error object —
|
||||
* including a non-enumerable `cause` — never crosses the boundary. The real
|
||||
* failure reason (e.g. undici's `ENOTFOUND` / `ECONNREFUSED` hidden under a
|
||||
* generic `TypeError: fetch failed`) is therefore lost.
|
||||
*
|
||||
* To preserve it, the main process *returns* a clone-safe envelope (a plain
|
||||
* object) instead of throwing, and the preload `invoke` wrapper rebuilds a real
|
||||
* `Error` (with `cause`) from the envelope before re-throwing — keeping the
|
||||
* existing "promise rejects on failure" contract for every caller.
|
||||
*/
|
||||
|
||||
const IPC_ERROR_MARKER = '__lobeIpcError__';
|
||||
|
||||
/** Bound recursion on a deliberately malicious / cyclic `cause` chain. */
|
||||
const MAX_CAUSE_DEPTH = 5;
|
||||
|
||||
export interface SerializedIpcError {
|
||||
cause?: SerializedIpcError | string;
|
||||
/** Node/undici machine-readable reason (`ENOTFOUND`, `ECONNREFUSED`, …). */
|
||||
code?: unknown;
|
||||
message: string;
|
||||
name: string;
|
||||
stack?: string;
|
||||
}
|
||||
|
||||
export interface IpcErrorEnvelope {
|
||||
error: SerializedIpcError;
|
||||
[IPC_ERROR_MARKER]: true;
|
||||
}
|
||||
|
||||
const serializeError = (value: unknown, depth: number): SerializedIpcError => {
|
||||
if (value instanceof Error) {
|
||||
const serialized: SerializedIpcError = { message: value.message, name: value.name };
|
||||
|
||||
if (typeof value.stack === 'string') serialized.stack = value.stack;
|
||||
|
||||
const { code } = value as { code?: unknown };
|
||||
if (code !== undefined) serialized.code = code;
|
||||
|
||||
if (value.cause !== undefined && value.cause !== null && depth < MAX_CAUSE_DEPTH) {
|
||||
serialized.cause =
|
||||
value.cause instanceof Error ? serializeError(value.cause, depth + 1) : String(value.cause);
|
||||
}
|
||||
|
||||
return serialized;
|
||||
}
|
||||
|
||||
return { message: typeof value === 'string' ? value : String(value), name: 'Error' };
|
||||
};
|
||||
|
||||
/** Build a clone-safe envelope from a thrown value (main process). */
|
||||
export const toIpcErrorEnvelope = (value: unknown): IpcErrorEnvelope => ({
|
||||
[IPC_ERROR_MARKER]: true,
|
||||
error: serializeError(value, 0),
|
||||
});
|
||||
|
||||
/** Detect an envelope produced by {@link toIpcErrorEnvelope} (preload). */
|
||||
export const isIpcErrorEnvelope = (value: unknown): value is IpcErrorEnvelope =>
|
||||
typeof value === 'object' &&
|
||||
value !== null &&
|
||||
(value as Record<string, unknown>)[IPC_ERROR_MARKER] === true;
|
||||
|
||||
const reviveError = (serialized: SerializedIpcError): Error => {
|
||||
const cause =
|
||||
serialized.cause === undefined
|
||||
? undefined
|
||||
: typeof serialized.cause === 'string'
|
||||
? serialized.cause
|
||||
: reviveError(serialized.cause);
|
||||
|
||||
const error = new Error(serialized.message, cause === undefined ? undefined : { cause });
|
||||
error.name = serialized.name;
|
||||
if (serialized.stack !== undefined) error.stack = serialized.stack;
|
||||
if (serialized.code !== undefined) (error as { code?: unknown }).code = serialized.code;
|
||||
|
||||
return error;
|
||||
};
|
||||
|
||||
/** Rebuild a real `Error` (with `cause`) from an envelope (preload). */
|
||||
export const fromIpcErrorEnvelope = (envelope: IpcErrorEnvelope): Error =>
|
||||
reviveError(envelope.error);
|
||||
@@ -321,7 +321,9 @@ export default class AuthCtr extends ControllerModule {
|
||||
this.stopAutoRefresh();
|
||||
await this.remoteServerConfigCtr.clearTokens();
|
||||
await this.remoteServerConfigCtr.setRemoteServerConfig({ active: false });
|
||||
this.broadcastAuthorizationRequired();
|
||||
this.broadcastAuthorizationRequired(
|
||||
`auto-refresh:non_retryable ${result.error ?? ''}`.trim(),
|
||||
);
|
||||
} else {
|
||||
// For other errors (after retries exhausted), log but don't clear tokens immediately
|
||||
// The next refresh cycle will retry
|
||||
@@ -432,7 +434,7 @@ export default class AuthCtr extends ControllerModule {
|
||||
this.stopAutoRefresh();
|
||||
await this.remoteServerConfigCtr.clearTokens();
|
||||
await this.remoteServerConfigCtr.setRemoteServerConfig({ active: false });
|
||||
this.broadcastAuthorizationRequired();
|
||||
this.broadcastAuthorizationRequired(`refresh:non_retryable ${result.error ?? ''}`.trim());
|
||||
} else {
|
||||
// For transient errors, don't clear tokens - allow manual retry
|
||||
logger.warn('Refresh failed but error may be transient, tokens preserved for retry');
|
||||
@@ -450,7 +452,7 @@ export default class AuthCtr extends ControllerModule {
|
||||
this.stopAutoRefresh();
|
||||
await this.remoteServerConfigCtr.clearTokens();
|
||||
await this.remoteServerConfigCtr.setRemoteServerConfig({ active: false });
|
||||
this.broadcastAuthorizationRequired();
|
||||
this.broadcastAuthorizationRequired(`refresh:exception ${errorMessage}`);
|
||||
}
|
||||
|
||||
return { error: errorMessage, success: false };
|
||||
@@ -618,15 +620,17 @@ export default class AuthCtr extends ControllerModule {
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast authorization required event
|
||||
* Broadcast authorization required event.
|
||||
* `reason` is a short tag (e.g. `refresh:invalid_grant`, `startup:non_retryable`)
|
||||
* recorded so the renderer can log why the Session Expired modal appeared.
|
||||
*/
|
||||
private broadcastAuthorizationRequired() {
|
||||
logger.debug('Broadcasting authorizationRequired event to all windows');
|
||||
private broadcastAuthorizationRequired(reason: string) {
|
||||
logger.info(`Broadcasting authorizationRequired event (reason=${reason})`);
|
||||
const allWindows = BrowserWindow.getAllWindows();
|
||||
|
||||
for (const win of allWindows) {
|
||||
if (!win.isDestroyed()) {
|
||||
win.webContents.send('authorizationRequired');
|
||||
win.webContents.send('authorizationRequired', { reason });
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -751,7 +755,9 @@ export default class AuthCtr extends ControllerModule {
|
||||
logger.warn('Non-retryable error during proactive refresh, clearing tokens');
|
||||
await this.remoteServerConfigCtr.clearTokens();
|
||||
await this.remoteServerConfigCtr.setRemoteServerConfig({ active: false });
|
||||
this.broadcastAuthorizationRequired();
|
||||
this.broadcastAuthorizationRequired(
|
||||
`startup:non_retryable ${refreshResult.error ?? ''}`.trim(),
|
||||
);
|
||||
} else {
|
||||
// For transient errors, still start auto-refresh timer to retry later
|
||||
logger.warn('Transient error during proactive refresh, will retry via auto-refresh');
|
||||
|
||||
@@ -41,6 +41,33 @@ import { createLogger } from '@/utils/logger';
|
||||
import { ControllerModule, IpcMethod } from './index';
|
||||
|
||||
const logger = createLogger('controllers:HeterogeneousAgentCtr');
|
||||
|
||||
// Anthropic auth env vars that must NOT be inherited from the desktop process
|
||||
// when spawning a local CLI agent. A developer with `ANTHROPIC_API_KEY` (or an
|
||||
// auth token / base url) exported in their shell would otherwise have it
|
||||
// forwarded to `claude`, which then switches from its own subscription login to
|
||||
// that key — an expired / wrong key surfaces as a baffling "Invalid API key"
|
||||
// and the run exits non-zero. Agents that genuinely want an API key still set
|
||||
// it through `session.env`, which is spread AFTER the inherited env below and
|
||||
// therefore wins.
|
||||
const STRIPPED_INHERITED_ENV_KEYS = [
|
||||
'ANTHROPIC_API_KEY',
|
||||
'ANTHROPIC_AUTH_TOKEN',
|
||||
'ANTHROPIC_BASE_URL',
|
||||
] as const;
|
||||
|
||||
/**
|
||||
* Inherited `process.env` with the Anthropic auth vars removed. Keep this pure
|
||||
* and exported so the "never leak host Anthropic creds into the CLI" invariant
|
||||
* can be unit-tested directly.
|
||||
*/
|
||||
export const buildInheritedSpawnEnv = (
|
||||
sourceEnv: NodeJS.ProcessEnv = process.env,
|
||||
): NodeJS.ProcessEnv => {
|
||||
const env = { ...sourceEnv };
|
||||
for (const key of STRIPPED_INHERITED_ENV_KEYS) delete env[key];
|
||||
return env;
|
||||
};
|
||||
const CODEX_RESUME_THREAD_NOT_FOUND_PATTERNS = [
|
||||
/no conversation found/i,
|
||||
/thread .*not found/i,
|
||||
@@ -920,7 +947,10 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
const spawnOptions = {
|
||||
cwd,
|
||||
detached: process.platform !== 'win32',
|
||||
env: { ...process.env, ...proxyEnv, ...session.env },
|
||||
// Strip host Anthropic creds from the inherited env so a developer's
|
||||
// shell `ANTHROPIC_API_KEY` can't hijack the CLI's own auth. `session.env`
|
||||
// is spread last, so an agent that explicitly configures a key still wins.
|
||||
env: { ...buildInheritedSpawnEnv(), ...proxyEnv, ...session.env },
|
||||
stdio: [useStdin ? 'pipe' : 'ignore', 'pipe', 'pipe'] as ['pipe' | 'ignore', 'pipe', 'pipe'],
|
||||
};
|
||||
|
||||
@@ -1308,6 +1338,14 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
} = params;
|
||||
const workDir = cwd ?? process.cwd();
|
||||
|
||||
// When CLI tracing is enabled (dev builds, or the Help-menu toggle in
|
||||
// packaged builds), have `lh hetero exec` persist the agent process's RAW
|
||||
// stream-json (pre-adapter) on this device. The remote-device path
|
||||
// otherwise leaves no local record — the CLI consumes stdout internally and
|
||||
// only POSTs adapted events to the server — so without this there's nothing
|
||||
// to inspect when a remote run misbehaves.
|
||||
const rawDumpDir = this.shouldTraceCliOutput ? this.resolveTraceRootDir(workDir) : undefined;
|
||||
|
||||
const args = [
|
||||
'hetero',
|
||||
'exec',
|
||||
@@ -1324,6 +1362,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
'--cwd',
|
||||
workDir,
|
||||
...(resumeSessionId ? ['--resume', resumeSessionId] : []),
|
||||
...(rawDumpDir ? ['--raw-dump', rawDumpDir] : []),
|
||||
];
|
||||
|
||||
const env = {
|
||||
|
||||
@@ -797,7 +797,12 @@ describe('AuthCtr', () => {
|
||||
expect(mockRemoteServerConfigCtr.setRemoteServerConfig).toHaveBeenCalledWith({
|
||||
active: false,
|
||||
});
|
||||
expect(mockWindow.webContents.send).toHaveBeenCalledWith('authorizationRequired');
|
||||
expect(mockWindow.webContents.send).toHaveBeenCalledWith(
|
||||
'authorizationRequired',
|
||||
expect.objectContaining({
|
||||
reason: expect.stringContaining('startup:non_retryable'),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should preserve tokens on transient error', async () => {
|
||||
|
||||
@@ -313,6 +313,53 @@ describe('HeterogeneousAgentCtr', () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it('does not leak host Anthropic auth env into the spawned CLI', async () => {
|
||||
// A developer with these exported in their shell would otherwise have them
|
||||
// forwarded to `claude`, overriding its subscription login and surfacing
|
||||
// as a baffling "Invalid API key" / non-zero exit. Regression guard for
|
||||
// that env-leak.
|
||||
const original = {
|
||||
ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY,
|
||||
ANTHROPIC_AUTH_TOKEN: process.env.ANTHROPIC_AUTH_TOKEN,
|
||||
ANTHROPIC_BASE_URL: process.env.ANTHROPIC_BASE_URL,
|
||||
};
|
||||
process.env.ANTHROPIC_API_KEY = 'sk-host-should-not-leak';
|
||||
process.env.ANTHROPIC_AUTH_TOKEN = 'host-token-should-not-leak';
|
||||
process.env.ANTHROPIC_BASE_URL = 'https://host.example/should-not-leak';
|
||||
|
||||
try {
|
||||
const { options } = await runSendPrompt('hello');
|
||||
|
||||
expect(options.env).not.toHaveProperty('ANTHROPIC_API_KEY');
|
||||
expect(options.env).not.toHaveProperty('ANTHROPIC_AUTH_TOKEN');
|
||||
expect(options.env).not.toHaveProperty('ANTHROPIC_BASE_URL');
|
||||
// Unrelated inherited vars must still pass through.
|
||||
expect(options.env.PATH).toBe(process.env.PATH);
|
||||
} finally {
|
||||
for (const [key, value] of Object.entries(original)) {
|
||||
if (value === undefined) delete process.env[key];
|
||||
else process.env[key] = value;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it('lets an agent-configured Anthropic key in session.env override the stripped host env', async () => {
|
||||
const originalKey = process.env.ANTHROPIC_API_KEY;
|
||||
process.env.ANTHROPIC_API_KEY = 'sk-host-should-not-leak';
|
||||
|
||||
try {
|
||||
const { options } = await runSendPrompt('hello', {
|
||||
env: { ANTHROPIC_API_KEY: 'sk-agent-explicit' },
|
||||
});
|
||||
|
||||
// Explicit per-agent config wins; the host value is never seen.
|
||||
expect(options.env.ANTHROPIC_API_KEY).toBe('sk-agent-explicit');
|
||||
} finally {
|
||||
if (originalKey === undefined) delete process.env.ANTHROPIC_API_KEY;
|
||||
else process.env.ANTHROPIC_API_KEY = originalKey;
|
||||
}
|
||||
});
|
||||
|
||||
it('captures the Claude Code session id from stream-json init events', async () => {
|
||||
const { ctr, sessionId } = await runSendPrompt('hello', {}, [
|
||||
`${JSON.stringify({ session_id: 'sess_cc_123', subtype: 'init', type: 'system' })}\n`,
|
||||
|
||||
@@ -32,22 +32,30 @@ export class BackendProxyProtocolManager {
|
||||
private readonly logger = createLogger('core:BackendProxyProtocolManager');
|
||||
|
||||
private authRequiredDebounceTimer: NodeJS.Timeout | null = null;
|
||||
private pendingAuthRequiredReason: string | null = null;
|
||||
private static readonly AUTH_REQUIRED_DEBOUNCE_MS = 1000;
|
||||
|
||||
private notifyAuthorizationRequired() {
|
||||
private notifyAuthorizationRequired(reason: string) {
|
||||
// Trailing-edge debounce: coalesce rapid 401 bursts and fire AFTER the burst settles.
|
||||
// This ensures the IPC event is sent after the renderer has had time to mount listeners.
|
||||
// The most recent reason wins — within a burst they almost always describe the same cause.
|
||||
this.pendingAuthRequiredReason = reason;
|
||||
|
||||
if (this.authRequiredDebounceTimer) {
|
||||
clearTimeout(this.authRequiredDebounceTimer);
|
||||
}
|
||||
|
||||
this.authRequiredDebounceTimer = setTimeout(() => {
|
||||
this.authRequiredDebounceTimer = null;
|
||||
const finalReason = this.pendingAuthRequiredReason ?? reason;
|
||||
this.pendingAuthRequiredReason = null;
|
||||
|
||||
this.logger.info(`Broadcasting authorizationRequired (reason=${finalReason})`);
|
||||
|
||||
const allWindows = BrowserWindow.getAllWindows();
|
||||
for (const win of allWindows) {
|
||||
if (!win.isDestroyed()) {
|
||||
win.webContents.send('authorizationRequired');
|
||||
win.webContents.send('authorizationRequired', { reason: finalReason });
|
||||
}
|
||||
}
|
||||
}, BackendProxyProtocolManager.AUTH_REQUIRED_DEBOUNCE_MS);
|
||||
@@ -196,7 +204,32 @@ export class BackendProxyProtocolManager {
|
||||
// Other failures keep 401 without this header (e.g., invalid API keys) and must not notify here.
|
||||
const authRequired = upstreamResponse.headers.get(AUTH_REQUIRED_HEADER) === 'true';
|
||||
if (authRequired) {
|
||||
this.notifyAuthorizationRequired();
|
||||
const pathTag = (() => {
|
||||
try {
|
||||
return new URL(rewrittenUrl).pathname;
|
||||
} catch {
|
||||
return rewrittenUrl;
|
||||
}
|
||||
})();
|
||||
const sourceTag = context.source ? `${context.source}:` : '';
|
||||
const wwwAuth = upstreamResponse.headers.get('www-authenticate') ?? '';
|
||||
// Clone before forwarding the body downstream — the original stream stays
|
||||
// intact for the renderer. Body snippet is truncated to keep logs small
|
||||
// and to avoid leaking large payloads if the server ever returns one.
|
||||
let bodySnippet: string;
|
||||
try {
|
||||
bodySnippet = (await upstreamResponse.clone().text()).slice(0, 300).replaceAll(/\s+/g, ' ');
|
||||
} catch (error) {
|
||||
bodySnippet = `<body-read-failed:${error instanceof Error ? error.message : 'unknown'}>`;
|
||||
}
|
||||
const parts = [
|
||||
`proxy:${sourceTag}status=${upstreamResponse.status}`,
|
||||
`${request.method} ${pathTag}`,
|
||||
`hadToken=${Boolean(token)}`,
|
||||
];
|
||||
if (wwwAuth) parts.push(`wwwAuth=${wwwAuth}`);
|
||||
if (bodySnippet) parts.push(`body=${bodySnippet}`);
|
||||
this.notifyAuthorizationRequired(parts.join(' '));
|
||||
}
|
||||
|
||||
return new Response(upstreamResponse.body, {
|
||||
|
||||
+57
-1
@@ -258,7 +258,63 @@ describe('BackendProxyProtocolManager', () => {
|
||||
|
||||
expect(send).not.toHaveBeenCalled();
|
||||
await vi.advanceTimersByTimeAsync(1000);
|
||||
expect(send).toHaveBeenCalledWith('authorizationRequired');
|
||||
expect(send).toHaveBeenCalledWith(
|
||||
'authorizationRequired',
|
||||
expect.objectContaining({
|
||||
reason: expect.stringContaining('status=207'),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('captures www-authenticate, body snippet and hadToken in reason on 401', async () => {
|
||||
vi.useFakeTimers();
|
||||
const send = vi.fn();
|
||||
vi.mocked(BrowserWindow.getAllWindows).mockReturnValue([
|
||||
{ isDestroyed: () => false, webContents: { send } },
|
||||
] as any);
|
||||
|
||||
const manager = new BackendProxyProtocolManager();
|
||||
const session = {} as any;
|
||||
|
||||
const upstreamBody = JSON.stringify({
|
||||
error: { json: { data: { code: 'UNAUTHORIZED' }, message: 'token expired at 2026-06-09' } },
|
||||
});
|
||||
const headers = new Headers({
|
||||
[AUTH_REQUIRED_HEADER]: 'true',
|
||||
'Content-Type': 'application/json',
|
||||
'www-authenticate': 'Bearer error="invalid_token", error_description="expired"',
|
||||
});
|
||||
const fetchMock = vi.fn<FetchMock>(
|
||||
async () => new Response(upstreamBody, { headers, status: 401, statusText: 'Unauthorized' }),
|
||||
);
|
||||
vi.stubGlobal('fetch', fetchMock as any);
|
||||
|
||||
manager.registerWithRemoteBaseUrl(session, {
|
||||
getAccessToken: async () => 'fake-token',
|
||||
getRemoteBaseUrl: async () => 'https://remote.example.com',
|
||||
});
|
||||
|
||||
const response = await manager.proxy(
|
||||
{
|
||||
headers: new Headers(),
|
||||
method: 'POST',
|
||||
url: 'app://renderer/trpc/lambda/me',
|
||||
} as any,
|
||||
session,
|
||||
);
|
||||
|
||||
// Original body is still readable by the downstream caller — clone() must not consume it.
|
||||
expect(await response!.text()).toBe(upstreamBody);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1000);
|
||||
expect(send).toHaveBeenCalledTimes(1);
|
||||
const [, payload] = send.mock.calls[0];
|
||||
expect(payload.reason).toContain('status=401');
|
||||
expect(payload.reason).toContain('POST /trpc/lambda/me');
|
||||
expect(payload.reason).toContain('hadToken=true');
|
||||
expect(payload.reason).toContain('wwwAuth=Bearer error="invalid_token"');
|
||||
expect(payload.reason).toContain('UNAUTHORIZED');
|
||||
expect(payload.reason).toContain('token expired');
|
||||
});
|
||||
|
||||
describe('createAppRequestInterceptor', () => {
|
||||
|
||||
@@ -3,6 +3,8 @@ import { AsyncLocalStorage } from 'node:async_hooks';
|
||||
import type { IpcMainInvokeEvent, WebContents } from 'electron';
|
||||
import { ipcMain } from 'electron';
|
||||
|
||||
import { toIpcErrorEnvelope } from '~common/ipcError';
|
||||
|
||||
// Base context for IPC methods
|
||||
export interface IpcContext {
|
||||
event: IpcMainInvokeEvent;
|
||||
@@ -63,7 +65,11 @@ export class IpcHandler {
|
||||
return await handler(...typedArgs);
|
||||
} catch (error) {
|
||||
console.error(`Error in IPC method ${channel}:`, error);
|
||||
throw error;
|
||||
// Return a clone-safe envelope rather than throwing: Electron rebuilds
|
||||
// a thrown handler error from its string form, dropping `cause` and
|
||||
// other structured fields. The preload `invoke` wrapper rebuilds a
|
||||
// real Error from the envelope and re-throws it. See `~common/ipcError`.
|
||||
return toIpcErrorEnvelope(error);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -62,6 +62,30 @@ describe('invoke', () => {
|
||||
expect(mockIpcRendererInvoke).toHaveBeenCalledWith('system.getAppVersion');
|
||||
});
|
||||
|
||||
it('should rebuild and throw a real Error (with cause) from a main-process error envelope', async () => {
|
||||
mockIpcRendererInvoke.mockResolvedValue({
|
||||
__lobeIpcError__: true,
|
||||
error: {
|
||||
cause: { code: 'ENOTFOUND', message: 'getaddrinfo ENOTFOUND example.com', name: 'Error' },
|
||||
message: 'fetch failed',
|
||||
name: 'TypeError',
|
||||
},
|
||||
});
|
||||
|
||||
await expect(invoke('heterogeneousAgent.sendPrompt')).rejects.toMatchObject({
|
||||
cause: { code: 'ENOTFOUND', message: 'getaddrinfo ENOTFOUND example.com' },
|
||||
message: 'fetch failed',
|
||||
name: 'TypeError',
|
||||
});
|
||||
});
|
||||
|
||||
it('should not treat a plain object result as an error envelope', async () => {
|
||||
const result = { __lobeIpcError__: false, data: 'ok' };
|
||||
mockIpcRendererInvoke.mockResolvedValue(result);
|
||||
|
||||
await expect(invoke('someEvent')).resolves.toEqual(result);
|
||||
});
|
||||
|
||||
it('should handle ipcRenderer returning undefined', async () => {
|
||||
mockIpcRendererInvoke.mockResolvedValue(undefined);
|
||||
|
||||
|
||||
@@ -1,8 +1,24 @@
|
||||
import { ipcRenderer } from 'electron';
|
||||
|
||||
import { fromIpcErrorEnvelope, isIpcErrorEnvelope } from '~common/ipcError';
|
||||
|
||||
type IpcInvoke = <T = unknown>(event: string, ...data: unknown[]) => Promise<T>;
|
||||
|
||||
/**
|
||||
* Client-side method to invoke electron main process
|
||||
* Client-side method to invoke electron main process.
|
||||
*
|
||||
* The main-process handler returns an error envelope instead of throwing (see
|
||||
* `~common/ipcError`), so structured failure detail — notably `cause` — isn't
|
||||
* flattened away by Electron's thrown-error serialization. Rebuild the real
|
||||
* Error here and re-throw it, preserving the "promise rejects on failure"
|
||||
* contract every caller already relies on.
|
||||
*/
|
||||
export const invoke: IpcInvoke = async (event, ...data) => ipcRenderer.invoke(event, ...data);
|
||||
export const invoke: IpcInvoke = async (event, ...data) => {
|
||||
const result = await ipcRenderer.invoke(event, ...data);
|
||||
|
||||
if (isIpcErrorEnvelope(result)) {
|
||||
throw fromIpcErrorEnvelope(result);
|
||||
}
|
||||
|
||||
return result as never;
|
||||
};
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"name": "@lobechat/server",
|
||||
"version": "0.0.0",
|
||||
"private": true,
|
||||
"scripts": {
|
||||
"type-check": "tsc --noEmit"
|
||||
}
|
||||
}
|
||||
+7
-7
@@ -17,24 +17,23 @@ const log = debug('lobe-server:agent-runtime:coordinator');
|
||||
* decision) starts, but that resume runs under a **new** operationId with
|
||||
* its own event stream. For the paused operationId no further events will
|
||||
* arrive, so clients should stop waiting the same way they do on done.
|
||||
*
|
||||
* `waiting_for_async_tool` is different: deferred tools such as server
|
||||
* sub-agents resume the SAME operationId after the out-of-band result is
|
||||
* backfilled. Ending the stream at park time makes the client mark the turn
|
||||
* as stopped while the server is still waiting for sub-agents.
|
||||
*/
|
||||
const STREAM_END_STATUSES = new Set<AgentState['status']>([
|
||||
'done',
|
||||
'error',
|
||||
'interrupted',
|
||||
'waiting_for_human',
|
||||
'waiting_for_async_tool',
|
||||
]);
|
||||
|
||||
const hasEnteredStreamEndState = (
|
||||
previousStatus?: AgentState['status'],
|
||||
nextStatus?: AgentState['status'],
|
||||
): nextStatus is
|
||||
| 'done'
|
||||
| 'error'
|
||||
| 'interrupted'
|
||||
| 'waiting_for_human'
|
||||
| 'waiting_for_async_tool' => {
|
||||
): nextStatus is 'done' | 'error' | 'interrupted' | 'waiting_for_human' => {
|
||||
const wasStreamEnd = previousStatus ? STREAM_END_STATUSES.has(previousStatus) : false;
|
||||
return Boolean(nextStatus && STREAM_END_STATUSES.has(nextStatus) && !wasStreamEnd);
|
||||
};
|
||||
@@ -94,6 +93,7 @@ export class AgentRuntimeCoordinator {
|
||||
agentConfig?: any;
|
||||
modelRuntimeConfig?: any;
|
||||
userId?: string;
|
||||
workspaceId?: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
try {
|
||||
+11
@@ -27,6 +27,13 @@ export interface AgentOperationMetadata {
|
||||
totalCost: number;
|
||||
totalSteps: number;
|
||||
userId?: string;
|
||||
/**
|
||||
* Workspace the operation runs in (null/undefined = personal). Persisted so
|
||||
* queue workers (e.g. QStash `runStep`) can reconstruct a workspace-scoped
|
||||
* runtime; without it the runtime is personal-scoped and message/topic
|
||||
* lookups miss workspace-scoped rows.
|
||||
*/
|
||||
workspaceId?: string;
|
||||
}
|
||||
|
||||
export class AgentStateManager {
|
||||
@@ -194,6 +201,7 @@ export class AgentStateManager {
|
||||
totalCost: parseFloat(metadata.totalCost) || 0,
|
||||
totalSteps: parseInt(metadata.totalSteps) || 0,
|
||||
userId: metadata.userId,
|
||||
workspaceId: metadata.workspaceId,
|
||||
};
|
||||
} catch (error) {
|
||||
console.error('Failed to get operation metadata:', error);
|
||||
@@ -210,6 +218,7 @@ export class AgentStateManager {
|
||||
agentConfig?: any;
|
||||
modelRuntimeConfig?: any;
|
||||
userId?: string;
|
||||
workspaceId?: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
const metaKey = `${this.METADATA_PREFIX}:${operationId}`;
|
||||
@@ -224,6 +233,7 @@ export class AgentStateManager {
|
||||
totalCost: 0,
|
||||
totalSteps: 0,
|
||||
userId: data.userId,
|
||||
workspaceId: data.workspaceId,
|
||||
};
|
||||
|
||||
// Serialize complex objects
|
||||
@@ -236,6 +246,7 @@ export class AgentStateManager {
|
||||
};
|
||||
|
||||
if (metadata.userId) redisData.userId = metadata.userId;
|
||||
if (metadata.workspaceId) redisData.workspaceId = metadata.workspaceId;
|
||||
if (metadata.modelRuntimeConfig)
|
||||
redisData.modelRuntimeConfig = JSON.stringify(metadata.modelRuntimeConfig);
|
||||
if (metadata.agentConfig) redisData.agentConfig = JSON.stringify(metadata.agentConfig);
|
||||
+2
@@ -122,6 +122,7 @@ export class InMemoryAgentStateManager implements IAgentStateManager {
|
||||
agentConfig?: any;
|
||||
modelRuntimeConfig?: any;
|
||||
userId?: string;
|
||||
workspaceId?: string;
|
||||
},
|
||||
): Promise<void> {
|
||||
const metadata: AgentOperationMetadata = {
|
||||
@@ -133,6 +134,7 @@ export class InMemoryAgentStateManager implements IAgentStateManager {
|
||||
totalCost: 0,
|
||||
totalSteps: 0,
|
||||
userId: data.userId,
|
||||
workspaceId: data.workspaceId,
|
||||
};
|
||||
|
||||
this.metadata.set(operationId, metadata);
|
||||
+234
-34
@@ -24,6 +24,7 @@ import { BRANDING_PROVIDER } from '@lobechat/business-const';
|
||||
import { KLAVIS_SERVER_TYPES } from '@lobechat/const';
|
||||
import {
|
||||
type AgentContextDocument,
|
||||
type AgentGroupConfig,
|
||||
type BotPlatformContext,
|
||||
buildStepSkillDelta,
|
||||
buildStepToolDelta,
|
||||
@@ -59,7 +60,7 @@ import {
|
||||
import { chainCompressContext } from '@lobechat/prompts';
|
||||
import {
|
||||
type ChatToolPayload,
|
||||
type ExecSubAgentTaskParams,
|
||||
type ExecSubAgentParams,
|
||||
type MessageToolCall,
|
||||
type UIChatMessage,
|
||||
} from '@lobechat/types';
|
||||
@@ -131,6 +132,38 @@ const LLM_RETRY_MAX_DELAY_MS = 30_000;
|
||||
*/
|
||||
const EMPTY_COMPLETION_MAX_RETRIES = 2;
|
||||
|
||||
const buildBotAgentGroupContext = (params: {
|
||||
agentConfig?: any;
|
||||
agentId?: string;
|
||||
botContext?: unknown;
|
||||
}): AgentGroupConfig | undefined => {
|
||||
if (!params.botContext || !params.agentId) return undefined;
|
||||
|
||||
const title = params.agentConfig?.title;
|
||||
const description = params.agentConfig?.description;
|
||||
const name = typeof title === 'string' && title.trim() ? title.trim() : 'Current Agent';
|
||||
|
||||
return {
|
||||
agentMap: {
|
||||
[params.agentId]: {
|
||||
name,
|
||||
role: 'participant',
|
||||
},
|
||||
},
|
||||
currentAgentId: params.agentId,
|
||||
currentAgentName: name,
|
||||
currentAgentRole: 'participant',
|
||||
members: [
|
||||
{
|
||||
id: params.agentId,
|
||||
name,
|
||||
role: 'participant',
|
||||
},
|
||||
],
|
||||
systemPrompt: typeof description === 'string' ? description : undefined,
|
||||
};
|
||||
};
|
||||
|
||||
/**
|
||||
* Output-token count at or below this — combined with no content, reasoning,
|
||||
* tool calls, or images — marks a turn as an empty completion.
|
||||
@@ -193,6 +226,7 @@ const archiveRuntimeToolResult = async (
|
||||
toolCallId,
|
||||
topicId,
|
||||
userId,
|
||||
workspaceId,
|
||||
}: {
|
||||
agentId?: string | null;
|
||||
identifier?: string;
|
||||
@@ -201,6 +235,7 @@ const archiveRuntimeToolResult = async (
|
||||
toolCallId?: string;
|
||||
topicId?: string | null;
|
||||
userId?: string;
|
||||
workspaceId?: string;
|
||||
},
|
||||
): Promise<ToolExecutionResultResponse> => {
|
||||
const archive = await archiveToolResultIfNeeded({
|
||||
@@ -212,6 +247,7 @@ const archiveRuntimeToolResult = async (
|
||||
toolCallId,
|
||||
topicId,
|
||||
userId,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
return archive.content === result.content ? result : { ...result, content: archive.content };
|
||||
@@ -226,11 +262,13 @@ const archiveRuntimeToolResult = async (
|
||||
// FileService is constructed lazily so environments without S3 config (unit
|
||||
// tests) don't fail at context-build time; failure returns undefined, which
|
||||
// leaves URLs as raw keys — same behavior as before this helper existed.
|
||||
const buildPostProcessUrl = (ctx: Pick<RuntimeExecutorContext, 'serverDB' | 'userId'>) => {
|
||||
const buildPostProcessUrl = (
|
||||
ctx: Pick<RuntimeExecutorContext, 'serverDB' | 'userId' | 'workspaceId'>,
|
||||
) => {
|
||||
if (!ctx.userId || !ctx.serverDB) return undefined;
|
||||
let fileService: FileService | undefined;
|
||||
try {
|
||||
fileService = new FileService(ctx.serverDB, ctx.userId);
|
||||
fileService = new FileService(ctx.serverDB, ctx.userId, ctx.workspaceId);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
@@ -248,7 +286,7 @@ const buildPostProcessUrl = (ctx: Pick<RuntimeExecutorContext, 'serverDB' | 'use
|
||||
* isolation thread (so the UI shows a loading state and the completion bridge
|
||||
* has a message to backfill), then kicks off the child op asynchronously and
|
||||
* returns immediately. Returns `undefined` when sub-agent execution is not
|
||||
* available (no `execSubAgentTask` callback, or missing agent/topic context).
|
||||
* available (no `execSubAgent` callback, or missing agent/topic context).
|
||||
*/
|
||||
const buildServerSubAgentRunner = (
|
||||
ctx: RuntimeExecutorContext,
|
||||
@@ -256,8 +294,8 @@ const buildServerSubAgentRunner = (
|
||||
chatToolPayload: ChatToolPayload,
|
||||
parentMessageId: string,
|
||||
): ServerSubAgentRunner | undefined => {
|
||||
const execSubAgentTask = ctx.execSubAgentTask;
|
||||
if (!execSubAgentTask) return undefined;
|
||||
const execSubAgent = ctx.execSubAgent;
|
||||
if (!execSubAgent) return undefined;
|
||||
|
||||
const agentId = state.metadata?.agentId;
|
||||
const topicId = ctx.topicId ?? state.metadata?.topicId;
|
||||
@@ -281,9 +319,9 @@ const buildServerSubAgentRunner = (
|
||||
});
|
||||
|
||||
// 2. Fork the child op anchored to the placeholder. `resumeParentOnComplete`
|
||||
// tells execSubAgentTask to register the completion bridge that
|
||||
// tells execSubAgent to register the completion bridge that
|
||||
// backfills this tool message and resumes the parent op.
|
||||
const result = (await execSubAgentTask({
|
||||
const result = (await execSubAgent({
|
||||
agentId: targetAgentId ?? agentId,
|
||||
groupId: state.metadata?.groupId ?? undefined,
|
||||
instruction,
|
||||
@@ -315,11 +353,25 @@ const buildServerSubAgentRunner = (
|
||||
started: true,
|
||||
subOperationId: result?.operationId,
|
||||
threadId: result?.threadId ?? '',
|
||||
toolMessageId: placeholder.id,
|
||||
};
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
const getDeferredToolMessageId = (result: ToolExecutionResultResponse): string | undefined => {
|
||||
const toolMessageId = result.state?.toolMessageId;
|
||||
return typeof toolMessageId === 'string' ? toolMessageId : undefined;
|
||||
};
|
||||
|
||||
const withDeferredToolResultMessageId = (
|
||||
tool: ChatToolPayload,
|
||||
result: ToolExecutionResultResponse,
|
||||
): ChatToolPayload => {
|
||||
const resultMessageId = getDeferredToolMessageId(result);
|
||||
return resultMessageId ? { ...tool, result_msg_id: resultMessageId } : tool;
|
||||
};
|
||||
|
||||
const shouldRetryLLM = (kind: LLMErrorKind, attempt: number, maxRetries: number) =>
|
||||
kind === 'retry' && attempt <= maxRetries;
|
||||
|
||||
@@ -433,6 +485,7 @@ const buildToolDiscoveryConfig = (operationToolSet: OperationToolSet, enabledToo
|
||||
|
||||
export interface RuntimeExecutorContext {
|
||||
agentConfig?: any;
|
||||
botContext?: unknown;
|
||||
botPlatformContext?: BotPlatformContext;
|
||||
discordContext?: any;
|
||||
evalContext?: EvalContext;
|
||||
@@ -441,7 +494,7 @@ export interface RuntimeExecutorContext {
|
||||
* Injected by AiAgentService so exec_sub_agent / exec_sub_agents executors
|
||||
* can dispatch callAgent-triggered tasks without a circular import.
|
||||
*/
|
||||
execSubAgentTask?: (params: ExecSubAgentTaskParams) => Promise<unknown>;
|
||||
execSubAgent?: (params: ExecSubAgentParams) => Promise<unknown>;
|
||||
hookDispatcher?: HookDispatcher;
|
||||
loadAgentState?: (operationId: string) => Promise<AgentState | null>;
|
||||
messageModel: MessageModel;
|
||||
@@ -466,6 +519,13 @@ export interface RuntimeExecutorContext {
|
||||
tracingContextEngine?: (input: unknown, output: unknown) => void;
|
||||
userId?: string;
|
||||
userTimezone?: string;
|
||||
/**
|
||||
* Workspace scoping for ownership filters on models/services constructed
|
||||
* inside the agent runtime. Threaded down from the originating request
|
||||
* (chat/task router) and forwarded to tool executions via
|
||||
* `ToolExecutionContext.workspaceId`.
|
||||
*/
|
||||
workspaceId?: string;
|
||||
}
|
||||
|
||||
export const createRuntimeExecutors = (
|
||||
@@ -614,6 +674,8 @@ export const createRuntimeExecutors = (
|
||||
|
||||
try {
|
||||
type ContentPart = { text: string; type: 'text' } | { image: string; type: 'image' };
|
||||
let shouldPersistAssistantReasoning = false;
|
||||
let preserveThinkingForPayload: boolean | undefined;
|
||||
|
||||
// Process messages through serverMessagesEngine to inject system role, knowledge, etc.
|
||||
// Rebuild params from agentConfig at execution time (capabilities built dynamically)
|
||||
@@ -623,6 +685,41 @@ export const createRuntimeExecutors = (
|
||||
const { loadModels } = await import('@/business/client/model-bank/loadModels');
|
||||
const builtinModels = await loadModels();
|
||||
|
||||
const preserveThinkingConfigured =
|
||||
typeof agentConfig.chatConfig?.preserveThinking === 'boolean'
|
||||
? agentConfig.chatConfig.preserveThinking
|
||||
: undefined;
|
||||
const preserveThinkingRequested = preserveThinkingConfigured === true;
|
||||
|
||||
const modelCard = builtinModels.find(
|
||||
(item) =>
|
||||
item.providerId === provider &&
|
||||
(item.id === model || item.config?.deploymentName === model),
|
||||
);
|
||||
const modelExtendParams =
|
||||
modelCard &&
|
||||
'settings' in modelCard &&
|
||||
modelCard.settings &&
|
||||
typeof modelCard.settings === 'object' &&
|
||||
'extendParams' in modelCard.settings
|
||||
? (modelCard.settings as { extendParams?: string[] }).extendParams
|
||||
: undefined;
|
||||
|
||||
const modelSupportsPreserveThinkingFromCard =
|
||||
Array.isArray(modelExtendParams) && modelExtendParams.includes('preserveThinking');
|
||||
const providerSupportsPreserveThinkingFallback =
|
||||
provider === 'qwen' || provider === 'zhipu';
|
||||
const modelSupportsPreserveThinking =
|
||||
modelSupportsPreserveThinkingFromCard ||
|
||||
(!modelCard && providerSupportsPreserveThinkingFallback);
|
||||
|
||||
shouldPersistAssistantReasoning =
|
||||
preserveThinkingRequested && modelSupportsPreserveThinking;
|
||||
preserveThinkingForPayload =
|
||||
modelSupportsPreserveThinking && typeof preserveThinkingConfigured === 'boolean'
|
||||
? preserveThinkingConfigured
|
||||
: undefined;
|
||||
|
||||
// Extract <refer_topic> tags from messages and fetch summaries.
|
||||
// Skip if messages already contain injected topic_reference_context
|
||||
// (e.g., from client-side contextEngineering preprocessing) to avoid double injection.
|
||||
@@ -634,8 +731,8 @@ export const createRuntimeExecutors = (
|
||||
);
|
||||
|
||||
if (!alreadyHasTopicRefs && ctx.serverDB && ctx.userId) {
|
||||
const topicModel = new TopicModel(ctx.serverDB, ctx.userId);
|
||||
const messageModel = new MessageModelClass(ctx.serverDB, ctx.userId);
|
||||
const topicModel = new TopicModel(ctx.serverDB, ctx.userId, ctx.workspaceId);
|
||||
const messageModel = new MessageModelClass(ctx.serverDB, ctx.userId, ctx.workspaceId);
|
||||
topicReferences = await resolveTopicReferences(
|
||||
llmPayload.messages as Array<{ content: string | unknown }>,
|
||||
async (topicId) => topicModel.findById(topicId),
|
||||
@@ -658,7 +755,11 @@ export const createRuntimeExecutors = (
|
||||
const agentId = state.metadata?.agentId;
|
||||
if (agentId && ctx.serverDB && ctx.userId) {
|
||||
try {
|
||||
const agentDocService = new AgentDocumentsService(ctx.serverDB, ctx.userId);
|
||||
const agentDocService = new AgentDocumentsService(
|
||||
ctx.serverDB,
|
||||
ctx.userId,
|
||||
state.metadata?.workspaceId ?? ctx.workspaceId,
|
||||
);
|
||||
const docs = await agentDocService.getAgentContextDocuments(agentId);
|
||||
if (docs.length > 0) {
|
||||
agentDocuments = toAgentContextDocuments(docs);
|
||||
@@ -692,7 +793,11 @@ export const createRuntimeExecutors = (
|
||||
await import('@lobechat/builtin-tool-web-onboarding/utils');
|
||||
const { UserPersonaModel } = await import('@/database/models/userMemory/persona');
|
||||
const onboardingService = new OnboardingService(ctx.serverDB, ctx.userId);
|
||||
const docService = new AgentDocumentsService(ctx.serverDB, ctx.userId);
|
||||
const docService = new AgentDocumentsService(
|
||||
ctx.serverDB,
|
||||
ctx.userId,
|
||||
state.metadata?.workspaceId ?? ctx.workspaceId,
|
||||
);
|
||||
const personaModel = new UserPersonaModel(ctx.serverDB, ctx.userId);
|
||||
|
||||
const [onboardingState, soulDoc, persona, userInfo] = await Promise.all([
|
||||
@@ -752,7 +857,7 @@ export const createRuntimeExecutors = (
|
||||
let lobehubSkillTopicTitle = '';
|
||||
if (lobehubSkillTopicId && ctx.serverDB && ctx.userId) {
|
||||
try {
|
||||
const topicModelForLobehub = new TopicModel(ctx.serverDB, ctx.userId);
|
||||
const topicModelForLobehub = new TopicModel(ctx.serverDB, ctx.userId, ctx.workspaceId);
|
||||
const topicRecord = await topicModelForLobehub.findById(lobehubSkillTopicId);
|
||||
lobehubSkillTopicTitle = topicRecord?.title ?? '';
|
||||
} catch (error) {
|
||||
@@ -853,7 +958,7 @@ export const createRuntimeExecutors = (
|
||||
if (ctx.serverDB && ctx.userId && !!klavisEnv.KLAVIS_API_KEY) {
|
||||
try {
|
||||
const { PluginModel } = await import('@/database/models/plugin');
|
||||
const pluginModel = new PluginModel(ctx.serverDB, ctx.userId);
|
||||
const pluginModel = new PluginModel(ctx.serverDB, ctx.userId, ctx.workspaceId);
|
||||
const allPlugins = await pluginModel.query();
|
||||
const validKlavisIds = new Set(KLAVIS_SERVER_TYPES.map((t) => t.identifier));
|
||||
const connectedIds = new Set(
|
||||
@@ -887,6 +992,11 @@ export const createRuntimeExecutors = (
|
||||
|
||||
const contextEngineInput = {
|
||||
agentDocuments,
|
||||
agentGroup: buildBotAgentGroupContext({
|
||||
agentConfig,
|
||||
agentId: state.metadata?.agentId,
|
||||
botContext: state.metadata?.botContext ?? ctx.botContext,
|
||||
}),
|
||||
additionalVariables: {
|
||||
...state.metadata?.deviceSystemInfo,
|
||||
...lobehubSkillVariables,
|
||||
@@ -1034,11 +1144,24 @@ export const createRuntimeExecutors = (
|
||||
}
|
||||
|
||||
// Initialize ModelRuntime (read user's keyVaults from database)
|
||||
const modelRuntime = await initModelRuntimeFromDB(ctx.serverDB, ctx.userId!, provider);
|
||||
const modelRuntime = await initModelRuntimeFromDB(
|
||||
ctx.serverDB,
|
||||
ctx.userId!,
|
||||
provider,
|
||||
ctx.workspaceId,
|
||||
);
|
||||
|
||||
// Construct ChatStreamPayload
|
||||
const stream = ctx.stream ?? true;
|
||||
const chatPayload = { messages: processedMessages, model, stream, tools };
|
||||
const chatPayload = {
|
||||
messages: processedMessages,
|
||||
model,
|
||||
stream,
|
||||
tools,
|
||||
...(typeof preserveThinkingForPayload === 'boolean' && {
|
||||
preserveThinking: preserveThinkingForPayload,
|
||||
}),
|
||||
};
|
||||
|
||||
// Buffer: accumulate text and reasoning, send every 50ms
|
||||
const BUFFER_INTERVAL = 50;
|
||||
@@ -1530,6 +1653,10 @@ export const createRuntimeExecutors = (
|
||||
};
|
||||
}
|
||||
|
||||
const persistedReasoning = shouldPersistAssistantReasoning
|
||||
? finalReasoning
|
||||
: undefined;
|
||||
|
||||
try {
|
||||
// Build metadata object
|
||||
const metadata: Record<string, any> = {};
|
||||
@@ -1562,7 +1689,7 @@ export const createRuntimeExecutors = (
|
||||
content: finalContent,
|
||||
imageList: imageList.length > 0 ? imageList : undefined,
|
||||
metadata: Object.keys(metadata).length > 0 ? metadata : undefined,
|
||||
reasoning: finalReasoning,
|
||||
reasoning: persistedReasoning,
|
||||
search: grounding,
|
||||
tools: persistedTools,
|
||||
});
|
||||
@@ -1595,7 +1722,8 @@ export const createRuntimeExecutors = (
|
||||
newState.messages.push({
|
||||
content,
|
||||
id: assistantMessageItem.id,
|
||||
reasoning: finalReasoning,
|
||||
parentId,
|
||||
reasoning: persistedReasoning,
|
||||
role: 'assistant',
|
||||
tool_calls: stateToolCalls,
|
||||
});
|
||||
@@ -1874,7 +2002,11 @@ export const createRuntimeExecutors = (
|
||||
}
|
||||
|
||||
const latestAssistantMessage = dbMessages.findLast((message) => message.role === 'assistant');
|
||||
const messageService = new MessageService(ctx.serverDB, ctx.userId);
|
||||
const messageService = new MessageService(
|
||||
ctx.serverDB,
|
||||
ctx.userId,
|
||||
state.metadata?.workspaceId ?? ctx.workspaceId,
|
||||
);
|
||||
const compressionResult = await messageService.createCompressionGroup(topicId, messageIds, {
|
||||
agentId: state.metadata?.agentId,
|
||||
threadId: state.metadata?.threadId,
|
||||
@@ -1911,6 +2043,7 @@ export const createRuntimeExecutors = (
|
||||
ctx.serverDB,
|
||||
ctx.userId,
|
||||
compressionModel.provider,
|
||||
ctx.workspaceId,
|
||||
);
|
||||
|
||||
let summaryContent = '';
|
||||
@@ -2284,9 +2417,10 @@ export const createRuntimeExecutors = (
|
||||
activeDeviceId: state.metadata?.activeDeviceId,
|
||||
agentId: state.metadata?.agentId,
|
||||
documentId: state.metadata?.documentId,
|
||||
execSubAgentTask: ctx.execSubAgentTask,
|
||||
execSubAgent: ctx.execSubAgent,
|
||||
executionTimeoutMs: timeoutMs,
|
||||
groupId: state.metadata?.groupId,
|
||||
isSubAgent: state.metadata?.isSubAgent === true,
|
||||
memoryToolPermission: agentConfig?.chatConfig?.memory?.toolPermission,
|
||||
messageId: state.metadata?.sourceMessageId,
|
||||
operationId,
|
||||
@@ -2315,6 +2449,7 @@ export const createRuntimeExecutors = (
|
||||
toolResultMaxLength,
|
||||
topicId: ctx.topicId,
|
||||
userId: ctx.userId,
|
||||
workspaceId: state.metadata?.workspaceId ?? ctx.workspaceId,
|
||||
}),
|
||||
{
|
||||
isInterrupted: () => isOperationInterrupted(ctx),
|
||||
@@ -2347,7 +2482,9 @@ export const createRuntimeExecutors = (
|
||||
interruptedAt: new Date().toISOString(),
|
||||
reason: 'async_tool',
|
||||
};
|
||||
newState.pendingToolsCalling = [chatToolPayload];
|
||||
newState.pendingToolsCalling = [
|
||||
withDeferredToolResultMessageId(chatToolPayload, execution.result),
|
||||
];
|
||||
return {
|
||||
events: [
|
||||
{
|
||||
@@ -2369,6 +2506,7 @@ export const createRuntimeExecutors = (
|
||||
toolCallId: chatToolPayload.id,
|
||||
topicId: ctx.topicId ?? state.metadata?.topicId,
|
||||
userId: ctx.userId,
|
||||
workspaceId: state.metadata?.workspaceId ?? ctx.workspaceId,
|
||||
});
|
||||
const executionTime = executionResult.executionTime;
|
||||
const isSuccess = executionResult.success;
|
||||
@@ -2648,9 +2786,11 @@ export const createRuntimeExecutors = (
|
||||
*/
|
||||
call_tools_batch: async (instruction, state) => {
|
||||
const { payload } = instruction as Extract<AgentInstruction, { type: 'call_tools_batch' }>;
|
||||
const { parentMessageId, toolsCalling } = payload;
|
||||
const { parentMessageId } = payload;
|
||||
const toolsCalling = payload.toolsCalling as ChatToolPayload[];
|
||||
const { operationId, stepIndex, streamManager, toolExecutionService } = ctx;
|
||||
const events: AgentEvent[] = [];
|
||||
const toolCallOrder = new Map(toolsCalling.map((tool, index) => [tool.id, index] as const));
|
||||
|
||||
const operationLogId = `${operationId}:${stepIndex}`;
|
||||
log(
|
||||
@@ -2862,9 +3002,10 @@ export const createRuntimeExecutors = (
|
||||
activeDeviceId: state.metadata?.activeDeviceId,
|
||||
agentId: state.metadata?.agentId,
|
||||
documentId: state.metadata?.documentId,
|
||||
execSubAgentTask: ctx.execSubAgentTask,
|
||||
execSubAgent: ctx.execSubAgent,
|
||||
executionTimeoutMs: timeoutMs,
|
||||
groupId: state.metadata?.groupId,
|
||||
isSubAgent: state.metadata?.isSubAgent === true,
|
||||
memoryToolPermission: batchAgentConfig?.chatConfig?.memory?.toolPermission,
|
||||
messageId: state.metadata?.sourceMessageId,
|
||||
operationId,
|
||||
@@ -2884,6 +3025,7 @@ export const createRuntimeExecutors = (
|
||||
toolResultMaxLength: batchAgentConfig?.chatConfig?.toolResultMaxLength,
|
||||
topicId: ctx.topicId,
|
||||
userId: ctx.userId,
|
||||
workspaceId: state.metadata?.workspaceId ?? ctx.workspaceId,
|
||||
}),
|
||||
{
|
||||
isInterrupted: () => isOperationInterrupted(ctx),
|
||||
@@ -2899,7 +3041,9 @@ export const createRuntimeExecutors = (
|
||||
// the batch parks for it after all server tools settle.
|
||||
if (execution.result.deferred) {
|
||||
log(`[${operationLogId}] Tool ${toolName} deferred; will park after batch`);
|
||||
deferredTools.push(chatToolPayload);
|
||||
deferredTools.push(
|
||||
withDeferredToolResultMessageId(chatToolPayload, execution.result),
|
||||
);
|
||||
batchExecuteToolSpan.setAttributes(
|
||||
buildExecuteToolResultAttributes({ attempts: execution.attempts, success: true }),
|
||||
);
|
||||
@@ -2914,6 +3058,7 @@ export const createRuntimeExecutors = (
|
||||
toolCallId: chatToolPayload.id,
|
||||
topicId: ctx.topicId ?? state.metadata?.topicId,
|
||||
userId: ctx.userId,
|
||||
workspaceId: state.metadata?.workspaceId ?? ctx.workspaceId,
|
||||
});
|
||||
const executionTime = executionResult.executionTime;
|
||||
const isSuccess = executionResult.success;
|
||||
@@ -3163,7 +3308,9 @@ export const createRuntimeExecutors = (
|
||||
// Park if any tools still owe an out-of-band result: client tools (run on
|
||||
// the client) and/or deferred async tools (e.g. sub-agents). The operation
|
||||
// resumes once every pending tool's result is delivered.
|
||||
const pendingTools = [...deferredTools, ...clientTools];
|
||||
const pendingTools = [...deferredTools, ...clientTools].sort(
|
||||
(a, b) => (toolCallOrder.get(a.id) ?? 0) - (toolCallOrder.get(b.id) ?? 0),
|
||||
);
|
||||
if (pendingTools.length > 0) {
|
||||
// Prefer the async-tool reason when any deferred tool is present; the
|
||||
// individual pending payloads still carry their own identity for the
|
||||
@@ -3227,7 +3374,7 @@ export const createRuntimeExecutors = (
|
||||
* Mirrors the client-side exec_sub_agent executor in createAgentExecutors.ts
|
||||
* but runs entirely server-side (no polling required). Flow:
|
||||
* 1. Create a task message (role: 'task') as a placeholder visible in the UI.
|
||||
* 2. Fire execSubAgentTask via the injected callback so the sub-agent runs as
|
||||
* 2. Fire execSubAgent via the injected callback so the sub-agent runs as
|
||||
* an independent QStash operation.
|
||||
* 3. Return a sub_agent_result context so GeneralChatAgent calls the LLM once
|
||||
* more and the parent agent can acknowledge the delegation.
|
||||
@@ -3244,6 +3391,32 @@ export const createRuntimeExecutors = (
|
||||
// targetAgentId is a cloud extension injected by agentManagement.callAgent
|
||||
const targetAgentId = (task as any).targetAgentId ?? agentId;
|
||||
|
||||
if (state.metadata?.isSubAgent === true) {
|
||||
log('[%s] Nested sub-agent dispatch blocked', taskLogId);
|
||||
return {
|
||||
events,
|
||||
newState: state,
|
||||
nextContext: {
|
||||
payload: {
|
||||
parentMessageId,
|
||||
result: {
|
||||
error: 'Sub-agent calls cannot be triggered from within another sub-agent.',
|
||||
success: false,
|
||||
taskMessageId: parentMessageId,
|
||||
threadId: '',
|
||||
},
|
||||
},
|
||||
phase: 'sub_agent_result',
|
||||
session: {
|
||||
messageCount: state.messages.length,
|
||||
sessionId: operationId,
|
||||
status: 'running',
|
||||
stepCount: state.stepCount + 1,
|
||||
},
|
||||
} as unknown as AgentRuntimeContext,
|
||||
};
|
||||
}
|
||||
|
||||
let taskMessageId: string | undefined;
|
||||
try {
|
||||
const taskMessage = await ctx.messageModel.create({
|
||||
@@ -3268,9 +3441,9 @@ export const createRuntimeExecutors = (
|
||||
const effectiveTaskMessageId = taskMessageId ?? parentMessageId;
|
||||
|
||||
let dispatched = false;
|
||||
if (ctx.execSubAgentTask && topicId && agentId) {
|
||||
if (ctx.execSubAgent && topicId && agentId) {
|
||||
try {
|
||||
await ctx.execSubAgentTask({
|
||||
await ctx.execSubAgent({
|
||||
agentId: targetAgentId,
|
||||
groupId: state.metadata?.groupId ?? undefined,
|
||||
instruction: task.instruction,
|
||||
@@ -3295,7 +3468,7 @@ export const createRuntimeExecutors = (
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log('[%s] execSubAgentTask not available, skipping sub-agent dispatch', taskLogId);
|
||||
log('[%s] execSubAgent not available, skipping sub-agent dispatch', taskLogId);
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -3325,7 +3498,7 @@ export const createRuntimeExecutors = (
|
||||
* Server-side exec_sub_agents executor
|
||||
*
|
||||
* Same as exec_sub_agent but for a batch. Each sub-agent is fired
|
||||
* independently via execSubAgentTask and a task message is created for each.
|
||||
* independently via execSubAgent and a task message is created for each.
|
||||
*/
|
||||
exec_sub_agents: async (instruction, state) => {
|
||||
const { payload } = instruction as AgentInstructionExecSubAgents;
|
||||
@@ -3339,6 +3512,33 @@ export const createRuntimeExecutors = (
|
||||
|
||||
log('[%s] Starting batch of %d tasks', taskLogId, tasks.length);
|
||||
|
||||
if (state.metadata?.isSubAgent === true) {
|
||||
log('[%s] Nested sub-agent batch dispatch blocked', taskLogId);
|
||||
return {
|
||||
events,
|
||||
newState: state,
|
||||
nextContext: {
|
||||
payload: {
|
||||
parentMessageId,
|
||||
results: tasks.map((task) => ({
|
||||
description: task.description,
|
||||
error: 'Sub-agent calls cannot be triggered from within another sub-agent.',
|
||||
success: false,
|
||||
taskMessageId: parentMessageId,
|
||||
threadId: '',
|
||||
})),
|
||||
},
|
||||
phase: 'sub_agents_batch_result',
|
||||
session: {
|
||||
messageCount: state.messages.length,
|
||||
sessionId: operationId,
|
||||
status: 'running',
|
||||
stepCount: state.stepCount + 1,
|
||||
},
|
||||
} as unknown as AgentRuntimeContext,
|
||||
};
|
||||
}
|
||||
|
||||
let lastTaskMessageId: string | undefined;
|
||||
const taskResults: Array<{ success: boolean; taskMessageId: string; threadId: string }> = [];
|
||||
|
||||
@@ -3367,9 +3567,9 @@ export const createRuntimeExecutors = (
|
||||
}
|
||||
|
||||
let taskDispatched = false;
|
||||
if (ctx.execSubAgentTask && topicId && agentId) {
|
||||
if (ctx.execSubAgent && topicId && agentId) {
|
||||
try {
|
||||
await ctx.execSubAgentTask({
|
||||
await ctx.execSubAgent({
|
||||
agentId: targetAgentId,
|
||||
groupId: state.metadata?.groupId ?? undefined,
|
||||
instruction: task.instruction,
|
||||
@@ -3437,7 +3637,7 @@ export const createRuntimeExecutors = (
|
||||
// Clear runningOperation from topic metadata so reconnect doesn't trigger after completion
|
||||
if (ctx.topicId && ctx.userId) {
|
||||
try {
|
||||
const topicModel = new TopicModel(ctx.serverDB, ctx.userId);
|
||||
const topicModel = new TopicModel(ctx.serverDB, ctx.userId, ctx.workspaceId);
|
||||
await topicModel.updateMetadata(ctx.topicId, { runningOperation: null });
|
||||
} catch (e) {
|
||||
log('[%s] Failed to clear runningOperation metadata: %O', operationId, e);
|
||||
+1
-1
@@ -51,7 +51,7 @@ export const getDefaultReasonDetail = (finalState: any, reason?: string): string
|
||||
*
|
||||
* - `messages` — canonical copy lives in the DB (UIChatMessage rows)
|
||||
* and the runtime in-memory state; in-process consumers that need
|
||||
* it (e.g. `execSubAgentTask.onComplete`) receive the full state
|
||||
* it (e.g. `execSubAgent.onComplete`) receive the full state
|
||||
* via the local `HookContext` channel, not via the stream.
|
||||
* - `operationToolSet`, `toolManifestMap`, `toolSourceMap`, `tools`
|
||||
* — operation-level snapshot; back-compat copies of one struct.
|
||||
+29
@@ -176,6 +176,19 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('should not publish end event when status changes to waiting_for_async_tool because the same stream will resume', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const previousState = { status: 'running', stepCount: 3 };
|
||||
const newState = { status: 'waiting_for_async_tool', stepCount: 4 };
|
||||
|
||||
mockStateManager.loadAgentState.mockResolvedValue(previousState);
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStateManager.saveAgentState).toHaveBeenCalledWith(operationId, newState);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not publish end event when status was already done', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const previousState = { status: 'done', stepCount: 5 };
|
||||
@@ -291,6 +304,22 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('should not publish end event when status becomes waiting_for_async_tool because deferred tools resume this operation', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const stepResult = {
|
||||
executionTime: 1000,
|
||||
newState: { status: 'waiting_for_async_tool', stepCount: 4 },
|
||||
stepIndex: 4,
|
||||
};
|
||||
|
||||
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 3 });
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStateManager.saveStepResult).toHaveBeenCalledWith(operationId, stepResult);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should publish end event when status becomes interrupted', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const stepResult = {
|
||||
+377
-46
@@ -16,6 +16,12 @@ const mockBuiltinModels = vi.hoisted(() => [
|
||||
id: 'gpt-4',
|
||||
providerId: 'openai',
|
||||
},
|
||||
{
|
||||
abilities: { functionCall: true, video: true, vision: true },
|
||||
id: 'qwen3.6-plus',
|
||||
providerId: 'qwen',
|
||||
settings: { extendParams: ['preserveThinking'] },
|
||||
},
|
||||
{
|
||||
abilities: { functionCall: false, video: false, vision: false },
|
||||
id: 'no-tools-model',
|
||||
@@ -25,6 +31,7 @@ const mockBuiltinModels = vi.hoisted(() => [
|
||||
abilities: { functionCall: true, video: true, vision: true },
|
||||
id: 'gemini-3.1-flash-lite-preview',
|
||||
providerId: 'google',
|
||||
settings: { extendParams: ['preserveThinking'] },
|
||||
},
|
||||
]);
|
||||
|
||||
@@ -100,12 +107,21 @@ describe('RuntimeExecutors', () => {
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
vi.mocked(initModelRuntimeFromDB).mockReset();
|
||||
mockCreateCompressionGroup.mockReset();
|
||||
mockFinalizeCompression.mockReset();
|
||||
mockCreateCompressionGroup.mockResolvedValue({
|
||||
messageGroupId: 'group-123',
|
||||
messagesToSummarize: [],
|
||||
success: true,
|
||||
});
|
||||
mockFinalizeCompression.mockResolvedValue({ success: true });
|
||||
vi.mocked(initModelRuntimeFromDB).mockResolvedValue({
|
||||
chat: vi.fn().mockImplementation(async (_payload: any, options: any) => {
|
||||
await options?.callback?.onText?.('done');
|
||||
return new Response('done');
|
||||
}),
|
||||
} as any);
|
||||
|
||||
mockMessageModel = {
|
||||
create: vi.fn().mockResolvedValue({ id: 'msg-123' }),
|
||||
@@ -237,6 +253,31 @@ describe('RuntimeExecutors', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('passes workspaceId to model runtime initialization', async () => {
|
||||
const workspaceCtx = { ...ctx, workspaceId: 'ws-1' };
|
||||
const executors = createRuntimeExecutors(workspaceCtx);
|
||||
const state = createMockState();
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
messages: [{ content: 'Hello', role: 'user' }],
|
||||
model: 'gpt-4',
|
||||
provider: 'openai',
|
||||
tools: [],
|
||||
},
|
||||
type: 'call_llm' as const,
|
||||
};
|
||||
|
||||
await executors.call_llm!(instruction, state);
|
||||
|
||||
expect(initModelRuntimeFromDB).toHaveBeenCalledWith(
|
||||
workspaceCtx.serverDB,
|
||||
'user-123',
|
||||
'openai',
|
||||
'ws-1',
|
||||
);
|
||||
});
|
||||
|
||||
it('should pass parentId from payload.parentMessageId to messageModel.create', async () => {
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
@@ -367,52 +408,233 @@ describe('RuntimeExecutors', () => {
|
||||
);
|
||||
});
|
||||
|
||||
it('should preserve reasoning in newState when assistant returns tool calls', async () => {
|
||||
const toolCallPayload = [
|
||||
{
|
||||
function: { arguments: '{}', name: 'search' },
|
||||
id: 'call_1',
|
||||
type: 'function',
|
||||
},
|
||||
];
|
||||
describe('reasoning persistence gate', () => {
|
||||
it('should persist assistant reasoning with tool calls when preserveThinking is enabled on a supported model', async () => {
|
||||
const toolCallPayload = [
|
||||
{
|
||||
function: { arguments: '{}', name: 'search' },
|
||||
id: 'call_1',
|
||||
type: 'function',
|
||||
},
|
||||
];
|
||||
|
||||
const mockChat = vi.fn().mockImplementation(async (_payload, options) => {
|
||||
await options?.callback?.onThinking?.('Need to inspect the search results first.');
|
||||
await options?.callback?.onToolsCalling?.({ toolsCalling: toolCallPayload });
|
||||
await options?.callback?.onCompletion?.({
|
||||
usage: {
|
||||
totalInputTokens: 1,
|
||||
totalOutputTokens: 2,
|
||||
totalTokens: 3,
|
||||
const mockChat = vi.fn().mockImplementation(async (_payload, options) => {
|
||||
await options?.callback?.onThinking?.('Need to inspect the search results first.');
|
||||
await options?.callback?.onToolsCalling?.({ toolsCalling: toolCallPayload });
|
||||
await options?.callback?.onCompletion?.({
|
||||
usage: {
|
||||
totalInputTokens: 1,
|
||||
totalOutputTokens: 2,
|
||||
totalTokens: 3,
|
||||
},
|
||||
});
|
||||
return new Response('done');
|
||||
});
|
||||
vi.mocked(initModelRuntimeFromDB).mockResolvedValueOnce({ chat: mockChat } as any);
|
||||
|
||||
const ctxWithConfig: RuntimeExecutorContext = {
|
||||
...ctx,
|
||||
agentConfig: {
|
||||
chatConfig: { preserveThinking: true },
|
||||
plugins: [],
|
||||
systemRole: 'test',
|
||||
},
|
||||
};
|
||||
|
||||
const executors = createRuntimeExecutors(ctxWithConfig);
|
||||
const state = createMockState({
|
||||
modelRuntimeConfig: {
|
||||
model: 'qwen3.6-plus',
|
||||
provider: 'qwen',
|
||||
},
|
||||
});
|
||||
return new Response('done');
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
messages: [{ content: 'Hello', role: 'user' }],
|
||||
model: 'qwen3.6-plus',
|
||||
provider: 'qwen',
|
||||
tools: [],
|
||||
},
|
||||
type: 'call_llm' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_llm!(instruction, state);
|
||||
|
||||
expect(result.newState.messages.at(-1)).toEqual(
|
||||
expect.objectContaining({
|
||||
reasoning: { content: 'Need to inspect the search results first.' },
|
||||
role: 'assistant',
|
||||
tool_calls: [expect.objectContaining({ id: 'call_1' })],
|
||||
}),
|
||||
);
|
||||
expect(mockChat).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ preserveThinking: true }),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
vi.mocked(initModelRuntimeFromDB).mockResolvedValueOnce({ chat: mockChat } as any);
|
||||
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
it('should not persist assistant reasoning when preserveThinking is not enabled', async () => {
|
||||
const mockChat = vi.fn().mockImplementation(async (_payload, options) => {
|
||||
await options?.callback?.onThinking?.('hidden reasoning');
|
||||
await options?.callback?.onText?.('answer');
|
||||
return new Response('done');
|
||||
});
|
||||
vi.mocked(initModelRuntimeFromDB).mockResolvedValueOnce({ chat: mockChat } as any);
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
messages: [{ content: 'Hello', role: 'user' }],
|
||||
model: 'gpt-4',
|
||||
provider: 'openai',
|
||||
tools: [],
|
||||
},
|
||||
type: 'call_llm' as const,
|
||||
};
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
|
||||
const result = await executors.call_llm!(instruction, state);
|
||||
const instruction = {
|
||||
payload: {
|
||||
messages: [{ content: 'Hello', role: 'user' }],
|
||||
model: 'gpt-4',
|
||||
provider: 'openai',
|
||||
},
|
||||
type: 'call_llm' as const,
|
||||
};
|
||||
|
||||
expect(result.newState.messages.at(-1)).toEqual(
|
||||
expect.objectContaining({
|
||||
id: 'msg-123',
|
||||
reasoning: { content: 'Need to inspect the search results first.' },
|
||||
role: 'assistant',
|
||||
tool_calls: [expect.objectContaining({ id: 'call_1' })],
|
||||
}),
|
||||
);
|
||||
const result = await executors.call_llm!(instruction, state);
|
||||
const assistant = result.newState.messages.at(-1) as any;
|
||||
|
||||
expect(assistant.reasoning).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should persist assistant reasoning when preserveThinking is enabled on a supported model', async () => {
|
||||
const mockChat = vi.fn().mockImplementation(async (_payload, options) => {
|
||||
await options?.callback?.onThinking?.('preserved reasoning');
|
||||
await options?.callback?.onText?.('answer');
|
||||
return new Response('done');
|
||||
});
|
||||
vi.mocked(initModelRuntimeFromDB).mockResolvedValueOnce({ chat: mockChat } as any);
|
||||
|
||||
const ctxWithConfig: RuntimeExecutorContext = {
|
||||
...ctx,
|
||||
agentConfig: {
|
||||
chatConfig: { preserveThinking: true },
|
||||
plugins: [],
|
||||
systemRole: 'test',
|
||||
},
|
||||
};
|
||||
|
||||
const executors = createRuntimeExecutors(ctxWithConfig);
|
||||
const state = createMockState({
|
||||
modelRuntimeConfig: {
|
||||
model: 'qwen3.6-plus',
|
||||
provider: 'qwen',
|
||||
},
|
||||
});
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
messages: [{ content: 'Hello', role: 'user' }],
|
||||
model: 'qwen3.6-plus',
|
||||
provider: 'qwen',
|
||||
},
|
||||
type: 'call_llm' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_llm!(instruction, state);
|
||||
const assistant = result.newState.messages.at(-1) as any;
|
||||
|
||||
expect(assistant.reasoning).toEqual({
|
||||
content: 'preserved reasoning',
|
||||
});
|
||||
expect(mockChat).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ preserveThinking: true }),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
|
||||
it('should persist reasoning for unknown custom deployments on supported providers', async () => {
|
||||
const mockChat = vi.fn().mockImplementation(async (_payload, options) => {
|
||||
await options?.callback?.onThinking?.('custom deployment reasoning');
|
||||
await options?.callback?.onText?.('answer');
|
||||
return new Response('done');
|
||||
});
|
||||
vi.mocked(initModelRuntimeFromDB).mockResolvedValueOnce({ chat: mockChat } as any);
|
||||
|
||||
const ctxWithConfig: RuntimeExecutorContext = {
|
||||
...ctx,
|
||||
agentConfig: {
|
||||
chatConfig: { preserveThinking: true },
|
||||
plugins: [],
|
||||
systemRole: 'test',
|
||||
},
|
||||
};
|
||||
|
||||
const executors = createRuntimeExecutors(ctxWithConfig);
|
||||
const state = createMockState({
|
||||
modelRuntimeConfig: {
|
||||
model: 'my-qwen-custom-deployment',
|
||||
provider: 'qwen',
|
||||
},
|
||||
});
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
messages: [{ content: 'Hello', role: 'user' }],
|
||||
model: 'my-qwen-custom-deployment',
|
||||
provider: 'qwen',
|
||||
},
|
||||
type: 'call_llm' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_llm!(instruction, state);
|
||||
const assistant = result.newState.messages.at(-1) as any;
|
||||
|
||||
expect(assistant.reasoning).toEqual({
|
||||
content: 'custom deployment reasoning',
|
||||
});
|
||||
expect(mockChat).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ preserveThinking: true }),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
|
||||
it('should not persist reasoning when model does not declare preserveThinking capability', async () => {
|
||||
const mockChat = vi.fn().mockImplementation(async (_payload, options) => {
|
||||
await options?.callback?.onThinking?.('reasoning that should not be saved');
|
||||
await options?.callback?.onText?.('answer');
|
||||
return new Response('done');
|
||||
});
|
||||
vi.mocked(initModelRuntimeFromDB).mockResolvedValueOnce({ chat: mockChat } as any);
|
||||
|
||||
const ctxWithConfig: RuntimeExecutorContext = {
|
||||
...ctx,
|
||||
agentConfig: {
|
||||
chatConfig: { preserveThinking: true },
|
||||
plugins: [],
|
||||
systemRole: 'test',
|
||||
},
|
||||
};
|
||||
|
||||
const executors = createRuntimeExecutors(ctxWithConfig);
|
||||
const state = createMockState({
|
||||
modelRuntimeConfig: {
|
||||
model: 'gpt-4',
|
||||
provider: 'openai',
|
||||
},
|
||||
});
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
messages: [{ content: 'Hello', role: 'user' }],
|
||||
model: 'gpt-4',
|
||||
provider: 'openai',
|
||||
},
|
||||
type: 'call_llm' as const,
|
||||
};
|
||||
|
||||
const result = await executors.call_llm!(instruction, state);
|
||||
const assistant = result.newState.messages.at(-1) as any;
|
||||
|
||||
expect(assistant.reasoning).toBeUndefined();
|
||||
expect(mockChat).toHaveBeenCalledWith(
|
||||
expect.not.objectContaining({ preserveThinking: expect.any(Boolean) }),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it('retries empty completions on the branded provider then throws ModelEmptyError', async () => {
|
||||
@@ -549,7 +771,14 @@ describe('RuntimeExecutors', () => {
|
||||
});
|
||||
vi.mocked(initModelRuntimeFromDB).mockResolvedValueOnce({ chat: mockChat } as any);
|
||||
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
// Reasoning only lands in the finalized message when preserveThinking is
|
||||
// enabled on a supported model; otherwise it is intentionally dropped.
|
||||
// Enable it here so this still guards reasoning_part capture (not drop).
|
||||
const ctxWithThinking: RuntimeExecutorContext = {
|
||||
...ctx,
|
||||
agentConfig: { chatConfig: { preserveThinking: true }, plugins: [], systemRole: 'test' },
|
||||
};
|
||||
const executors = createRuntimeExecutors(ctxWithThinking);
|
||||
const result = await executors.call_llm!(geminiInstruction(), createMockState());
|
||||
|
||||
expect(result.newState.messages.at(-1)).toEqual(
|
||||
@@ -1449,6 +1678,68 @@ describe('RuntimeExecutors', () => {
|
||||
expect(engineSpy).toHaveBeenCalledWith(expect.objectContaining({ evalContext }));
|
||||
});
|
||||
|
||||
it('should inject current agent identity for bot-originated runs', async () => {
|
||||
const ctxWithConfig: RuntimeExecutorContext = {
|
||||
...ctx,
|
||||
agentConfig: {
|
||||
description: 'Answers customer support questions.',
|
||||
plugins: [],
|
||||
systemRole: 'test',
|
||||
title: 'Support Bot',
|
||||
},
|
||||
botContext: {
|
||||
applicationId: 'discord-app',
|
||||
isOwner: true,
|
||||
platform: 'discord',
|
||||
platformThreadId: 'discord:channel-1',
|
||||
senderExternalUserId: 'user-platform-id',
|
||||
},
|
||||
};
|
||||
const executors = createRuntimeExecutors(ctxWithConfig);
|
||||
const state = createMockState({
|
||||
metadata: {
|
||||
agentId: 'agent-support',
|
||||
botContext: ctxWithConfig.botContext,
|
||||
topicId: 'topic-123',
|
||||
},
|
||||
});
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
messages: [{ content: 'Hello', role: 'user' }],
|
||||
model: 'gpt-4',
|
||||
provider: 'openai',
|
||||
},
|
||||
type: 'call_llm' as const,
|
||||
};
|
||||
|
||||
await executors.call_llm!(instruction, state);
|
||||
|
||||
expect(engineSpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agentGroup: {
|
||||
agentMap: {
|
||||
'agent-support': {
|
||||
name: 'Support Bot',
|
||||
role: 'participant',
|
||||
},
|
||||
},
|
||||
currentAgentId: 'agent-support',
|
||||
currentAgentName: 'Support Bot',
|
||||
currentAgentRole: 'participant',
|
||||
members: [
|
||||
{
|
||||
id: 'agent-support',
|
||||
name: 'Support Bot',
|
||||
role: 'participant',
|
||||
},
|
||||
],
|
||||
systemPrompt: 'Answers customer support questions.',
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should build capabilities from LOBE_DEFAULT_MODEL_LIST', async () => {
|
||||
const ctxWithConfig: RuntimeExecutorContext = {
|
||||
...ctx,
|
||||
@@ -3763,9 +4054,9 @@ describe('RuntimeExecutors', () => {
|
||||
|
||||
// Import real implementations directly from source (bypassing the @lobechat/model-runtime mock)
|
||||
const { consumeStreamUntilDone: realConsume } =
|
||||
await import('../../../../../packages/model-runtime/src/utils/consumeStream');
|
||||
await import('../../../../../../packages/model-runtime/src/utils/consumeStream');
|
||||
const { createCallbacksTransformer } =
|
||||
await import('../../../../../packages/model-runtime/src/core/streams/protocol');
|
||||
await import('../../../../../../packages/model-runtime/src/core/streams/protocol');
|
||||
|
||||
// Use real consumeStreamUntilDone so the stream is actually consumed
|
||||
vi.mocked(consumeStreamUntilDone).mockImplementation(realConsume);
|
||||
@@ -3945,7 +4236,7 @@ describe('RuntimeExecutors', () => {
|
||||
|
||||
await vi.runOnlyPendingTimersAsync();
|
||||
|
||||
const result = await resultPromise;
|
||||
await resultPromise;
|
||||
|
||||
expect(mockChat).toHaveBeenCalledTimes(2);
|
||||
expect(mockMessageModel.create).toHaveBeenCalledTimes(1);
|
||||
@@ -4426,13 +4717,13 @@ describe('RuntimeExecutors', () => {
|
||||
expect((result.nextContext?.payload as any).stop).toBe(true);
|
||||
});
|
||||
|
||||
it('exec_sub_agent executor creates task message and calls execSubAgentTask callback', async () => {
|
||||
it('exec_sub_agent executor creates task message and calls execSubAgent callback', async () => {
|
||||
const mockExecSubAgentTask = vi
|
||||
.fn()
|
||||
.mockResolvedValue({ success: true, operationId: 'child-op', threadId: 'thread-child' });
|
||||
const ctxWithCallback = {
|
||||
...ctx,
|
||||
execSubAgentTask: mockExecSubAgentTask,
|
||||
execSubAgent: mockExecSubAgentTask,
|
||||
topicId: 'topic-123',
|
||||
};
|
||||
|
||||
@@ -4464,7 +4755,7 @@ describe('RuntimeExecutors', () => {
|
||||
}),
|
||||
);
|
||||
|
||||
// execSubAgentTask callback fired with targetAgentId
|
||||
// execSubAgent callback fired with targetAgentId
|
||||
expect(mockExecSubAgentTask).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
agentId: 'target-agent-id',
|
||||
@@ -4478,7 +4769,47 @@ describe('RuntimeExecutors', () => {
|
||||
expect(result.nextContext?.phase).toBe('sub_agent_result');
|
||||
});
|
||||
|
||||
it('exec_sub_agent gracefully skips dispatch when execSubAgentTask not injected', async () => {
|
||||
it('exec_sub_agent blocks nested dispatch when current state is already a sub-agent', async () => {
|
||||
const mockExecSubAgentTask = vi.fn();
|
||||
const ctxWithCallback = {
|
||||
...ctx,
|
||||
execSubAgentTask: mockExecSubAgentTask,
|
||||
topicId: 'topic-123',
|
||||
};
|
||||
|
||||
const executors = createRuntimeExecutors(ctxWithCallback);
|
||||
const state = createMockState({
|
||||
metadata: {
|
||||
agentId: 'parent-agent-id',
|
||||
isSubAgent: true,
|
||||
topicId: 'topic-123',
|
||||
},
|
||||
});
|
||||
|
||||
const instruction = {
|
||||
payload: {
|
||||
parentMessageId: 'tool-msg-id',
|
||||
task: {
|
||||
description: 'Nested call',
|
||||
instruction: 'Do nested work',
|
||||
targetAgentId: 'target-agent-id',
|
||||
},
|
||||
},
|
||||
type: 'exec_sub_agent' as const,
|
||||
};
|
||||
|
||||
const result = await executors.exec_sub_agent!(instruction as any, state);
|
||||
|
||||
expect(result.nextContext?.phase).toBe('sub_agent_result');
|
||||
expect((result.nextContext?.payload as any).result).toMatchObject({
|
||||
error: 'Sub-agent calls cannot be triggered from within another sub-agent.',
|
||||
success: false,
|
||||
});
|
||||
expect(mockMessageModel.create).not.toHaveBeenCalled();
|
||||
expect(mockExecSubAgentTask).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('exec_sub_agent gracefully skips dispatch when execSubAgent not injected', async () => {
|
||||
// No callback injected (e.g. in tests that don't set it up)
|
||||
const executors = createRuntimeExecutors(ctx);
|
||||
const state = createMockState();
|
||||
+1
@@ -38,6 +38,7 @@ export interface IAgentStateManager {
|
||||
agentConfig?: any;
|
||||
modelRuntimeConfig?: any;
|
||||
userId?: string;
|
||||
workspaceId?: string;
|
||||
},
|
||||
) => Promise<void>;
|
||||
|
||||
-1
@@ -188,7 +188,6 @@ describe('AssistantStore', () => {
|
||||
global.fetch = vi.fn().mockRejectedValue(new Error('something else'));
|
||||
const store = new AssistantStore();
|
||||
|
||||
|
||||
vi.spyOn(console, 'error').mockImplementation(() => {});
|
||||
|
||||
await expect(store.getAgentIndex()).rejects.toThrow('something else');
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user