mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-14 03:30:19 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 78ef131cb7 | |||
| 9ce61d41f7 | |||
| 4d138539ca |
@@ -0,0 +1,182 @@
|
||||
---
|
||||
name: upstash-workflow-testing
|
||||
description: Local testing guide for Upstash Workflow endpoints via the QStash dev server. Use when verifying workflow handlers end-to-end, debugging why a workflow step isn't firing, inspecting step-level logs and outputs, or triggering a dry-run from curl. Triggers on 'test workflow locally', 'smoke test workflow', 'qstash dev server', 'workflow dry run', 'debug workflow step'.
|
||||
---
|
||||
|
||||
# Upstash Workflow Local Testing
|
||||
|
||||
How to trigger, observe, and debug Upstash Workflow endpoints against the local QStash dev server — **without** writing unit tests.
|
||||
|
||||
## TL;DR
|
||||
|
||||
Workflow endpoints reject raw curl (signature verification). To test locally:
|
||||
|
||||
1. **Publish via QStash dev server** at `localhost:8080`, not directly to your handler
|
||||
2. **Query workflow logs** via `/v2/workflows/logs?workflowRunId=...` — NOT `/v2/events` (events only lists direct publishes, not workflow-internal step publishes)
|
||||
|
||||
## Prerequisites
|
||||
|
||||
The `.env` file already ships dev defaults:
|
||||
|
||||
```bash
|
||||
QSTASH_URL="http://localhost:8080"
|
||||
QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIs..." # dev default token
|
||||
QSTASH_CURRENT_SIGNING_KEY="sig_..."
|
||||
QSTASH_NEXT_SIGNING_KEY="sig_..."
|
||||
```
|
||||
|
||||
Dev startup (`bun run dev`) boots both the Next.js server and a local QStash dev server. Verify:
|
||||
|
||||
```bash
|
||||
lsof -i :3011 # Next.js
|
||||
lsof -i :8080 # QStash dev server
|
||||
```
|
||||
|
||||
If QStash isn't up: `brew install upstash/qstash/qstash-cli && qstash dev` (or check the dev startup script).
|
||||
|
||||
## 1. Trigger a workflow
|
||||
|
||||
Publish to QStash dev server, which signs + forwards to your handler:
|
||||
|
||||
```bash
|
||||
TOKEN="$(grep '^QSTASH_TOKEN=' .env | cut -d '"' -f2)"
|
||||
TARGET="http://localhost:3011/api/workflows/memory-user-memory/cron/hourly"
|
||||
|
||||
curl -X POST "http://localhost:8080/v2/publish/$TARGET" \
|
||||
-H "Authorization: Bearer $TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"dryRun": true, "baseUrl": "http://localhost:3011"}'
|
||||
# → {"messageId":"msg_..."}
|
||||
```
|
||||
|
||||
For dry-run, always pass `"dryRun": true` in the body. The handler should return stats without triggering downstream L2/L3 pipelines.
|
||||
|
||||
## 2. Observe execution
|
||||
|
||||
### Workflow logs (authoritative source)
|
||||
|
||||
```bash
|
||||
# List recent runs (all workflows)
|
||||
curl -s "http://localhost:8080/v2/workflows/logs" \
|
||||
-H "Authorization: Bearer $TOKEN" | jq '.runs[] | {id: .workflowRunId, url: .workflowUrl, state: .workflowState}'
|
||||
|
||||
# Inspect a specific run
|
||||
WFR="wfr_xxxxxxxxxxxxxxxxx"
|
||||
curl -s "http://localhost:8080/v2/workflows/logs?workflowRunId=$WFR" \
|
||||
-H "Authorization: Bearer $TOKEN" | jq '.runs[0].steps'
|
||||
```
|
||||
|
||||
Each step entry includes:
|
||||
|
||||
- `stepName` — what you passed to `context.run('<name>', ...)`
|
||||
- `stepType` — `Initial` | `Run` | `Call` | `Invoke` | `SleepFor` | etc.
|
||||
- `state` — `STEP_SUCCESS` | `STEP_FAILED` | `STEP_RETRY`
|
||||
- `out` — JSON-serialized return value of the step (base64 in some fields)
|
||||
- `messageId` — underlying QStash message for that step
|
||||
|
||||
Run-level `workflowState`:
|
||||
|
||||
- `RUN_SUCCESS` — all steps completed
|
||||
- `RUN_FAILED` — a step hit max retries
|
||||
- `RUN_CANCELED` — explicitly canceled
|
||||
|
||||
### Events (direct-publish only — NOT step-level)
|
||||
|
||||
```bash
|
||||
curl -s "http://localhost:8080/v2/events?count=50" \
|
||||
-H "Authorization: Bearer $TOKEN" | jq '.events[] | {state, url, messageId}'
|
||||
```
|
||||
|
||||
`/v2/events` only shows messages you publish to QStash directly (the initial trigger, plus any `client.trigger(...)` calls from inside a `context.run`). It does **NOT** show internal workflow-step messages that `serve()` publishes to itself — for those, use `/v2/workflows/logs`.
|
||||
|
||||
If you trigger pipeline A → B and only see A's messages in `/v2/events`, that usually means A's handler published correctly but B hasn't been inspected by workflow logs yet. Query `/v2/workflows/logs` for B's workflowRunId instead.
|
||||
|
||||
## 3. Common failure modes
|
||||
|
||||
### a. 500 "Upstash-Signature header is not passed"
|
||||
|
||||
You curl'd the handler directly. Publish via `http://localhost:8080/v2/publish/<target>` instead.
|
||||
|
||||
### b. Handler runs but no downstream workflow fires
|
||||
|
||||
The `qstashClient` passed to `serve()` or used by your `triggerXxx` helper probably doesn't honor `QSTASH_URL`. **Both clients must point at the dev server.**
|
||||
|
||||
`@upstash/qstash`'s `Client` uses **`baseUrl`** in the config object (NOT `url`) and also reads `QSTASH_URL` from env automatically:
|
||||
|
||||
```ts
|
||||
// ✅ Correct
|
||||
new Client({ token, baseUrl: process.env.QSTASH_URL });
|
||||
|
||||
// ⚠️ Works too (env var fallback) but explicit is safer
|
||||
new Client({ token });
|
||||
```
|
||||
|
||||
`@upstash/workflow`'s `Client` — used by `MemoryExtractionWorkflowService` and similar trigger helpers — forwards to the same QStash client internally.
|
||||
|
||||
### c. `triggerXxx()` returns `{workflowRunId}` but `/v2/events` shows nothing
|
||||
|
||||
`/v2/events` only lists direct publishes. A `client.trigger()` call publishes to QStash's workflow API, which creates a run log entry (visible via `/v2/workflows/logs`) plus its own initial QStash message. Always cross-check with `/v2/workflows/logs` before concluding the trigger failed.
|
||||
|
||||
### d. Dry-run path still cascades to L2
|
||||
|
||||
Means the handler read `dryRun` from the wrong field. For our codebase the convention is to put `dryRun: true` at the **top level** of the body; the L1 handler reads it off `context.requestPayload` directly (not via `normalizeMemoryExtractionPayload`, which strips unknown fields). When in doubt, `appendFileSync('/tmp/<wf>-debug.log', ...)` inside the handler to log the exact payload received.
|
||||
|
||||
### e. You need to see handler logs but can't access dev server stdout
|
||||
|
||||
Dev is usually started in the background. When you can't tail stdout, drop a **temporary** file logger into the handler:
|
||||
|
||||
```ts
|
||||
import { appendFileSync } from 'node:fs';
|
||||
|
||||
appendFileSync('/tmp/wf-debug.log', `[${new Date().toISOString()}] <message>\n`);
|
||||
```
|
||||
|
||||
Delete before committing. Also consider `verbose: true` on the `serve()` options — that routes @upstash/workflow's internal tracing to console (which, again, you need stdout access for).
|
||||
|
||||
## 4. End-to-end smoke recipes
|
||||
|
||||
### Dry-run the entire hourly cron dispatcher
|
||||
|
||||
```bash
|
||||
TOKEN=$(grep '^QSTASH_TOKEN=' .env | cut -d '"' -f2)
|
||||
TARGET='http://localhost:3011/api/workflows/memory-user-memory/cron/hourly'
|
||||
MSG=$(curl -s -X POST "http://localhost:8080/v2/publish/$TARGET" \
|
||||
-H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' \
|
||||
-d '{"dryRun": true, "baseUrl": "http://localhost:3011"}' \
|
||||
| jq -r .messageId)
|
||||
|
||||
# Follow the cron/hourly run to completion (polls until RUN_SUCCESS or RUN_FAILED)
|
||||
while :; do
|
||||
STATE=$(curl -s "http://localhost:8080/v2/workflows/logs" \
|
||||
-H "Authorization: Bearer $TOKEN" \
|
||||
| jq -r --arg url "$TARGET" '.runs[] | select(.workflowUrl == $url) | .workflowState' | head -1)
|
||||
echo "state: $STATE"
|
||||
[[ "$STATE" == "RUN_SUCCESS" || "$STATE" == "RUN_FAILED" ]] && break
|
||||
sleep 2
|
||||
done
|
||||
```
|
||||
|
||||
Expected on success: two child workflow runs appear in `/v2/workflows/logs` — one at `/topics/process-users`, one at `/persona/process-users`. Each should also reach `RUN_SUCCESS` in dry-run (L1 returns stats; no L2 triggered).
|
||||
|
||||
### Directly target a single L1 (skip the cron dispatcher)
|
||||
|
||||
```bash
|
||||
TARGET='http://localhost:3011/api/workflows/memory-user-memory/topics/process-users'
|
||||
curl -X POST "http://localhost:8080/v2/publish/$TARGET" \
|
||||
-H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' \
|
||||
-d '{"dryRun": true, "baseUrl": "http://localhost:3011", "mode": "workflow"}'
|
||||
```
|
||||
|
||||
Then query logs for that workflow run — should complete in 1–2 steps with stats in the final step's `out`.
|
||||
|
||||
## 5. What NOT to do
|
||||
|
||||
- ❌ Unit-testing the handler by constructing a fake `WorkflowContext`. The workflow runtime does step caching, replay, and QStash round-trips that you can't realistically mock. Integration via QStash dev server is faster and more accurate.
|
||||
- ❌ Bypassing signature verification by clearing `QSTASH_*_SIGNING_KEY` env. Dev QStash signs requests — leaving verification on catches misconfigured receivers.
|
||||
- ❌ Relying on `/v2/events` as the full picture of a workflow run. Use `/v2/workflows/logs` for step-level truth.
|
||||
|
||||
## References
|
||||
|
||||
- Upstash QStash local dev: <https://upstash.com/docs/qstash/howto/local-development>
|
||||
- Workflow basics (serve/context/run): <https://upstash.com/docs/workflow/basics/context>
|
||||
- Related skill: `upstash-workflow` (implementation patterns)
|
||||
@@ -395,6 +395,40 @@ export class UserModel {
|
||||
return options.limit !== undefined ? query.limit(options.limit) : query;
|
||||
};
|
||||
|
||||
static countUsersForHourlyMemoryExtractor = async (
|
||||
db: LobeChatDatabase,
|
||||
options: Pick<ListUsersForHourlyMemoryExtractorOptions, 'whitelist'> = {},
|
||||
): Promise<number> => {
|
||||
// NOTICE: Reversed from the per-row EXISTS pattern in listUsersForHourlyMemoryExtractor.
|
||||
// Start with a CTE of users who actually chatted (messages ⋈ topics on user_id) then join
|
||||
// to users+settings. On our data this is ~20× faster than the EXISTS variant because the
|
||||
// chatted set is much smaller than the total users table.
|
||||
const whitelistSql =
|
||||
options.whitelist && options.whitelist.length > 0
|
||||
? sql`AND u.id IN (${sql.join(
|
||||
options.whitelist.map((id) => sql`${id}`),
|
||||
sql`, `,
|
||||
)})`
|
||||
: sql``;
|
||||
|
||||
const rows = await db.execute<{ n: number }>(sql`
|
||||
WITH chatted AS (
|
||||
SELECT DISTINCT m.user_id
|
||||
FROM ${messages} m
|
||||
INNER JOIN ${topics} t ON t.id = m.topic_id AND t.user_id = m.user_id
|
||||
WHERE m.role = 'user'
|
||||
)
|
||||
SELECT COUNT(*)::int AS n
|
||||
FROM chatted c
|
||||
INNER JOIN ${users} u ON u.id = c.user_id
|
||||
LEFT JOIN ${userSettings} s ON s.id = u.id
|
||||
WHERE COALESCE((s.memory ->> 'enabled')::boolean, true) = true
|
||||
${whitelistSql}
|
||||
`);
|
||||
|
||||
return rows.rows[0]?.n ?? 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* Get user info for AI generation (name and language preference)
|
||||
*/
|
||||
|
||||
@@ -41,7 +41,7 @@ export const POST = async (req: Request) => {
|
||||
|
||||
const params = normalizeMemoryExtractionPayload(payload, origin);
|
||||
if (params.mode === 'workflow') {
|
||||
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerProcessUsers(
|
||||
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerTopicsProcessUsers(
|
||||
buildWorkflowPayloadInput(params),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
);
|
||||
|
||||
@@ -63,7 +63,7 @@ export const POST = async (req: Request) => {
|
||||
if (params.mode === 'workflow') {
|
||||
const results = await Promise.all(
|
||||
params.userIds.map(async (userId) => {
|
||||
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerPersonaUpdate(
|
||||
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerPersonaExecuteUser(
|
||||
userId,
|
||||
params.baseUrl,
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
|
||||
@@ -12,8 +12,8 @@ const mockFindById = vi.fn();
|
||||
|
||||
const mockCountTopicsForMemoryExtractor = vi.fn();
|
||||
const mockDeleteAll = vi.fn();
|
||||
const { mockTriggerProcessUsers } = vi.hoisted(() => ({
|
||||
mockTriggerProcessUsers: vi.fn(),
|
||||
const { mockTriggerTopicsProcessUsers } = vi.hoisted(() => ({
|
||||
mockTriggerTopicsProcessUsers: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock('@/database/models/asyncTask', () => ({
|
||||
@@ -59,7 +59,7 @@ vi.mock('@/server/globalConfig/parseMemoryExtractionConfig', () => ({
|
||||
|
||||
vi.mock('@/server/services/memory/userMemory/extract', () => ({
|
||||
MemoryExtractionWorkflowService: {
|
||||
triggerProcessUsers: mockTriggerProcessUsers,
|
||||
triggerTopicsProcessUsers: mockTriggerTopicsProcessUsers,
|
||||
},
|
||||
buildWorkflowPayloadInput: (payload: any) => payload,
|
||||
normalizeMemoryExtractionPayload: (payload: any) => payload,
|
||||
@@ -78,7 +78,7 @@ const createCaller = (ctxOverrides: Partial<any> = {}) => {
|
||||
describe('userMemoryRouter.requestMemoryFromChatTopic', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
mockTriggerProcessUsers.mockResolvedValue({ workflowRunId: 'workflow-run-1' });
|
||||
mockTriggerTopicsProcessUsers.mockResolvedValue({ workflowRunId: 'workflow-run-1' });
|
||||
});
|
||||
|
||||
it('dedupes when an active task exists', async () => {
|
||||
@@ -98,7 +98,7 @@ describe('userMemoryRouter.requestMemoryFromChatTopic', () => {
|
||||
status: AsyncTaskStatus.Pending,
|
||||
});
|
||||
expect(mockCreate).not.toHaveBeenCalled();
|
||||
expect(mockTriggerProcessUsers).not.toHaveBeenCalled();
|
||||
expect(mockTriggerTopicsProcessUsers).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('creates task and triggers workflow with user context and dates', async () => {
|
||||
@@ -124,7 +124,7 @@ describe('userMemoryRouter.requestMemoryFromChatTopic', () => {
|
||||
status: AsyncTaskStatus.Pending,
|
||||
type: AsyncTaskType.UserMemoryExtractionWithChatTopic,
|
||||
});
|
||||
expect(mockTriggerProcessUsers).toHaveBeenCalledWith(
|
||||
expect(mockTriggerTopicsProcessUsers).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
asyncTaskId: 'new-task',
|
||||
baseUrl: 'https://internal.example.com',
|
||||
@@ -170,7 +170,7 @@ describe('userMemoryRouter.requestMemoryFromChatTopic', () => {
|
||||
},
|
||||
status: AsyncTaskStatus.Success,
|
||||
});
|
||||
expect(mockTriggerProcessUsers).not.toHaveBeenCalled();
|
||||
expect(mockTriggerTopicsProcessUsers).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('throws on invalid date range', async () => {
|
||||
|
||||
@@ -275,7 +275,7 @@ export const userMemoryRouter = router({
|
||||
const baseUrl = webhook.baseUrl || appEnv.INTERNAL_APP_URL || appEnv.APP_URL;
|
||||
|
||||
try {
|
||||
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerProcessUsers(
|
||||
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerTopicsProcessUsers(
|
||||
buildWorkflowPayloadInput(
|
||||
normalizeMemoryExtractionPayload({
|
||||
asyncTaskId: taskId,
|
||||
|
||||
@@ -1709,6 +1709,13 @@ export class MemoryExtractionExecutor {
|
||||
};
|
||||
}
|
||||
|
||||
async countUsersForHourlyExtraction(): Promise<number> {
|
||||
const db = await this.db;
|
||||
return UserModel.countUsersForHourlyMemoryExtractor(db, {
|
||||
whitelist: this.privateConfig.whitelistUsers,
|
||||
});
|
||||
}
|
||||
|
||||
async getUsersForHourlyExtraction(
|
||||
limit: number,
|
||||
cursor?: ListUsersForMemoryExtractorCursor,
|
||||
@@ -2264,12 +2271,34 @@ export class MemoryExtractionExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
const WORKFLOW_PATHS = {
|
||||
hourly: '/api/workflows/memory-user-memory/call-cron-hourly-analysis',
|
||||
personaUpdate: '/api/workflows/memory-user-memory/pipelines/persona/update-writing',
|
||||
topicBatch: '/api/workflows/memory-user-memory/pipelines/chat-topic/process-topics',
|
||||
userTopics: '/api/workflows/memory-user-memory/pipelines/chat-topic/process-user-topics',
|
||||
users: '/api/workflows/memory-user-memory/pipelines/chat-topic/process-users',
|
||||
export interface PersonaWorkflowCursor {
|
||||
createdAt: string;
|
||||
id: string;
|
||||
}
|
||||
|
||||
export interface PersonaProcessUsersWorkflowPayload {
|
||||
baseUrl?: string;
|
||||
cursor?: PersonaWorkflowCursor;
|
||||
dryRun?: boolean;
|
||||
userIds?: string[];
|
||||
}
|
||||
|
||||
export interface PersonaExecuteUserWorkflowPayload {
|
||||
userId: string;
|
||||
}
|
||||
|
||||
export const WORKFLOW_PATHS = {
|
||||
hourlyCron: '/api/workflows/memory-user-memory/cron/hourly',
|
||||
|
||||
personaExecuteUser: '/api/workflows/memory-user-memory/persona/execute-user',
|
||||
personaPaginateUsers: '/api/workflows/memory-user-memory/persona/paginate-users',
|
||||
personaProcessUsers: '/api/workflows/memory-user-memory/persona/process-users',
|
||||
|
||||
topicsExecuteUser: '/api/workflows/memory-user-memory/topics/execute-user',
|
||||
// Invoked internally via context.invoke from topics/execute-user; workflowId must match the last path segment.
|
||||
topicsExtractTopic: '/api/workflows/memory-user-memory/topics/extract-topic',
|
||||
topicsPaginateUsers: '/api/workflows/memory-user-memory/topics/paginate-users',
|
||||
topicsProcessUsers: '/api/workflows/memory-user-memory/topics/process-users',
|
||||
} as const;
|
||||
|
||||
const getWorkflowUrl = (path: string, baseUrl: string) => {
|
||||
@@ -2291,6 +2320,13 @@ const getWorkflowClient = () => {
|
||||
return new Client(config);
|
||||
};
|
||||
|
||||
type TriggerOptions = { extraHeaders?: Record<string, string> };
|
||||
|
||||
const requireBaseUrl = (baseUrl: string | undefined): string => {
|
||||
if (!baseUrl) throw new Error('Missing baseUrl for workflow trigger');
|
||||
return baseUrl;
|
||||
};
|
||||
|
||||
export class MemoryExtractionWorkflowService {
|
||||
private static client: Client;
|
||||
|
||||
@@ -2302,87 +2338,80 @@ export class MemoryExtractionWorkflowService {
|
||||
return this.client;
|
||||
}
|
||||
|
||||
static triggerProcessUsers(
|
||||
payload: MemoryExtractionPayloadInput,
|
||||
options?: { extraHeaders?: Record<string, string> },
|
||||
) {
|
||||
if (!payload.baseUrl) {
|
||||
throw new Error('Missing baseUrl for workflow trigger');
|
||||
}
|
||||
// ─── External cron entry ──────────────────────────────────────────────
|
||||
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.users, payload.baseUrl);
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
|
||||
static triggerHourly(
|
||||
static triggerHourlyCron(
|
||||
payload: MemoryExtractionHourlyWorkflowPayload,
|
||||
options?: { extraHeaders?: Record<string, string> },
|
||||
options?: TriggerOptions,
|
||||
) {
|
||||
if (!payload.baseUrl) {
|
||||
throw new Error('Missing baseUrl for workflow trigger');
|
||||
}
|
||||
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.hourly, payload.baseUrl);
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.hourlyCron, requireBaseUrl(payload.baseUrl));
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
|
||||
static triggerProcessUserTopics(
|
||||
payload: UserTopicWorkflowPayload,
|
||||
options?: { extraHeaders?: Record<string, string> },
|
||||
) {
|
||||
if (!payload.baseUrl) {
|
||||
throw new Error('Missing baseUrl for workflow trigger');
|
||||
}
|
||||
// ─── Topics pipeline (3 layers + inner extract-topic workflow) ────────
|
||||
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.userTopics, payload.baseUrl);
|
||||
return this.getClient().trigger({
|
||||
body: payload,
|
||||
headers: options?.extraHeaders,
|
||||
url,
|
||||
});
|
||||
static triggerTopicsProcessUsers(
|
||||
payload: MemoryExtractionPayloadInput,
|
||||
options?: TriggerOptions,
|
||||
) {
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.topicsProcessUsers, requireBaseUrl(payload.baseUrl));
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
|
||||
static triggerProcessTopics(
|
||||
static triggerTopicsPaginateUsers(
|
||||
payload: MemoryExtractionPayloadInput,
|
||||
options?: TriggerOptions,
|
||||
) {
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.topicsPaginateUsers, requireBaseUrl(payload.baseUrl));
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
|
||||
static triggerTopicsExecuteUser(
|
||||
userId: string,
|
||||
payload: MemoryExtractionPayloadInput,
|
||||
options?: { extraHeaders?: Record<string, string> },
|
||||
options?: TriggerOptions,
|
||||
) {
|
||||
if (!payload.baseUrl) {
|
||||
throw new Error('Missing baseUrl for workflow trigger');
|
||||
}
|
||||
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.topicBatch, payload.baseUrl);
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.topicsExecuteUser, requireBaseUrl(payload.baseUrl));
|
||||
return this.getClient().trigger({
|
||||
body: payload,
|
||||
flowControl: {
|
||||
key: `memory-user-memory.pipelines.chat-topic.process-topics.user.${userId}`,
|
||||
// NOTICE: if modified the parallelism of
|
||||
// src/server/workflows-hono/memory-user-memory/workflows/processTopics.ts
|
||||
// or added new memory layer, make sure to update the number below.
|
||||
//
|
||||
// Currently, CEPA (context, experience, preference, activity) + identity = 5 layers.
|
||||
// and since identity requires sequential processing, we set parallelism to 5.
|
||||
key: `memory-user-memory.topics.execute-user.${userId}`,
|
||||
// NOTICE: one execute-user run invokes extract-topic per topic in parallel.
|
||||
// Keep this conservative so a single user with many topics does not starve the queue.
|
||||
parallelism: 5,
|
||||
},
|
||||
} satisfies FlowControl,
|
||||
headers: options?.extraHeaders,
|
||||
url,
|
||||
});
|
||||
}
|
||||
|
||||
static triggerPersonaUpdate(
|
||||
userId: string,
|
||||
baseUrl: string,
|
||||
options?: { extraHeaders?: Record<string, string> },
|
||||
) {
|
||||
if (!baseUrl) {
|
||||
throw new Error('Missing baseUrl for workflow trigger');
|
||||
}
|
||||
// ─── Persona pipeline (3 layers) ──────────────────────────────────────
|
||||
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.personaUpdate, baseUrl);
|
||||
static triggerPersonaProcessUsers(
|
||||
payload: PersonaProcessUsersWorkflowPayload,
|
||||
options?: TriggerOptions,
|
||||
) {
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.personaProcessUsers, requireBaseUrl(payload.baseUrl));
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
|
||||
static triggerPersonaPaginateUsers(
|
||||
payload: PersonaProcessUsersWorkflowPayload,
|
||||
options?: TriggerOptions,
|
||||
) {
|
||||
const url = getWorkflowUrl(
|
||||
WORKFLOW_PATHS.personaPaginateUsers,
|
||||
requireBaseUrl(payload.baseUrl),
|
||||
);
|
||||
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
|
||||
}
|
||||
|
||||
static triggerPersonaExecuteUser(userId: string, baseUrl: string, options?: TriggerOptions) {
|
||||
const url = getWorkflowUrl(WORKFLOW_PATHS.personaExecuteUser, requireBaseUrl(baseUrl));
|
||||
return this.getClient().trigger({
|
||||
body: { userIds: [userId] },
|
||||
body: { userId } satisfies PersonaExecuteUserWorkflowPayload,
|
||||
flowControl: {
|
||||
key: `memory-user-memory.pipelines.persona.update-write.${userId}`,
|
||||
key: `memory-user-memory.persona.execute-user.${userId}`,
|
||||
parallelism: 1,
|
||||
} satisfies FlowControl,
|
||||
headers: options?.extraHeaders,
|
||||
|
||||
@@ -2,52 +2,60 @@ import { serve, serveMany } from '@upstash/workflow/hono';
|
||||
import { Hono } from 'hono';
|
||||
|
||||
import { createWorkflowQstashClient } from './qstashClient';
|
||||
import { hourlyWorkflowHandler, hourlyWorkflowOptions } from './workflows/hourly';
|
||||
import { personaUpdateHandler } from './workflows/personaUpdate';
|
||||
import { processTopicWorkflow } from './workflows/processTopic';
|
||||
import { processTopicsHandler } from './workflows/processTopics';
|
||||
import { processUsersHandler } from './workflows/processUsers';
|
||||
import { processUserTopicsHandler } from './workflows/processUserTopics';
|
||||
import { hourlyCronHandler } from './workflows/cron/hourly';
|
||||
import { executeUserHandler as personaExecuteUserHandler } from './workflows/persona/executeUser';
|
||||
import { paginateUsersHandler as personaPaginateUsersHandler } from './workflows/persona/paginateUsers';
|
||||
import { processUsersHandler as personaProcessUsersHandler } from './workflows/persona/processUsers';
|
||||
import { executeUserHandler as topicsExecuteUserHandler } from './workflows/topics/executeUser';
|
||||
import { extractTopicWorkflow } from './workflows/topics/extractTopic';
|
||||
import { paginateUsersHandler as topicsPaginateUsersHandler } from './workflows/topics/paginateUsers';
|
||||
import { processUsersHandler as topicsProcessUsersHandler } from './workflows/topics/processUsers';
|
||||
|
||||
const app = new Hono().basePath('/api/workflows/memory-user-memory');
|
||||
|
||||
// ─── External cron entry ───────────────────────────────────────────────
|
||||
app.post('/cron/hourly', serve(hourlyCronHandler, { qstashClient: createWorkflowQstashClient() }));
|
||||
|
||||
// ─── Topics pipeline (3 layers + inner extract-topic workflow) ─────────
|
||||
app.post(
|
||||
'/call-cron-hourly-analysis',
|
||||
serve(hourlyWorkflowHandler, {
|
||||
...hourlyWorkflowOptions,
|
||||
qstashClient: createWorkflowQstashClient(),
|
||||
}),
|
||||
'/topics/process-users',
|
||||
serve(topicsProcessUsersHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
);
|
||||
|
||||
app.post(
|
||||
'/pipelines/persona/update-writing',
|
||||
serve(personaUpdateHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
'/topics/paginate-users',
|
||||
serve(topicsPaginateUsersHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
);
|
||||
|
||||
app.post(
|
||||
'/pipelines/chat-topic/process-users',
|
||||
serve(processUsersHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
'/topics/execute-user',
|
||||
serve(topicsExecuteUserHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
);
|
||||
|
||||
// NOTICE: `context.invoke(extractTopicWorkflow)` in topics/execute-user rewrites the URL last
|
||||
// segment to the workflowId (`extract-topic`); serveMany dispatches that to the right workflow.
|
||||
app.post(
|
||||
'/pipelines/chat-topic/process-user-topics',
|
||||
serve(processUserTopicsHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
);
|
||||
|
||||
app.post(
|
||||
'/pipelines/chat-topic/process-topics',
|
||||
serve(processTopicsHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
);
|
||||
|
||||
// NOTICE: Must use serveMany here. The `context.invoke(processTopicWorkflow)` call in
|
||||
// process-topics rewrites the URL last segment to the workflowId ("process-topic"). serveMany
|
||||
// multiplexes by that final segment to dispatch to the right workflow.
|
||||
app.post(
|
||||
'/pipelines/chat-topic/process-topic',
|
||||
'/topics/extract-topic',
|
||||
serveMany(
|
||||
{ 'process-topic': processTopicWorkflow },
|
||||
{ 'extract-topic': extractTopicWorkflow },
|
||||
{ qstashClient: createWorkflowQstashClient() },
|
||||
),
|
||||
);
|
||||
|
||||
// ─── Persona pipeline (3 layers) ───────────────────────────────────────
|
||||
app.post(
|
||||
'/persona/process-users',
|
||||
serve(personaProcessUsersHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
);
|
||||
|
||||
app.post(
|
||||
'/persona/paginate-users',
|
||||
serve(personaPaginateUsersHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
);
|
||||
|
||||
app.post(
|
||||
'/persona/execute-user',
|
||||
serve(personaExecuteUserHandler, { qstashClient: createWorkflowQstashClient() }),
|
||||
);
|
||||
|
||||
export default app;
|
||||
|
||||
@@ -10,6 +10,9 @@ const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
// https://upstash.com/docs/workflow/troubleshooting/vercel#step-2-pass-header-when-triggering
|
||||
export const createWorkflowQstashClient = () =>
|
||||
new Client({
|
||||
// QStash Client reads QSTASH_URL/QSTASH_TOKEN from env automatically; explicit baseUrl here
|
||||
// keeps local-dev behavior visible and avoids any config-shape confusion with `url`.
|
||||
...(process.env.QSTASH_URL ? { baseUrl: process.env.QSTASH_URL } : {}),
|
||||
headers: { ...upstashWorkflowExtraHeaders },
|
||||
token: process.env.QSTASH_TOKEN!,
|
||||
});
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
|
||||
import { appEnv } from '@/envs/app';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import {
|
||||
type MemoryExtractionHourlyWorkflowPayload,
|
||||
type MemoryExtractionPayloadInput,
|
||||
MemoryExtractionWorkflowService,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
const { webhook, upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const resolveBaseUrl = () => webhook.baseUrl || appEnv.INTERNAL_APP_URL || appEnv.APP_URL;
|
||||
|
||||
/**
|
||||
* External cron entry.
|
||||
*
|
||||
* QStash scheduled cron posts here hourly. Responsibility: dispatch both pipelines (topics + persona)
|
||||
* L1 in parallel. Each pipeline then does its own pagination and filtering — this handler stays thin.
|
||||
*/
|
||||
export const hourlyCronHandler = async (
|
||||
context: WorkflowContext<MemoryExtractionHourlyWorkflowPayload>,
|
||||
) => {
|
||||
const payload = context.requestPayload || ({} as MemoryExtractionHourlyWorkflowPayload);
|
||||
const baseUrl = payload.baseUrl || resolveBaseUrl();
|
||||
if (!baseUrl) {
|
||||
throw new Error('Missing baseUrl for hourly cron dispatcher');
|
||||
}
|
||||
|
||||
const dryRun = !!payload.dryRun;
|
||||
|
||||
await context.run('memory:cron:hourly:trigger-topics-process-users', () =>
|
||||
MemoryExtractionWorkflowService.triggerTopicsProcessUsers(
|
||||
// `dryRun` isn't part of MemoryExtractionPayloadInput but is forwarded verbatim in the JSON
|
||||
// body; the topics L1 handler reads it off the raw request payload.
|
||||
{ baseUrl, dryRun, mode: 'workflow' } as MemoryExtractionPayloadInput & { dryRun?: boolean },
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
|
||||
await context.run('memory:cron:hourly:trigger-persona-process-users', () =>
|
||||
MemoryExtractionWorkflowService.triggerPersonaProcessUsers(
|
||||
{ baseUrl, dryRun },
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
|
||||
return {
|
||||
dispatched: { persona: true, topics: true },
|
||||
dryRun,
|
||||
message: dryRun
|
||||
? '[DryRun] Dispatched both topics and persona pipelines in dry-run mode.'
|
||||
: 'Dispatched both topics and persona pipelines.',
|
||||
success: true,
|
||||
};
|
||||
};
|
||||
@@ -1,105 +0,0 @@
|
||||
import { MemorySourceType } from '@lobechat/types';
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
import { chunk } from 'es-toolkit/compat';
|
||||
|
||||
import { appEnv } from '@/envs/app';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import {
|
||||
buildWorkflowPayloadInput,
|
||||
MemoryExtractionExecutor,
|
||||
type MemoryExtractionHourlyWorkflowPayload,
|
||||
MemoryExtractionWorkflowService,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
const USER_PAGE_SIZE = 200;
|
||||
const USER_BATCH_SIZE = 20;
|
||||
|
||||
const { webhook, upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const resolveBaseUrl = () => webhook.baseUrl || appEnv.INTERNAL_APP_URL || appEnv.APP_URL;
|
||||
|
||||
export const hourlyWorkflowHandler = async (
|
||||
context: WorkflowContext<MemoryExtractionHourlyWorkflowPayload>,
|
||||
) => {
|
||||
const { cursor, dryRun } = context.requestPayload || {};
|
||||
|
||||
const baseUrl = resolveBaseUrl();
|
||||
if (!baseUrl) {
|
||||
throw new Error('Missing baseUrl for hourly memory extraction workflow');
|
||||
}
|
||||
|
||||
const parsedCursor = cursor
|
||||
? { createdAt: new Date(cursor.createdAt), id: cursor.id }
|
||||
: undefined;
|
||||
if (parsedCursor && Number.isNaN(parsedCursor.createdAt.getTime())) {
|
||||
throw new Error('Invalid cursor date for hourly memory extraction workflow');
|
||||
}
|
||||
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
const userBatch = await context.run(
|
||||
`memory:user-memory:hourly:list-users:${parsedCursor?.id || 'root'}`,
|
||||
() => executor.getUsersForHourlyExtraction(USER_PAGE_SIZE, parsedCursor),
|
||||
);
|
||||
|
||||
const userIds = userBatch.ids;
|
||||
if (userIds.length === 0) {
|
||||
return { message: 'No eligible users for hourly memory extraction.', processedUsers: 0 };
|
||||
}
|
||||
|
||||
const nextCursor = userBatch.cursor
|
||||
? {
|
||||
createdAt: userBatch.cursor.createdAt.toISOString(),
|
||||
id: userBatch.cursor.id,
|
||||
}
|
||||
: undefined;
|
||||
|
||||
if (!dryRun) {
|
||||
const batches = chunk(userIds, USER_BATCH_SIZE);
|
||||
await Promise.all(
|
||||
batches.map((batchUserIds, index) =>
|
||||
context.run(`memory:user-memory:hourly:trigger-users:${index}`, () =>
|
||||
MemoryExtractionWorkflowService.triggerProcessUsers(
|
||||
buildWorkflowPayloadInput(
|
||||
normalizeMemoryExtractionPayload({
|
||||
baseUrl,
|
||||
mode: 'workflow',
|
||||
sources: [MemorySourceType.ChatTopic],
|
||||
userIds: batchUserIds,
|
||||
}),
|
||||
),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
if (nextCursor) {
|
||||
await context.run('memory:user-memory:hourly:schedule-next-page', () =>
|
||||
MemoryExtractionWorkflowService.triggerHourly(
|
||||
{
|
||||
baseUrl,
|
||||
cursor: nextCursor,
|
||||
dryRun,
|
||||
},
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
dryRun: !!dryRun,
|
||||
hasNextPage: !!nextCursor,
|
||||
processedUsers: userIds.length,
|
||||
scheduledBatches: dryRun ? 0 : chunk(userIds, USER_BATCH_SIZE).length,
|
||||
};
|
||||
};
|
||||
|
||||
export const hourlyWorkflowOptions = {
|
||||
flowControl: {
|
||||
key: 'memory-user-memory.call-cron-hourly-analysis',
|
||||
parallelism: 1,
|
||||
ratePerSecond: 1,
|
||||
},
|
||||
};
|
||||
@@ -0,0 +1,45 @@
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { getServerDB } from '@/database/server';
|
||||
import { type PersonaExecuteUserWorkflowPayload } from '@/server/services/memory/userMemory/extract';
|
||||
import {
|
||||
buildUserPersonaJobInput,
|
||||
UserPersonaService,
|
||||
} from '@/server/services/memory/userMemory/persona/service';
|
||||
|
||||
const payloadSchema = z.object({
|
||||
userId: z.string(),
|
||||
});
|
||||
|
||||
/**
|
||||
* L3: Compose persona writing for ONE user.
|
||||
*/
|
||||
export const executeUserHandler = async (
|
||||
context: WorkflowContext<PersonaExecuteUserWorkflowPayload>,
|
||||
) => {
|
||||
const payload = await context.run('memory:persona:execute-user:parse-payload', () =>
|
||||
payloadSchema.parse(context.requestPayload || {}),
|
||||
);
|
||||
|
||||
const { userId } = payload;
|
||||
const db = await getServerDB();
|
||||
const service = new UserPersonaService(db);
|
||||
|
||||
const result = await context.run(`memory:persona:execute-user:${userId}:compose`, async () => {
|
||||
const jobInput = await buildUserPersonaJobInput(db, userId);
|
||||
const composed = await service.composeWriting({ ...jobInput, userId });
|
||||
return {
|
||||
diffId: composed.diff?.id,
|
||||
documentId: composed.document.id,
|
||||
userId,
|
||||
version: composed.document.version,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
message: 'Persona composed.',
|
||||
success: true,
|
||||
...result,
|
||||
};
|
||||
};
|
||||
@@ -0,0 +1,109 @@
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
import { chunk } from 'es-toolkit/compat';
|
||||
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import {
|
||||
MemoryExtractionExecutor,
|
||||
MemoryExtractionWorkflowService,
|
||||
type PersonaProcessUsersWorkflowPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
const USER_PAGE_SIZE = 50;
|
||||
const CHUNK_SIZE = 20;
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const requireBaseUrl = (baseUrl?: string) => {
|
||||
if (!baseUrl) throw new Error('Missing baseUrl for persona paginate-users');
|
||||
return baseUrl;
|
||||
};
|
||||
|
||||
/**
|
||||
* L2: Paginate eligible users and fan-out to execute-user.
|
||||
*
|
||||
* Reuses `executor.getUsers` (same base set as topics) — persona eligibility finer-grained
|
||||
* filtering lives inside `UserPersonaService.composeWriting`, which is a no-op for users whose
|
||||
* persona is up-to-date.
|
||||
*/
|
||||
export const paginateUsersHandler = async (
|
||||
context: WorkflowContext<PersonaProcessUsersWorkflowPayload>,
|
||||
) => {
|
||||
const payload = context.requestPayload || ({} as PersonaProcessUsersWorkflowPayload);
|
||||
const baseUrl = requireBaseUrl(payload.baseUrl);
|
||||
|
||||
// Fan-out chunk path.
|
||||
if (payload.userIds && payload.userIds.length > 0 && !payload.cursor) {
|
||||
await Promise.all(
|
||||
payload.userIds.map((userId) =>
|
||||
context.run(`memory:persona:paginate-users:fanout:execute:${userId}`, () =>
|
||||
MemoryExtractionWorkflowService.triggerPersonaExecuteUser(userId, baseUrl, {
|
||||
extraHeaders: upstashWorkflowExtraHeaders,
|
||||
}),
|
||||
),
|
||||
),
|
||||
);
|
||||
return { fannedOut: payload.userIds.length };
|
||||
}
|
||||
|
||||
// Cursor pagination.
|
||||
const cursor = payload.cursor
|
||||
? { createdAt: new Date(payload.cursor.createdAt), id: payload.cursor.id }
|
||||
: undefined;
|
||||
if (cursor && Number.isNaN(cursor.createdAt.getTime())) {
|
||||
throw new Error('Invalid cursor.createdAt for persona paginate-users');
|
||||
}
|
||||
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
const batch = await context.run(
|
||||
`memory:persona:paginate-users:list:${cursor?.id || 'root'}`,
|
||||
() => executor.getUsersForHourlyExtraction(USER_PAGE_SIZE, cursor),
|
||||
);
|
||||
|
||||
const userIds = batch.ids;
|
||||
const nextCursor = 'cursor' in batch ? batch.cursor : undefined;
|
||||
|
||||
if (userIds.length === 0) {
|
||||
return { message: 'No users in page, pagination complete.' };
|
||||
}
|
||||
|
||||
if (userIds.length > CHUNK_SIZE) {
|
||||
const chunks = chunk(userIds, CHUNK_SIZE);
|
||||
await Promise.all(
|
||||
chunks.map((chunkIds, idx) =>
|
||||
context.run(`memory:persona:paginate-users:fanout:${idx + 1}/${chunks.length}`, () =>
|
||||
MemoryExtractionWorkflowService.triggerPersonaPaginateUsers(
|
||||
{ baseUrl, userIds: chunkIds },
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
} else {
|
||||
await Promise.all(
|
||||
userIds.map((userId) =>
|
||||
context.run(`memory:persona:paginate-users:execute:${userId}`, () =>
|
||||
MemoryExtractionWorkflowService.triggerPersonaExecuteUser(userId, baseUrl, {
|
||||
extraHeaders: upstashWorkflowExtraHeaders,
|
||||
}),
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
if (nextCursor) {
|
||||
await context.run('memory:persona:paginate-users:schedule-next-page', () =>
|
||||
MemoryExtractionWorkflowService.triggerPersonaPaginateUsers(
|
||||
{
|
||||
baseUrl,
|
||||
cursor: { createdAt: nextCursor.createdAt.toISOString(), id: nextCursor.id },
|
||||
},
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
nextCursor: nextCursor ? nextCursor.id : null,
|
||||
processedUsers: userIds.length,
|
||||
};
|
||||
};
|
||||
@@ -0,0 +1,86 @@
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import {
|
||||
MemoryExtractionExecutor,
|
||||
MemoryExtractionWorkflowService,
|
||||
type PersonaProcessUsersWorkflowPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const requireBaseUrl = (baseUrl?: string) => {
|
||||
if (!baseUrl) throw new Error('Missing baseUrl for persona process-users');
|
||||
return baseUrl;
|
||||
};
|
||||
|
||||
/**
|
||||
* L1: Entry for the persona update pipeline.
|
||||
*
|
||||
* - If `userIds` provided, skip eligibility query and fan out directly via L2.
|
||||
* - Else materialise the full eligible user list in one step so dry-run can report the exact total.
|
||||
* - When not dry-run: trigger L2 (paginate-users) to walk all users via cursor pagination.
|
||||
*/
|
||||
export const processUsersHandler = async (
|
||||
context: WorkflowContext<PersonaProcessUsersWorkflowPayload>,
|
||||
) => {
|
||||
const payload = context.requestPayload || ({} as PersonaProcessUsersWorkflowPayload);
|
||||
const baseUrl = requireBaseUrl(payload.baseUrl);
|
||||
const dryRun = !!payload.dryRun;
|
||||
|
||||
if (payload.userIds && payload.userIds.length > 0) {
|
||||
if (dryRun) {
|
||||
return {
|
||||
dryRun: true,
|
||||
message: `[DryRun] Would fan out ${payload.userIds.length} pre-specified users.`,
|
||||
success: true,
|
||||
targetUsers: payload.userIds.length,
|
||||
};
|
||||
}
|
||||
|
||||
await context.run('memory:persona:process-users:trigger-paginate-fanout', () =>
|
||||
MemoryExtractionWorkflowService.triggerPersonaPaginateUsers(
|
||||
{ baseUrl, userIds: payload.userIds },
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
|
||||
return { success: true, triggeredFanout: payload.userIds.length };
|
||||
}
|
||||
|
||||
// Count-only query: cheap COUNT(*) with the hourly-extraction filter.
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
const totalEligible = await context.run('memory:persona:process-users:count-eligible-users', () =>
|
||||
executor.countUsersForHourlyExtraction(),
|
||||
);
|
||||
|
||||
if (totalEligible === 0) {
|
||||
return {
|
||||
message: 'No eligible users for persona update.',
|
||||
success: true,
|
||||
totalEligible: 0,
|
||||
};
|
||||
}
|
||||
|
||||
if (dryRun) {
|
||||
return {
|
||||
dryRun: true,
|
||||
message: `[DryRun] Would process ${totalEligible} users.`,
|
||||
success: true,
|
||||
totalEligible,
|
||||
};
|
||||
}
|
||||
|
||||
await context.run('memory:persona:process-users:trigger-paginate', () =>
|
||||
MemoryExtractionWorkflowService.triggerPersonaPaginateUsers(
|
||||
{ baseUrl },
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
|
||||
return {
|
||||
message: `Triggered paginate-users for ${totalEligible} eligible users.`,
|
||||
success: true,
|
||||
totalEligible,
|
||||
};
|
||||
};
|
||||
@@ -1,46 +0,0 @@
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
import { z } from 'zod';
|
||||
|
||||
import { getServerDB } from '@/database/server';
|
||||
import {
|
||||
buildUserPersonaJobInput,
|
||||
UserPersonaService,
|
||||
} from '@/server/services/memory/userMemory/persona/service';
|
||||
|
||||
const workflowPayloadSchema = z.object({
|
||||
userIds: z.array(z.string()).optional(),
|
||||
});
|
||||
|
||||
export const personaUpdateHandler = async (context: WorkflowContext) => {
|
||||
const payload = await context.run('memory:pipelines:persona:update-writing:parse-payload', () =>
|
||||
workflowPayloadSchema.parse(context.requestPayload || {}),
|
||||
);
|
||||
const db = await getServerDB();
|
||||
|
||||
const userIds = Array.from(new Set(payload.userIds || [])).filter(Boolean);
|
||||
if (userIds.length === 0) {
|
||||
throw new Error('No user IDs provided for persona update.');
|
||||
}
|
||||
|
||||
const service = new UserPersonaService(db);
|
||||
|
||||
await Promise.all(
|
||||
userIds.map(async (userId) =>
|
||||
context.run(`memory:pipelines:persona:update-writing:users:${userId}`, async () => {
|
||||
const jobInput = await buildUserPersonaJobInput(db, userId);
|
||||
const result = await service.composeWriting({ ...jobInput, userId });
|
||||
return {
|
||||
diffId: result.diff?.id,
|
||||
documentId: result.document.id,
|
||||
userId,
|
||||
version: result.document.version,
|
||||
};
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
return {
|
||||
message: 'User persona processed via workflow.',
|
||||
processedUsers: userIds.length,
|
||||
};
|
||||
};
|
||||
@@ -1,163 +0,0 @@
|
||||
import { SpanStatusCode } from '@lobechat/observability-otel/api';
|
||||
import {
|
||||
buildUpstashWorkflowAttributes,
|
||||
tracer as upstashWorkflowTracer,
|
||||
} from '@lobechat/observability-otel/modules/upstash-workflow';
|
||||
import { LayersEnum, MemorySourceType } from '@lobechat/types';
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
import { WorkflowAbort } from '@upstash/workflow';
|
||||
|
||||
import { AsyncTaskModel } from '@/database/models/asyncTask';
|
||||
import { getServerDB } from '@/database/server';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
|
||||
import {
|
||||
MemoryExtractionWorkflowService,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
import { processTopicWorkflow } from './processTopic';
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const CEPA_LAYERS: LayersEnum[] = [
|
||||
LayersEnum.Context,
|
||||
LayersEnum.Experience,
|
||||
LayersEnum.Preference,
|
||||
LayersEnum.Activity,
|
||||
];
|
||||
const IDENTITY_LAYERS: LayersEnum[] = [LayersEnum.Identity];
|
||||
|
||||
export const processTopicsHandler = (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
|
||||
upstashWorkflowTracer.startActiveSpan(
|
||||
'workflow:memory-user-memory:process-topics',
|
||||
async (span) => {
|
||||
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
|
||||
span.setAttributes({
|
||||
...buildUpstashWorkflowAttributes(context),
|
||||
'workflow.memory_user_memory.force_all': payload.forceAll,
|
||||
'workflow.memory_user_memory.force_topics': payload.forceTopics,
|
||||
'workflow.memory_user_memory.layers': payload.layers.join(','),
|
||||
'workflow.memory_user_memory.source': payload.sources.join(','),
|
||||
'workflow.memory_user_memory.topic_count': payload.topicIds.length,
|
||||
'workflow.memory_user_memory.user_count': payload.userIds.length,
|
||||
'workflow.name': 'memory-user-memory:process-topics',
|
||||
});
|
||||
|
||||
try {
|
||||
if (!payload.userIds.length) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
|
||||
return {
|
||||
message: 'No user id provided for topic batch.',
|
||||
processedTopics: 0,
|
||||
processedUsers: 0,
|
||||
};
|
||||
}
|
||||
if (!payload.topicIds.length) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
|
||||
return {
|
||||
message: 'No topic ids provided for extraction.',
|
||||
processedTopics: 0,
|
||||
processedUsers: 0,
|
||||
};
|
||||
}
|
||||
if (!payload.sources.includes(MemorySourceType.ChatTopic)) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
|
||||
return {
|
||||
message: 'Source not supported in topic batch.',
|
||||
processedTopics: 0,
|
||||
processedUsers: 0,
|
||||
};
|
||||
}
|
||||
|
||||
const userId = payload.userIds[0];
|
||||
if (payload.asyncTaskId && userId) {
|
||||
// NOTICE: Cooperative cascading cancellation for the workflow tree.
|
||||
// If cancelled, stop before fan-out into per-topic child workflows.
|
||||
const cancelled = await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:cancel-check`,
|
||||
() =>
|
||||
getServerDB().then((db) =>
|
||||
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
|
||||
payload.asyncTaskId!,
|
||||
),
|
||||
),
|
||||
);
|
||||
if (cancelled) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
message: 'Memory extraction task cancellation requested, skip topic batch.',
|
||||
processedTopics: 0,
|
||||
processedUsers: 0,
|
||||
};
|
||||
}
|
||||
}
|
||||
// Delegate per-topic extraction to dedicated workflow for better isolation
|
||||
await Promise.all(
|
||||
payload.topicIds.map(async (topicId, index) => {
|
||||
await context.invoke(
|
||||
`memory:user-memory:extract:users:${userId}:topics:${topicId}:invoke:${index}`,
|
||||
{
|
||||
body: {
|
||||
...payload,
|
||||
layers: payload.layers.length
|
||||
? payload.layers
|
||||
: [...CEPA_LAYERS, ...IDENTITY_LAYERS],
|
||||
topicIds: [topicId],
|
||||
userId,
|
||||
userIds: [userId],
|
||||
},
|
||||
// CEPA: run in parallel across the batch
|
||||
//
|
||||
// NOTICE: if modified the parallelism of CEPA_LAYERS
|
||||
// or added new memory layer, make sure to update the number below.
|
||||
//
|
||||
// Currently, CEPA (context, experience, preference, activity) + identity = 5 layers.
|
||||
// and since identity requires sequential processing, we set parallelism to 5.
|
||||
flowControl: {
|
||||
key: `memory-user-memory.pipelines.chat-topic.process-topic.user.${userId}.topic.${topicId}`,
|
||||
parallelism: 5,
|
||||
},
|
||||
headers: upstashWorkflowExtraHeaders,
|
||||
workflow: processTopicWorkflow,
|
||||
},
|
||||
);
|
||||
}),
|
||||
);
|
||||
|
||||
// Trigger user persona update after topic processing using the workflow client.
|
||||
await context.run(`memory:user-memory:users:${userId}`, async () => {
|
||||
await MemoryExtractionWorkflowService.triggerPersonaUpdate(userId, payload.baseUrl, {
|
||||
extraHeaders: upstashWorkflowExtraHeaders,
|
||||
});
|
||||
});
|
||||
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
|
||||
return {
|
||||
processedTopics: payload.topicIds.length,
|
||||
processedUsers: payload.userIds.length,
|
||||
};
|
||||
} catch (error) {
|
||||
// NOTICE: Let WorkflowAbort bubble up (used internally by Upstash); record others
|
||||
if (error instanceof WorkflowAbort) {
|
||||
console.warn('workflow aborted:', error.message);
|
||||
throw error;
|
||||
}
|
||||
|
||||
span.recordException(error as Error);
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: error instanceof Error ? error.message : 'process-topics workflow failed',
|
||||
});
|
||||
|
||||
throw error;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
},
|
||||
);
|
||||
@@ -1,159 +0,0 @@
|
||||
import { MemorySourceType } from '@lobechat/types';
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
|
||||
import { AsyncTaskModel } from '@/database/models/asyncTask';
|
||||
import { type ListTopicsForMemoryExtractorCursor } from '@/database/models/topic';
|
||||
import { getServerDB } from '@/database/server';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
|
||||
import {
|
||||
buildWorkflowPayloadInput,
|
||||
MemoryExtractionExecutor,
|
||||
MemoryExtractionWorkflowService,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
import { forEachBatchSequential } from '@/server/services/memory/userMemory/topicBatching';
|
||||
|
||||
const TOPIC_PAGE_SIZE = 50;
|
||||
const TOPIC_BATCH_SIZE = 4;
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
export const processUserTopicsHandler = async (
|
||||
context: WorkflowContext<MemoryExtractionPayloadInput>,
|
||||
) => {
|
||||
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
if (!params.userIds.length) {
|
||||
return { message: 'No user ids provided for topic processing.' };
|
||||
}
|
||||
if (!params.sources.includes(MemorySourceType.ChatTopic)) {
|
||||
return { message: 'No supported sources requested, skip topic processing.' };
|
||||
}
|
||||
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
|
||||
const scheduleNextPage = async (userId: string, cursorCreatedAt: Date, cursorId: string) => {
|
||||
await MemoryExtractionWorkflowService.triggerProcessUserTopics(
|
||||
{
|
||||
...buildWorkflowPayloadInput({
|
||||
...params,
|
||||
topicCursor: {
|
||||
createdAt: cursorCreatedAt.toISOString(),
|
||||
id: cursorId,
|
||||
userId,
|
||||
},
|
||||
topicIds: [],
|
||||
userId,
|
||||
userIds: [userId],
|
||||
}),
|
||||
},
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
);
|
||||
};
|
||||
|
||||
for (const userId of params.userIds) {
|
||||
if (params.asyncTaskId) {
|
||||
// NOTICE: Cooperative cascading cancellation for the workflow tree.
|
||||
// A cancelled root task should stop at user-topic pagination and avoid enqueuing topic batches.
|
||||
const cancelled = await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:cancel-check`,
|
||||
() =>
|
||||
getServerDB().then((db) =>
|
||||
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
|
||||
params.asyncTaskId!,
|
||||
),
|
||||
),
|
||||
);
|
||||
if (cancelled) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const topicCursor =
|
||||
params.topicCursor && params.topicCursor.userId === userId
|
||||
? {
|
||||
createdAt: new Date(params.topicCursor.createdAt),
|
||||
id: params.topicCursor.id,
|
||||
}
|
||||
: undefined;
|
||||
|
||||
const topicsFromPayload =
|
||||
params.topicIds && params.topicIds.length > 0
|
||||
? await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:filter-topic-ids`,
|
||||
async () => {
|
||||
const filtered = await executor.filterTopicIdsForUser(userId, params.topicIds);
|
||||
return filtered.length > 0 ? filtered : undefined;
|
||||
},
|
||||
)
|
||||
: undefined;
|
||||
|
||||
const topicBatch = await context.run<{
|
||||
cursor?: ListTopicsForMemoryExtractorCursor;
|
||||
ids: string[];
|
||||
}>(`memory:user-memory:extract:users:${userId}:list-topics:${topicCursor?.id || 'root'}`, () =>
|
||||
topicsFromPayload && topicsFromPayload.length > 0
|
||||
? Promise.resolve({ ids: topicsFromPayload })
|
||||
: executor.getTopicsForUser(
|
||||
{
|
||||
cursor: topicCursor,
|
||||
forceAll: params.forceAll,
|
||||
forceTopics: params.forceTopics,
|
||||
from: params.from,
|
||||
to: params.to,
|
||||
userId,
|
||||
},
|
||||
TOPIC_PAGE_SIZE,
|
||||
),
|
||||
);
|
||||
|
||||
const ids = topicBatch.ids;
|
||||
if (!ids.length) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const cursor = 'cursor' in topicBatch ? topicBatch.cursor : undefined;
|
||||
|
||||
// TODO: follow the new pattern of process-topic
|
||||
// remove the batch sequential, replace it with context.invoke(...) pattern
|
||||
await forEachBatchSequential(ids, TOPIC_BATCH_SIZE, async (topicIds, batchIndex) => {
|
||||
// NOTICE: We trigger via QStash instead of context.invoke because invoke only swaps the last path
|
||||
// segment with the workflowId. If we invoked directly from /process-user-topics, child workflow
|
||||
// URLs would inherit that base and lose the desired /process-topics/workflows prefix.
|
||||
await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:process-topics-batch:${batchIndex}`,
|
||||
() =>
|
||||
MemoryExtractionWorkflowService.triggerProcessTopics(
|
||||
userId,
|
||||
{
|
||||
...buildWorkflowPayloadInput(params),
|
||||
topicCursor: undefined,
|
||||
topicIds,
|
||||
userId,
|
||||
userIds: [userId],
|
||||
},
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
if (!topicsFromPayload && cursor) {
|
||||
await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:topics:${cursor.id}:schedule-next-batch`,
|
||||
() => {
|
||||
// NOTICE: Upstash Workflow only supports serializable data into plain JSON,
|
||||
// this causes the Date object to be converted into string when passed as parameter from
|
||||
// context to child workflow. So we need to convert it back to Date object here.
|
||||
const createdAt = new Date(cursor.createdAt);
|
||||
if (Number.isNaN(createdAt.getTime())) {
|
||||
throw new Error('Invalid cursor date when scheduling next topic page');
|
||||
}
|
||||
|
||||
return scheduleNextPage(userId, createdAt, cursor.id);
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return { processedUsers: params.userIds.length };
|
||||
};
|
||||
@@ -1,100 +0,0 @@
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
import { chunk } from 'es-toolkit/compat';
|
||||
|
||||
import { AsyncTaskModel } from '@/database/models/asyncTask';
|
||||
import { getServerDB } from '@/database/server';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
|
||||
import {
|
||||
buildWorkflowPayloadInput,
|
||||
MemoryExtractionExecutor,
|
||||
MemoryExtractionWorkflowService,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
const USER_PAGE_SIZE = 50;
|
||||
const USER_BATCH_SIZE = 10;
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
export const processUsersHandler = async (
|
||||
context: WorkflowContext<MemoryExtractionPayloadInput>,
|
||||
) => {
|
||||
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
if (params.sources.length === 0) {
|
||||
return { message: 'No sources provided, skip memory extraction.' };
|
||||
}
|
||||
if (params.asyncTaskId && params.userIds[0]) {
|
||||
// NOTICE: Cooperative cascading cancellation for the workflow tree.
|
||||
// If root task has cancelRequestedAt, this stage stops scheduling child workflows.
|
||||
const cancelled = await context.run('memory:user-memory:extract:cancel-check:root', () =>
|
||||
getServerDB().then((db) =>
|
||||
new AsyncTaskModel(db, params.userIds[0]!).isUserMemoryExtractionCancellationRequested(
|
||||
params.asyncTaskId!,
|
||||
),
|
||||
),
|
||||
);
|
||||
if (cancelled) {
|
||||
return { message: 'Memory extraction task cancellation requested, skip processing users.' };
|
||||
}
|
||||
}
|
||||
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
|
||||
// NOTICE: Upstash Workflow only supports serializable data into plain JSON,
|
||||
// this causes the Date object to be converted into string when passed as parameter from
|
||||
// context to child workflow. So we need to convert it back to Date object here.
|
||||
const userCursor = params.userCursor
|
||||
? { createdAt: new Date(params.userCursor.createdAt), id: params.userCursor.id }
|
||||
: undefined;
|
||||
|
||||
const userBatch = await context.run('memory:user-memory:extract:get-users', () =>
|
||||
params.userIds.length > 0
|
||||
? { ids: params.userIds }
|
||||
: executor.getUsers(USER_PAGE_SIZE, userCursor),
|
||||
);
|
||||
|
||||
const ids = userBatch.ids;
|
||||
if (ids.length === 0) {
|
||||
return { message: 'No users to process for memory extraction.' };
|
||||
}
|
||||
|
||||
const cursor = 'cursor' in userBatch ? userBatch.cursor : undefined;
|
||||
|
||||
const batches = chunk(ids, USER_BATCH_SIZE);
|
||||
await Promise.all(
|
||||
batches.map((userIds) =>
|
||||
context.run(`memory:user-memory:extract:users:process-topic-batches`, () =>
|
||||
MemoryExtractionWorkflowService.triggerProcessUserTopics(
|
||||
{
|
||||
...buildWorkflowPayloadInput(params),
|
||||
topicCursor: undefined,
|
||||
userId: userIds[0],
|
||||
userIds,
|
||||
},
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
if (params.userIds.length === 0 && cursor) {
|
||||
await context.run('memory:user-memory:extract:users:schedule-next-user-batch', () =>
|
||||
MemoryExtractionWorkflowService.triggerProcessUsers(
|
||||
{
|
||||
...buildWorkflowPayloadInput({
|
||||
...params,
|
||||
userCursor: { createdAt: cursor.createdAt.toISOString(), id: cursor.id },
|
||||
}),
|
||||
},
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
batches: batches.length,
|
||||
nextCursor: cursor ? cursor.id : null,
|
||||
processedUsers: ids.length,
|
||||
};
|
||||
};
|
||||
@@ -0,0 +1,194 @@
|
||||
import { SpanStatusCode } from '@lobechat/observability-otel/api';
|
||||
import {
|
||||
buildUpstashWorkflowAttributes,
|
||||
tracer as upstashWorkflowTracer,
|
||||
} from '@lobechat/observability-otel/modules/upstash-workflow';
|
||||
import { LayersEnum, MemorySourceType } from '@lobechat/types';
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
import { WorkflowAbort } from '@upstash/workflow';
|
||||
|
||||
import { AsyncTaskModel } from '@/database/models/asyncTask';
|
||||
import { getServerDB } from '@/database/server';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
|
||||
import {
|
||||
MemoryExtractionExecutor,
|
||||
MemoryExtractionWorkflowService,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
import { extractTopicWorkflow } from './extractTopic';
|
||||
|
||||
const TOPIC_PAGE_SIZE = 200;
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
const CEPA_LAYERS: LayersEnum[] = [
|
||||
LayersEnum.Context,
|
||||
LayersEnum.Experience,
|
||||
LayersEnum.Preference,
|
||||
LayersEnum.Activity,
|
||||
];
|
||||
const IDENTITY_LAYERS: LayersEnum[] = [LayersEnum.Identity];
|
||||
|
||||
/**
|
||||
* L3: Process topic extraction for ONE user.
|
||||
*
|
||||
* - Load user's eligible topics (respecting forceAll/forceTopics filters) in a single pass.
|
||||
* - Fan out per-topic via `context.invoke(extractTopicWorkflow)` — Upstash rewrites the URL last
|
||||
* segment to the workflowId (`extract-topic`), which is mounted at /topics/extract-topic.
|
||||
* - After topic extraction, schedule a persona update for this user (decoupled via persona pipeline).
|
||||
*/
|
||||
export const executeUserHandler = (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
|
||||
upstashWorkflowTracer.startActiveSpan(
|
||||
'workflow:memory-user-memory:topics:execute-user',
|
||||
async (span) => {
|
||||
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
const userId = payload.userId || payload.userIds[0];
|
||||
|
||||
span.setAttributes({
|
||||
...buildUpstashWorkflowAttributes(context),
|
||||
'workflow.memory_user_memory.force_all': payload.forceAll,
|
||||
'workflow.memory_user_memory.force_topics': payload.forceTopics,
|
||||
'workflow.memory_user_memory.layers': payload.layers.join(','),
|
||||
'workflow.memory_user_memory.source': payload.sources.join(','),
|
||||
'workflow.memory_user_memory.user_id': userId,
|
||||
'workflow.name': 'memory-user-memory:topics:execute-user',
|
||||
});
|
||||
|
||||
if (!userId) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { message: 'Missing userId for execute-user workflow.' };
|
||||
}
|
||||
|
||||
if (!payload.sources.includes(MemorySourceType.ChatTopic)) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { message: 'Source not supported in execute-user workflow.' };
|
||||
}
|
||||
|
||||
try {
|
||||
// Early cancel check before any DB work.
|
||||
if (payload.asyncTaskId) {
|
||||
const cancelled = await context.run(
|
||||
`memory:topics:execute-user:${userId}:cancel-check:before`,
|
||||
() =>
|
||||
getServerDB().then((db) =>
|
||||
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
|
||||
payload.asyncTaskId!,
|
||||
),
|
||||
),
|
||||
);
|
||||
if (cancelled) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { cancelled: true, message: 'Cancelled before topic listing.' };
|
||||
}
|
||||
}
|
||||
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
|
||||
// Either use explicitly-provided topicIds (filtered for ownership) or list all eligible topics.
|
||||
const topicIds = await context.run(
|
||||
`memory:topics:execute-user:${userId}:list-topics`,
|
||||
() =>
|
||||
payload.topicIds.length > 0
|
||||
? executor.filterTopicIdsForUser(userId, payload.topicIds)
|
||||
: executor
|
||||
.getTopicsForUser(
|
||||
{
|
||||
forceAll: payload.forceAll,
|
||||
forceTopics: payload.forceTopics,
|
||||
from: payload.from,
|
||||
to: payload.to,
|
||||
userId,
|
||||
},
|
||||
TOPIC_PAGE_SIZE,
|
||||
)
|
||||
.then((res) => res.ids),
|
||||
);
|
||||
|
||||
if (!topicIds.length) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
message: 'No eligible topics for user.',
|
||||
processedTopics: 0,
|
||||
userId,
|
||||
};
|
||||
}
|
||||
|
||||
// Cancel check before fan-out so cancelled tasks stop at the earliest safe boundary.
|
||||
if (payload.asyncTaskId) {
|
||||
const cancelled = await context.run(
|
||||
`memory:topics:execute-user:${userId}:cancel-check:fanout`,
|
||||
() =>
|
||||
getServerDB().then((db) =>
|
||||
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
|
||||
payload.asyncTaskId!,
|
||||
),
|
||||
),
|
||||
);
|
||||
if (cancelled) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { cancelled: true, message: 'Cancelled before topic fan-out.' };
|
||||
}
|
||||
}
|
||||
|
||||
// Fan out per topic via context.invoke. Each invocation runs as a standalone extract-topic workflow.
|
||||
await Promise.all(
|
||||
topicIds.map((topicId, index) =>
|
||||
context.invoke(`memory:topics:execute-user:${userId}:invoke:${topicId}:${index}`, {
|
||||
body: {
|
||||
asyncTaskId: payload.asyncTaskId,
|
||||
baseUrl: payload.baseUrl,
|
||||
forceAll: payload.forceAll,
|
||||
forceTopics: payload.forceTopics,
|
||||
layers: payload.layers.length
|
||||
? payload.layers
|
||||
: [...CEPA_LAYERS, ...IDENTITY_LAYERS],
|
||||
sources: payload.sources,
|
||||
topicIds: [topicId],
|
||||
userId,
|
||||
userIds: [userId],
|
||||
userInitiated: payload.userInitiated,
|
||||
},
|
||||
// CEPA (4 layers) + identity (1, sequential) → 5. Parallelism matches layer count.
|
||||
flowControl: {
|
||||
key: `memory-user-memory.topics.extract-topic.user.${userId}.topic.${topicId}`,
|
||||
parallelism: 5,
|
||||
},
|
||||
headers: upstashWorkflowExtraHeaders,
|
||||
workflow: extractTopicWorkflow,
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// Decoupled persona update — no longer tail-coupled to topic chain. Fires even when
|
||||
// this user had no new topics (handled by the persona pipeline's own eligibility logic).
|
||||
await context.run(`memory:topics:execute-user:${userId}:trigger-persona`, async () => {
|
||||
await MemoryExtractionWorkflowService.triggerPersonaExecuteUser(userId, payload.baseUrl, {
|
||||
extraHeaders: upstashWorkflowExtraHeaders,
|
||||
});
|
||||
});
|
||||
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
processedTopics: topicIds.length,
|
||||
userId,
|
||||
};
|
||||
} catch (error) {
|
||||
if (error instanceof WorkflowAbort) {
|
||||
console.warn('workflow aborted:', error.message);
|
||||
throw error;
|
||||
}
|
||||
|
||||
span.recordException(error as Error);
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: error instanceof Error ? error.message : 'execute-user workflow failed',
|
||||
});
|
||||
|
||||
throw error;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
},
|
||||
);
|
||||
+43
-46
@@ -17,7 +17,7 @@ import {
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
import { createWorkflowQstashClient } from '../qstashClient';
|
||||
import { createWorkflowQstashClient } from '../../qstashClient';
|
||||
|
||||
const CEPA_LAYERS: LayersEnum[] = [
|
||||
LayersEnum.Context,
|
||||
@@ -28,9 +28,9 @@ const CEPA_LAYERS: LayersEnum[] = [
|
||||
|
||||
const IDENTITY_LAYERS: LayersEnum[] = [LayersEnum.Identity];
|
||||
|
||||
const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
|
||||
const extractTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
|
||||
upstashWorkflowTracer.startActiveSpan(
|
||||
'workflow:memory-user-memory:process-topic',
|
||||
'workflow:memory-user-memory:topics:extract-topic',
|
||||
async (span) => {
|
||||
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
|
||||
@@ -40,7 +40,7 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
|
||||
'workflow.memory_user_memory.source': payload.sources.join(','),
|
||||
'workflow.memory_user_memory.topic_id': payload.topicIds[0],
|
||||
'workflow.memory_user_memory.user_id': payload.userIds[0],
|
||||
'workflow.name': 'memory-user-memory:process-topic',
|
||||
'workflow.name': 'memory-user-memory:topics:extract-topic',
|
||||
});
|
||||
|
||||
const topicId = payload.topicIds[0];
|
||||
@@ -48,12 +48,12 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
|
||||
|
||||
if (!userId || !topicId) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { message: 'Missing userId or topicId for topic workflow.' };
|
||||
return { message: 'Missing userId or topicId for extract-topic workflow.' };
|
||||
}
|
||||
|
||||
if (!payload.sources.includes(MemorySourceType.ChatTopic)) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { message: 'Source not supported in topic workflow.' };
|
||||
return { message: 'Source not supported in extract-topic workflow.' };
|
||||
}
|
||||
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
@@ -63,7 +63,7 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
|
||||
// NOTICE: Cooperative cascading cancellation for the workflow tree.
|
||||
// Check before CEPA extraction so cancelled tasks stop at the earliest safe boundary.
|
||||
const cancelled = await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:topics:${topicId}:cancel-check:before`,
|
||||
`memory:topics:extract-topic:${userId}:${topicId}:cancel-check:before`,
|
||||
() =>
|
||||
getServerDB().then((db) =>
|
||||
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
|
||||
@@ -83,21 +83,19 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
|
||||
layers = payload.layers.filter((layer) => CEPA_LAYERS.includes(layer));
|
||||
}
|
||||
|
||||
await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:topics:${topicId}:cepa`,
|
||||
() =>
|
||||
executor.extractTopic({
|
||||
asyncTaskId: payload.asyncTaskId,
|
||||
forceAll: payload.forceAll,
|
||||
forceTopics: payload.forceTopics,
|
||||
from: payload.from,
|
||||
layers,
|
||||
source: MemorySourceType.ChatTopic,
|
||||
to: payload.to,
|
||||
topicId,
|
||||
userId,
|
||||
userInitiated: payload.userInitiated,
|
||||
}),
|
||||
await context.run(`memory:topics:extract-topic:${userId}:${topicId}:cepa`, () =>
|
||||
executor.extractTopic({
|
||||
asyncTaskId: payload.asyncTaskId,
|
||||
forceAll: payload.forceAll,
|
||||
forceTopics: payload.forceTopics,
|
||||
from: payload.from,
|
||||
layers,
|
||||
source: MemorySourceType.ChatTopic,
|
||||
to: payload.to,
|
||||
topicId,
|
||||
userId,
|
||||
userInitiated: payload.userInitiated,
|
||||
}),
|
||||
);
|
||||
}
|
||||
{
|
||||
@@ -105,7 +103,7 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
|
||||
// NOTICE: Cooperative cascading cancellation for the workflow tree.
|
||||
// Re-check before identity extraction to avoid running sequential identity step after cancel.
|
||||
const cancelled = await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:topics:${topicId}:cancel-check:identity`,
|
||||
`memory:topics:extract-topic:${userId}:${topicId}:cancel-check:identity`,
|
||||
() =>
|
||||
getServerDB().then((db) =>
|
||||
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
|
||||
@@ -126,21 +124,19 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
|
||||
layers = payload.layers.filter((layer) => IDENTITY_LAYERS.includes(layer));
|
||||
}
|
||||
|
||||
await context.run(
|
||||
`memory:user-memory:extract:users:${userId}:topics:${topicId}:identity`,
|
||||
() =>
|
||||
executor.extractTopic({
|
||||
asyncTaskId: payload.asyncTaskId,
|
||||
forceAll: payload.forceAll,
|
||||
forceTopics: payload.forceTopics,
|
||||
from: payload.from,
|
||||
layers,
|
||||
source: MemorySourceType.ChatTopic,
|
||||
to: payload.to,
|
||||
topicId,
|
||||
userId,
|
||||
userInitiated: payload.userInitiated,
|
||||
}),
|
||||
await context.run(`memory:topics:extract-topic:${userId}:${topicId}:identity`, () =>
|
||||
executor.extractTopic({
|
||||
asyncTaskId: payload.asyncTaskId,
|
||||
forceAll: payload.forceAll,
|
||||
forceTopics: payload.forceTopics,
|
||||
from: payload.from,
|
||||
layers,
|
||||
source: MemorySourceType.ChatTopic,
|
||||
to: payload.to,
|
||||
topicId,
|
||||
userId,
|
||||
userInitiated: payload.userInitiated,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -158,12 +154,11 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
|
||||
span.recordException(error as Error);
|
||||
span.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: errorMessageFrom(error) || 'process-topic workflow failed',
|
||||
message: errorMessageFrom(error) || 'extract-topic workflow failed',
|
||||
});
|
||||
|
||||
// Avoid infinite retries on non-retry-able errors
|
||||
throw new WorkflowNonRetryableError(
|
||||
errorMessageFrom(error) || 'process-topic workflow failed',
|
||||
errorMessageFrom(error) || 'extract-topic workflow failed',
|
||||
);
|
||||
} finally {
|
||||
span.end();
|
||||
@@ -171,8 +166,8 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
|
||||
},
|
||||
);
|
||||
|
||||
export const processTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput, unknown>(
|
||||
processTopicRoute,
|
||||
export const extractTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput, unknown>(
|
||||
extractTopicRoute,
|
||||
{
|
||||
failureFunction: async ({ context, failStatus, failResponse }) => {
|
||||
try {
|
||||
@@ -189,7 +184,7 @@ export const processTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput,
|
||||
|
||||
await asyncTaskModel.incrementUserMemoryExtractionProgress(payload.asyncTaskId);
|
||||
console.error(
|
||||
`[process-topic][failureFunction] marking async task as failed for user ${userId}, topic ${topicId}`,
|
||||
`[extract-topic][failureFunction] marking async task as failed for user ${userId}, topic ${topicId}`,
|
||||
{
|
||||
failResponse,
|
||||
failStatus,
|
||||
@@ -198,7 +193,7 @@ export const processTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput,
|
||||
|
||||
return 'async-task-updated';
|
||||
} catch (error) {
|
||||
console.error('[process-topic][failureFunction] failed to record async task error', error);
|
||||
console.error('[extract-topic][failureFunction] failed to record async task error', error);
|
||||
return 'async-task-update-failed';
|
||||
}
|
||||
},
|
||||
@@ -206,4 +201,6 @@ export const processTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput,
|
||||
},
|
||||
);
|
||||
|
||||
processTopicWorkflow.workflowId = 'process-topic';
|
||||
// NOTICE: workflowId must match the last path segment of WORKFLOW_PATHS.topicsExtractTopic,
|
||||
// because Upstash `context.invoke` rewrites the URL last segment to the target workflowId.
|
||||
extractTopicWorkflow.workflowId = 'extract-topic';
|
||||
@@ -0,0 +1,177 @@
|
||||
import { SpanStatusCode } from '@lobechat/observability-otel/api';
|
||||
import {
|
||||
buildUpstashWorkflowAttributes,
|
||||
tracer as upstashWorkflowTracer,
|
||||
} from '@lobechat/observability-otel/modules/upstash-workflow';
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
import { chunk } from 'es-toolkit/compat';
|
||||
|
||||
import { AsyncTaskModel } from '@/database/models/asyncTask';
|
||||
import { getServerDB } from '@/database/server';
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
|
||||
import {
|
||||
buildWorkflowPayloadInput,
|
||||
MemoryExtractionExecutor,
|
||||
MemoryExtractionWorkflowService,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
const USER_PAGE_SIZE = 50;
|
||||
const CHUNK_SIZE = 20;
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
/**
|
||||
* L2: Paginate users + fan-out.
|
||||
*
|
||||
* - If `userIds` provided (fan-out chunk from a prior call), trigger execute-user for each and stop.
|
||||
* - Else paginate via cursor (PAGE_SIZE=50); if batch > CHUNK_SIZE=20, split into chunks and
|
||||
* recursively re-trigger this L2 with each chunk. Otherwise trigger execute-user directly.
|
||||
* - Always schedule the next page if a cursor exists.
|
||||
*/
|
||||
export const paginateUsersHandler = (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
|
||||
upstashWorkflowTracer.startActiveSpan(
|
||||
'workflow:memory-user-memory:topics:paginate-users',
|
||||
async (span) => {
|
||||
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
|
||||
span.setAttributes({
|
||||
...buildUpstashWorkflowAttributes(context),
|
||||
'workflow.memory_user_memory.payload_user_count': payload.userIds.length,
|
||||
'workflow.name': 'memory-user-memory:topics:paginate-users',
|
||||
});
|
||||
|
||||
try {
|
||||
// Fan-out chunk path: specific userIds were provided by an upstream paginate call.
|
||||
if (payload.userIds.length > 0 && !payload.userCursor) {
|
||||
await Promise.all(
|
||||
payload.userIds.map((userId) =>
|
||||
context.run(`memory:topics:paginate-users:fanout:execute:${userId}`, () =>
|
||||
MemoryExtractionWorkflowService.triggerTopicsExecuteUser(
|
||||
userId,
|
||||
buildWorkflowPayloadInput({
|
||||
...payload,
|
||||
topicCursor: undefined,
|
||||
topicIds: [],
|
||||
userCursor: undefined,
|
||||
userId,
|
||||
userIds: [userId],
|
||||
}),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { fannedOut: payload.userIds.length };
|
||||
}
|
||||
|
||||
// Cursor pagination path.
|
||||
const userCursor = payload.userCursor
|
||||
? { createdAt: new Date(payload.userCursor.createdAt), id: payload.userCursor.id }
|
||||
: undefined;
|
||||
if (userCursor && Number.isNaN(userCursor.createdAt.getTime())) {
|
||||
throw new Error('Invalid userCursor.createdAt for topics paginate-users');
|
||||
}
|
||||
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
|
||||
// Root-level cancel check (only when asyncTaskId + at least one userId known).
|
||||
if (payload.asyncTaskId && payload.userIds[0]) {
|
||||
const cancelled = await context.run(
|
||||
'memory:topics:paginate-users:cancel-check:root',
|
||||
() =>
|
||||
getServerDB().then((db) =>
|
||||
new AsyncTaskModel(
|
||||
db,
|
||||
payload.userIds[0]!,
|
||||
).isUserMemoryExtractionCancellationRequested(payload.asyncTaskId!),
|
||||
),
|
||||
);
|
||||
if (cancelled) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { cancelled: true, message: 'Cancelled at paginate-users root.' };
|
||||
}
|
||||
}
|
||||
|
||||
const batch = await context.run(
|
||||
`memory:topics:paginate-users:list:${userCursor?.id || 'root'}`,
|
||||
() => executor.getUsersForHourlyExtraction(USER_PAGE_SIZE, userCursor),
|
||||
);
|
||||
|
||||
const userIds = batch.ids;
|
||||
const nextCursor = 'cursor' in batch ? batch.cursor : undefined;
|
||||
|
||||
if (userIds.length === 0) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { message: 'No users in page, pagination complete.' };
|
||||
}
|
||||
|
||||
// Fan-out if batch exceeds CHUNK_SIZE; else trigger execute-user directly.
|
||||
if (userIds.length > CHUNK_SIZE) {
|
||||
const chunks = chunk(userIds, CHUNK_SIZE);
|
||||
await Promise.all(
|
||||
chunks.map((chunkIds, idx) =>
|
||||
context.run(`memory:topics:paginate-users:fanout:${idx + 1}/${chunks.length}`, () =>
|
||||
MemoryExtractionWorkflowService.triggerTopicsPaginateUsers(
|
||||
buildWorkflowPayloadInput({
|
||||
...payload,
|
||||
userCursor: undefined,
|
||||
userId: chunkIds[0],
|
||||
userIds: chunkIds,
|
||||
}),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
} else {
|
||||
await Promise.all(
|
||||
userIds.map((userId) =>
|
||||
context.run(`memory:topics:paginate-users:execute:${userId}`, () =>
|
||||
MemoryExtractionWorkflowService.triggerTopicsExecuteUser(
|
||||
userId,
|
||||
buildWorkflowPayloadInput({
|
||||
...payload,
|
||||
topicCursor: undefined,
|
||||
topicIds: [],
|
||||
userCursor: undefined,
|
||||
userId,
|
||||
userIds: [userId],
|
||||
}),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
// Schedule next page.
|
||||
if (nextCursor) {
|
||||
await context.run('memory:topics:paginate-users:schedule-next-page', () =>
|
||||
MemoryExtractionWorkflowService.triggerTopicsPaginateUsers(
|
||||
buildWorkflowPayloadInput({
|
||||
...payload,
|
||||
userCursor: {
|
||||
createdAt: nextCursor.createdAt.toISOString(),
|
||||
id: nextCursor.id,
|
||||
},
|
||||
userIds: [],
|
||||
}),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
nextCursor: nextCursor ? nextCursor.id : null,
|
||||
processedUsers: userIds.length,
|
||||
};
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
},
|
||||
);
|
||||
@@ -0,0 +1,121 @@
|
||||
import { SpanStatusCode } from '@lobechat/observability-otel/api';
|
||||
import {
|
||||
buildUpstashWorkflowAttributes,
|
||||
tracer as upstashWorkflowTracer,
|
||||
} from '@lobechat/observability-otel/modules/upstash-workflow';
|
||||
import { MemorySourceType } from '@lobechat/types';
|
||||
import { type WorkflowContext } from '@upstash/workflow';
|
||||
|
||||
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
|
||||
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
|
||||
import {
|
||||
buildWorkflowPayloadInput,
|
||||
MemoryExtractionExecutor,
|
||||
MemoryExtractionWorkflowService,
|
||||
normalizeMemoryExtractionPayload,
|
||||
} from '@/server/services/memory/userMemory/extract';
|
||||
|
||||
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
|
||||
|
||||
/**
|
||||
* L1: Entry for the topics extraction pipeline.
|
||||
*
|
||||
* - If `userIds` in payload, skip the eligibility query and fan out directly via L2.
|
||||
* - Else materialise the full eligible user list in one step so dry-run can report the exact total.
|
||||
* - When not dry-run: trigger L2 (paginate-users) to walk all users via cursor pagination.
|
||||
*/
|
||||
export const processUsersHandler = (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
|
||||
upstashWorkflowTracer.startActiveSpan(
|
||||
'workflow:memory-user-memory:topics:process-users',
|
||||
async (span) => {
|
||||
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
|
||||
const dryRun = !!(context.requestPayload as { dryRun?: boolean } | null)?.dryRun;
|
||||
|
||||
span.setAttributes({
|
||||
...buildUpstashWorkflowAttributes(context),
|
||||
'workflow.memory_user_memory.dry_run': dryRun,
|
||||
'workflow.memory_user_memory.payload_user_count': payload.userIds.length,
|
||||
'workflow.name': 'memory-user-memory:topics:process-users',
|
||||
});
|
||||
|
||||
// Ensure source defaults to ChatTopic when caller omitted it — topics pipeline is chat-topic-only.
|
||||
const sources = payload.sources.length ? payload.sources : [MemorySourceType.ChatTopic];
|
||||
if (!sources.includes(MemorySourceType.ChatTopic)) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
message: 'No supported sources requested, skip topics process-users.',
|
||||
success: true,
|
||||
};
|
||||
}
|
||||
|
||||
// Explicit target userIds path: skip the eligibility query and go straight to fan-out via L2.
|
||||
if (payload.userIds.length > 0) {
|
||||
if (dryRun) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
dryRun: true,
|
||||
message: `[DryRun] Would fan out ${payload.userIds.length} pre-specified users.`,
|
||||
success: true,
|
||||
targetUsers: payload.userIds.length,
|
||||
};
|
||||
}
|
||||
|
||||
await context.run('memory:topics:process-users:trigger-paginate-fanout', () =>
|
||||
MemoryExtractionWorkflowService.triggerTopicsPaginateUsers(
|
||||
buildWorkflowPayloadInput({ ...payload, sources, userCursor: undefined }),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return { success: true, triggeredFanout: payload.userIds.length };
|
||||
}
|
||||
|
||||
// Count-only query: cheap COUNT(*) with the hourly-extraction filter (memory-enabled + has
|
||||
// at least one user message). Gives an exact total for dry-run.
|
||||
const executor = await MemoryExtractionExecutor.create();
|
||||
const totalEligible = await context.run(
|
||||
'memory:topics:process-users:count-eligible-users',
|
||||
() => executor.countUsersForHourlyExtraction(),
|
||||
);
|
||||
|
||||
if (totalEligible === 0) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
message: 'No eligible users for topics extraction.',
|
||||
success: true,
|
||||
totalEligible: 0,
|
||||
};
|
||||
}
|
||||
|
||||
if (dryRun) {
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
dryRun: true,
|
||||
message: `[DryRun] Would process ${totalEligible} users.`,
|
||||
success: true,
|
||||
totalEligible,
|
||||
};
|
||||
}
|
||||
|
||||
// Trigger L2 to walk all users via cursor pagination.
|
||||
await context.run('memory:topics:process-users:trigger-paginate', () =>
|
||||
MemoryExtractionWorkflowService.triggerTopicsPaginateUsers(
|
||||
buildWorkflowPayloadInput({
|
||||
...payload,
|
||||
sources,
|
||||
userCursor: undefined,
|
||||
userIds: [],
|
||||
}),
|
||||
{ extraHeaders: upstashWorkflowExtraHeaders },
|
||||
),
|
||||
);
|
||||
|
||||
span.setStatus({ code: SpanStatusCode.OK });
|
||||
return {
|
||||
message: `Triggered paginate-users for ${totalEligible} eligible users.`,
|
||||
success: true,
|
||||
totalEligible,
|
||||
};
|
||||
},
|
||||
);
|
||||
Reference in New Issue
Block a user