Compare commits

...

3 Commits

Author SHA1 Message Date
Arvin Xu 78ef131cb7 ♻️ refactor: report exact eligible count in memory workflow dry-run
Replace the cap-limited list sample in topics/persona process-users L1 with a dedicated `COUNT(*)` query (`UserModel.countUsersForHourlyMemoryExtractor`). Dry-run now reports the real total instead of "at least N".

Optimise the count SQL via a CTE on `messages → topics` before joining `users + user_settings` — ~25× faster than the per-row EXISTS variant (5s vs 118s on ~200K users locally). Align topics/persona L2 paginate to use the same hourly-extraction filter so the real-run user set matches the dry-run total.

Refs LOBE-4968

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 01:06:45 +08:00
Arvin Xu 9ce61d41f7 📝 docs: add upstash-workflow-testing skill
Local testing guide for Upstash Workflow endpoints via the QStash dev server — covers verifying handlers end-to-end, inspecting step-level logs, triggering dry-runs from curl.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 17:42:01 +08:00
Arvin Xu 4d138539ca ♻️ refactor: reshape memory-user-memory into 3-layer workflow
Split the monolithic `process-users → process-user-topics → process-topics → process-topic` chain into two independent standard 3-layer pipelines:

- **Topics**: `topics/process-users` (L1 dry-run) → `topics/paginate-users` (L2 cursor + CHUNK_SIZE=20 fan-out) → `topics/execute-user` (L3 one user) → `topics/extract-topic` (per-topic sub-workflow via context.invoke)
- **Persona**: `persona/process-users` → `persona/paginate-users` → `persona/execute-user` (one user composeWriting)

External entry point: `cron/hourly` dispatches to both L1 in parallel. Decouples persona update from the topic extraction tail (persona now runs even when the user has no new topics). Replaces custom `forEachBatchSequential` with proper fan-out, completes the existing TODO.

URL paths changed — QStash scheduled cron target must be updated to `/api/workflows/memory-user-memory/cron/hourly`.

Refs LOBE-4968

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 17:41:51 +08:00
22 changed files with 1190 additions and 722 deletions
@@ -0,0 +1,182 @@
---
name: upstash-workflow-testing
description: Local testing guide for Upstash Workflow endpoints via the QStash dev server. Use when verifying workflow handlers end-to-end, debugging why a workflow step isn't firing, inspecting step-level logs and outputs, or triggering a dry-run from curl. Triggers on 'test workflow locally', 'smoke test workflow', 'qstash dev server', 'workflow dry run', 'debug workflow step'.
---
# Upstash Workflow Local Testing
How to trigger, observe, and debug Upstash Workflow endpoints against the local QStash dev server — **without** writing unit tests.
## TL;DR
Workflow endpoints reject raw curl (signature verification). To test locally:
1. **Publish via QStash dev server** at `localhost:8080`, not directly to your handler
2. **Query workflow logs** via `/v2/workflows/logs?workflowRunId=...` — NOT `/v2/events` (events only lists direct publishes, not workflow-internal step publishes)
## Prerequisites
The `.env` file already ships dev defaults:
```bash
QSTASH_URL="http://localhost:8080"
QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIs..." # dev default token
QSTASH_CURRENT_SIGNING_KEY="sig_..."
QSTASH_NEXT_SIGNING_KEY="sig_..."
```
Dev startup (`bun run dev`) boots both the Next.js server and a local QStash dev server. Verify:
```bash
lsof -i :3011 # Next.js
lsof -i :8080 # QStash dev server
```
If QStash isn't up: `brew install upstash/qstash/qstash-cli && qstash dev` (or check the dev startup script).
## 1. Trigger a workflow
Publish to QStash dev server, which signs + forwards to your handler:
```bash
TOKEN="$(grep '^QSTASH_TOKEN=' .env | cut -d '"' -f2)"
TARGET="http://localhost:3011/api/workflows/memory-user-memory/cron/hourly"
curl -X POST "http://localhost:8080/v2/publish/$TARGET" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"dryRun": true, "baseUrl": "http://localhost:3011"}'
# → {"messageId":"msg_..."}
```
For dry-run, always pass `"dryRun": true` in the body. The handler should return stats without triggering downstream L2/L3 pipelines.
## 2. Observe execution
### Workflow logs (authoritative source)
```bash
# List recent runs (all workflows)
curl -s "http://localhost:8080/v2/workflows/logs" \
-H "Authorization: Bearer $TOKEN" | jq '.runs[] | {id: .workflowRunId, url: .workflowUrl, state: .workflowState}'
# Inspect a specific run
WFR="wfr_xxxxxxxxxxxxxxxxx"
curl -s "http://localhost:8080/v2/workflows/logs?workflowRunId=$WFR" \
-H "Authorization: Bearer $TOKEN" | jq '.runs[0].steps'
```
Each step entry includes:
- `stepName` — what you passed to `context.run('<name>', ...)`
- `stepType``Initial` | `Run` | `Call` | `Invoke` | `SleepFor` | etc.
- `state``STEP_SUCCESS` | `STEP_FAILED` | `STEP_RETRY`
- `out` — JSON-serialized return value of the step (base64 in some fields)
- `messageId` — underlying QStash message for that step
Run-level `workflowState`:
- `RUN_SUCCESS` — all steps completed
- `RUN_FAILED` — a step hit max retries
- `RUN_CANCELED` — explicitly canceled
### Events (direct-publish only — NOT step-level)
```bash
curl -s "http://localhost:8080/v2/events?count=50" \
-H "Authorization: Bearer $TOKEN" | jq '.events[] | {state, url, messageId}'
```
`/v2/events` only shows messages you publish to QStash directly (the initial trigger, plus any `client.trigger(...)` calls from inside a `context.run`). It does **NOT** show internal workflow-step messages that `serve()` publishes to itself — for those, use `/v2/workflows/logs`.
If you trigger pipeline A → B and only see A's messages in `/v2/events`, that usually means A's handler published correctly but B hasn't been inspected by workflow logs yet. Query `/v2/workflows/logs` for B's workflowRunId instead.
## 3. Common failure modes
### a. 500 "Upstash-Signature header is not passed"
You curl'd the handler directly. Publish via `http://localhost:8080/v2/publish/<target>` instead.
### b. Handler runs but no downstream workflow fires
The `qstashClient` passed to `serve()` or used by your `triggerXxx` helper probably doesn't honor `QSTASH_URL`. **Both clients must point at the dev server.**
`@upstash/qstash`'s `Client` uses **`baseUrl`** in the config object (NOT `url`) and also reads `QSTASH_URL` from env automatically:
```ts
// ✅ Correct
new Client({ token, baseUrl: process.env.QSTASH_URL });
// ⚠️ Works too (env var fallback) but explicit is safer
new Client({ token });
```
`@upstash/workflow`'s `Client` — used by `MemoryExtractionWorkflowService` and similar trigger helpers — forwards to the same QStash client internally.
### c. `triggerXxx()` returns `{workflowRunId}` but `/v2/events` shows nothing
`/v2/events` only lists direct publishes. A `client.trigger()` call publishes to QStash's workflow API, which creates a run log entry (visible via `/v2/workflows/logs`) plus its own initial QStash message. Always cross-check with `/v2/workflows/logs` before concluding the trigger failed.
### d. Dry-run path still cascades to L2
Means the handler read `dryRun` from the wrong field. For our codebase the convention is to put `dryRun: true` at the **top level** of the body; the L1 handler reads it off `context.requestPayload` directly (not via `normalizeMemoryExtractionPayload`, which strips unknown fields). When in doubt, `appendFileSync('/tmp/<wf>-debug.log', ...)` inside the handler to log the exact payload received.
### e. You need to see handler logs but can't access dev server stdout
Dev is usually started in the background. When you can't tail stdout, drop a **temporary** file logger into the handler:
```ts
import { appendFileSync } from 'node:fs';
appendFileSync('/tmp/wf-debug.log', `[${new Date().toISOString()}] <message>\n`);
```
Delete before committing. Also consider `verbose: true` on the `serve()` options — that routes @upstash/workflow's internal tracing to console (which, again, you need stdout access for).
## 4. End-to-end smoke recipes
### Dry-run the entire hourly cron dispatcher
```bash
TOKEN=$(grep '^QSTASH_TOKEN=' .env | cut -d '"' -f2)
TARGET='http://localhost:3011/api/workflows/memory-user-memory/cron/hourly'
MSG=$(curl -s -X POST "http://localhost:8080/v2/publish/$TARGET" \
-H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' \
-d '{"dryRun": true, "baseUrl": "http://localhost:3011"}' \
| jq -r .messageId)
# Follow the cron/hourly run to completion (polls until RUN_SUCCESS or RUN_FAILED)
while :; do
STATE=$(curl -s "http://localhost:8080/v2/workflows/logs" \
-H "Authorization: Bearer $TOKEN" \
| jq -r --arg url "$TARGET" '.runs[] | select(.workflowUrl == $url) | .workflowState' | head -1)
echo "state: $STATE"
[[ "$STATE" == "RUN_SUCCESS" || "$STATE" == "RUN_FAILED" ]] && break
sleep 2
done
```
Expected on success: two child workflow runs appear in `/v2/workflows/logs` — one at `/topics/process-users`, one at `/persona/process-users`. Each should also reach `RUN_SUCCESS` in dry-run (L1 returns stats; no L2 triggered).
### Directly target a single L1 (skip the cron dispatcher)
```bash
TARGET='http://localhost:3011/api/workflows/memory-user-memory/topics/process-users'
curl -X POST "http://localhost:8080/v2/publish/$TARGET" \
-H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' \
-d '{"dryRun": true, "baseUrl": "http://localhost:3011", "mode": "workflow"}'
```
Then query logs for that workflow run — should complete in 12 steps with stats in the final step's `out`.
## 5. What NOT to do
- ❌ Unit-testing the handler by constructing a fake `WorkflowContext`. The workflow runtime does step caching, replay, and QStash round-trips that you can't realistically mock. Integration via QStash dev server is faster and more accurate.
- ❌ Bypassing signature verification by clearing `QSTASH_*_SIGNING_KEY` env. Dev QStash signs requests — leaving verification on catches misconfigured receivers.
- ❌ Relying on `/v2/events` as the full picture of a workflow run. Use `/v2/workflows/logs` for step-level truth.
## References
- Upstash QStash local dev: <https://upstash.com/docs/qstash/howto/local-development>
- Workflow basics (serve/context/run): <https://upstash.com/docs/workflow/basics/context>
- Related skill: `upstash-workflow` (implementation patterns)
+34
View File
@@ -395,6 +395,40 @@ export class UserModel {
return options.limit !== undefined ? query.limit(options.limit) : query;
};
static countUsersForHourlyMemoryExtractor = async (
db: LobeChatDatabase,
options: Pick<ListUsersForHourlyMemoryExtractorOptions, 'whitelist'> = {},
): Promise<number> => {
// NOTICE: Reversed from the per-row EXISTS pattern in listUsersForHourlyMemoryExtractor.
// Start with a CTE of users who actually chatted (messages ⋈ topics on user_id) then join
// to users+settings. On our data this is ~20× faster than the EXISTS variant because the
// chatted set is much smaller than the total users table.
const whitelistSql =
options.whitelist && options.whitelist.length > 0
? sql`AND u.id IN (${sql.join(
options.whitelist.map((id) => sql`${id}`),
sql`, `,
)})`
: sql``;
const rows = await db.execute<{ n: number }>(sql`
WITH chatted AS (
SELECT DISTINCT m.user_id
FROM ${messages} m
INNER JOIN ${topics} t ON t.id = m.topic_id AND t.user_id = m.user_id
WHERE m.role = 'user'
)
SELECT COUNT(*)::int AS n
FROM chatted c
INNER JOIN ${users} u ON u.id = c.user_id
LEFT JOIN ${userSettings} s ON s.id = u.id
WHERE COALESCE((s.memory ->> 'enabled')::boolean, true) = true
${whitelistSql}
`);
return rows.rows[0]?.n ?? 0;
};
/**
* Get user info for AI generation (name and language preference)
*/
@@ -41,7 +41,7 @@ export const POST = async (req: Request) => {
const params = normalizeMemoryExtractionPayload(payload, origin);
if (params.mode === 'workflow') {
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerProcessUsers(
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerTopicsProcessUsers(
buildWorkflowPayloadInput(params),
{ extraHeaders: upstashWorkflowExtraHeaders },
);
@@ -63,7 +63,7 @@ export const POST = async (req: Request) => {
if (params.mode === 'workflow') {
const results = await Promise.all(
params.userIds.map(async (userId) => {
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerPersonaUpdate(
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerPersonaExecuteUser(
userId,
params.baseUrl,
{ extraHeaders: upstashWorkflowExtraHeaders },
@@ -12,8 +12,8 @@ const mockFindById = vi.fn();
const mockCountTopicsForMemoryExtractor = vi.fn();
const mockDeleteAll = vi.fn();
const { mockTriggerProcessUsers } = vi.hoisted(() => ({
mockTriggerProcessUsers: vi.fn(),
const { mockTriggerTopicsProcessUsers } = vi.hoisted(() => ({
mockTriggerTopicsProcessUsers: vi.fn(),
}));
vi.mock('@/database/models/asyncTask', () => ({
@@ -59,7 +59,7 @@ vi.mock('@/server/globalConfig/parseMemoryExtractionConfig', () => ({
vi.mock('@/server/services/memory/userMemory/extract', () => ({
MemoryExtractionWorkflowService: {
triggerProcessUsers: mockTriggerProcessUsers,
triggerTopicsProcessUsers: mockTriggerTopicsProcessUsers,
},
buildWorkflowPayloadInput: (payload: any) => payload,
normalizeMemoryExtractionPayload: (payload: any) => payload,
@@ -78,7 +78,7 @@ const createCaller = (ctxOverrides: Partial<any> = {}) => {
describe('userMemoryRouter.requestMemoryFromChatTopic', () => {
beforeEach(() => {
vi.clearAllMocks();
mockTriggerProcessUsers.mockResolvedValue({ workflowRunId: 'workflow-run-1' });
mockTriggerTopicsProcessUsers.mockResolvedValue({ workflowRunId: 'workflow-run-1' });
});
it('dedupes when an active task exists', async () => {
@@ -98,7 +98,7 @@ describe('userMemoryRouter.requestMemoryFromChatTopic', () => {
status: AsyncTaskStatus.Pending,
});
expect(mockCreate).not.toHaveBeenCalled();
expect(mockTriggerProcessUsers).not.toHaveBeenCalled();
expect(mockTriggerTopicsProcessUsers).not.toHaveBeenCalled();
});
it('creates task and triggers workflow with user context and dates', async () => {
@@ -124,7 +124,7 @@ describe('userMemoryRouter.requestMemoryFromChatTopic', () => {
status: AsyncTaskStatus.Pending,
type: AsyncTaskType.UserMemoryExtractionWithChatTopic,
});
expect(mockTriggerProcessUsers).toHaveBeenCalledWith(
expect(mockTriggerTopicsProcessUsers).toHaveBeenCalledWith(
expect.objectContaining({
asyncTaskId: 'new-task',
baseUrl: 'https://internal.example.com',
@@ -170,7 +170,7 @@ describe('userMemoryRouter.requestMemoryFromChatTopic', () => {
},
status: AsyncTaskStatus.Success,
});
expect(mockTriggerProcessUsers).not.toHaveBeenCalled();
expect(mockTriggerTopicsProcessUsers).not.toHaveBeenCalled();
});
it('throws on invalid date range', async () => {
+1 -1
View File
@@ -275,7 +275,7 @@ export const userMemoryRouter = router({
const baseUrl = webhook.baseUrl || appEnv.INTERNAL_APP_URL || appEnv.APP_URL;
try {
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerProcessUsers(
const { workflowRunId } = await MemoryExtractionWorkflowService.triggerTopicsProcessUsers(
buildWorkflowPayloadInput(
normalizeMemoryExtractionPayload({
asyncTaskId: taskId,
@@ -1709,6 +1709,13 @@ export class MemoryExtractionExecutor {
};
}
async countUsersForHourlyExtraction(): Promise<number> {
const db = await this.db;
return UserModel.countUsersForHourlyMemoryExtractor(db, {
whitelist: this.privateConfig.whitelistUsers,
});
}
async getUsersForHourlyExtraction(
limit: number,
cursor?: ListUsersForMemoryExtractorCursor,
@@ -2264,12 +2271,34 @@ export class MemoryExtractionExecutor {
}
}
const WORKFLOW_PATHS = {
hourly: '/api/workflows/memory-user-memory/call-cron-hourly-analysis',
personaUpdate: '/api/workflows/memory-user-memory/pipelines/persona/update-writing',
topicBatch: '/api/workflows/memory-user-memory/pipelines/chat-topic/process-topics',
userTopics: '/api/workflows/memory-user-memory/pipelines/chat-topic/process-user-topics',
users: '/api/workflows/memory-user-memory/pipelines/chat-topic/process-users',
export interface PersonaWorkflowCursor {
createdAt: string;
id: string;
}
export interface PersonaProcessUsersWorkflowPayload {
baseUrl?: string;
cursor?: PersonaWorkflowCursor;
dryRun?: boolean;
userIds?: string[];
}
export interface PersonaExecuteUserWorkflowPayload {
userId: string;
}
export const WORKFLOW_PATHS = {
hourlyCron: '/api/workflows/memory-user-memory/cron/hourly',
personaExecuteUser: '/api/workflows/memory-user-memory/persona/execute-user',
personaPaginateUsers: '/api/workflows/memory-user-memory/persona/paginate-users',
personaProcessUsers: '/api/workflows/memory-user-memory/persona/process-users',
topicsExecuteUser: '/api/workflows/memory-user-memory/topics/execute-user',
// Invoked internally via context.invoke from topics/execute-user; workflowId must match the last path segment.
topicsExtractTopic: '/api/workflows/memory-user-memory/topics/extract-topic',
topicsPaginateUsers: '/api/workflows/memory-user-memory/topics/paginate-users',
topicsProcessUsers: '/api/workflows/memory-user-memory/topics/process-users',
} as const;
const getWorkflowUrl = (path: string, baseUrl: string) => {
@@ -2291,6 +2320,13 @@ const getWorkflowClient = () => {
return new Client(config);
};
type TriggerOptions = { extraHeaders?: Record<string, string> };
const requireBaseUrl = (baseUrl: string | undefined): string => {
if (!baseUrl) throw new Error('Missing baseUrl for workflow trigger');
return baseUrl;
};
export class MemoryExtractionWorkflowService {
private static client: Client;
@@ -2302,87 +2338,80 @@ export class MemoryExtractionWorkflowService {
return this.client;
}
static triggerProcessUsers(
payload: MemoryExtractionPayloadInput,
options?: { extraHeaders?: Record<string, string> },
) {
if (!payload.baseUrl) {
throw new Error('Missing baseUrl for workflow trigger');
}
// ─── External cron entry ──────────────────────────────────────────────
const url = getWorkflowUrl(WORKFLOW_PATHS.users, payload.baseUrl);
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
static triggerHourly(
static triggerHourlyCron(
payload: MemoryExtractionHourlyWorkflowPayload,
options?: { extraHeaders?: Record<string, string> },
options?: TriggerOptions,
) {
if (!payload.baseUrl) {
throw new Error('Missing baseUrl for workflow trigger');
}
const url = getWorkflowUrl(WORKFLOW_PATHS.hourly, payload.baseUrl);
const url = getWorkflowUrl(WORKFLOW_PATHS.hourlyCron, requireBaseUrl(payload.baseUrl));
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
static triggerProcessUserTopics(
payload: UserTopicWorkflowPayload,
options?: { extraHeaders?: Record<string, string> },
) {
if (!payload.baseUrl) {
throw new Error('Missing baseUrl for workflow trigger');
}
// ─── Topics pipeline (3 layers + inner extract-topic workflow) ────────
const url = getWorkflowUrl(WORKFLOW_PATHS.userTopics, payload.baseUrl);
return this.getClient().trigger({
body: payload,
headers: options?.extraHeaders,
url,
});
static triggerTopicsProcessUsers(
payload: MemoryExtractionPayloadInput,
options?: TriggerOptions,
) {
const url = getWorkflowUrl(WORKFLOW_PATHS.topicsProcessUsers, requireBaseUrl(payload.baseUrl));
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
static triggerProcessTopics(
static triggerTopicsPaginateUsers(
payload: MemoryExtractionPayloadInput,
options?: TriggerOptions,
) {
const url = getWorkflowUrl(WORKFLOW_PATHS.topicsPaginateUsers, requireBaseUrl(payload.baseUrl));
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
static triggerTopicsExecuteUser(
userId: string,
payload: MemoryExtractionPayloadInput,
options?: { extraHeaders?: Record<string, string> },
options?: TriggerOptions,
) {
if (!payload.baseUrl) {
throw new Error('Missing baseUrl for workflow trigger');
}
const url = getWorkflowUrl(WORKFLOW_PATHS.topicBatch, payload.baseUrl);
const url = getWorkflowUrl(WORKFLOW_PATHS.topicsExecuteUser, requireBaseUrl(payload.baseUrl));
return this.getClient().trigger({
body: payload,
flowControl: {
key: `memory-user-memory.pipelines.chat-topic.process-topics.user.${userId}`,
// NOTICE: if modified the parallelism of
// src/server/workflows-hono/memory-user-memory/workflows/processTopics.ts
// or added new memory layer, make sure to update the number below.
//
// Currently, CEPA (context, experience, preference, activity) + identity = 5 layers.
// and since identity requires sequential processing, we set parallelism to 5.
key: `memory-user-memory.topics.execute-user.${userId}`,
// NOTICE: one execute-user run invokes extract-topic per topic in parallel.
// Keep this conservative so a single user with many topics does not starve the queue.
parallelism: 5,
},
} satisfies FlowControl,
headers: options?.extraHeaders,
url,
});
}
static triggerPersonaUpdate(
userId: string,
baseUrl: string,
options?: { extraHeaders?: Record<string, string> },
) {
if (!baseUrl) {
throw new Error('Missing baseUrl for workflow trigger');
}
// ─── Persona pipeline (3 layers) ──────────────────────────────────────
const url = getWorkflowUrl(WORKFLOW_PATHS.personaUpdate, baseUrl);
static triggerPersonaProcessUsers(
payload: PersonaProcessUsersWorkflowPayload,
options?: TriggerOptions,
) {
const url = getWorkflowUrl(WORKFLOW_PATHS.personaProcessUsers, requireBaseUrl(payload.baseUrl));
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
static triggerPersonaPaginateUsers(
payload: PersonaProcessUsersWorkflowPayload,
options?: TriggerOptions,
) {
const url = getWorkflowUrl(
WORKFLOW_PATHS.personaPaginateUsers,
requireBaseUrl(payload.baseUrl),
);
return this.getClient().trigger({ body: payload, headers: options?.extraHeaders, url });
}
static triggerPersonaExecuteUser(userId: string, baseUrl: string, options?: TriggerOptions) {
const url = getWorkflowUrl(WORKFLOW_PATHS.personaExecuteUser, requireBaseUrl(baseUrl));
return this.getClient().trigger({
body: { userIds: [userId] },
body: { userId } satisfies PersonaExecuteUserWorkflowPayload,
flowControl: {
key: `memory-user-memory.pipelines.persona.update-write.${userId}`,
key: `memory-user-memory.persona.execute-user.${userId}`,
parallelism: 1,
} satisfies FlowControl,
headers: options?.extraHeaders,
@@ -2,52 +2,60 @@ import { serve, serveMany } from '@upstash/workflow/hono';
import { Hono } from 'hono';
import { createWorkflowQstashClient } from './qstashClient';
import { hourlyWorkflowHandler, hourlyWorkflowOptions } from './workflows/hourly';
import { personaUpdateHandler } from './workflows/personaUpdate';
import { processTopicWorkflow } from './workflows/processTopic';
import { processTopicsHandler } from './workflows/processTopics';
import { processUsersHandler } from './workflows/processUsers';
import { processUserTopicsHandler } from './workflows/processUserTopics';
import { hourlyCronHandler } from './workflows/cron/hourly';
import { executeUserHandler as personaExecuteUserHandler } from './workflows/persona/executeUser';
import { paginateUsersHandler as personaPaginateUsersHandler } from './workflows/persona/paginateUsers';
import { processUsersHandler as personaProcessUsersHandler } from './workflows/persona/processUsers';
import { executeUserHandler as topicsExecuteUserHandler } from './workflows/topics/executeUser';
import { extractTopicWorkflow } from './workflows/topics/extractTopic';
import { paginateUsersHandler as topicsPaginateUsersHandler } from './workflows/topics/paginateUsers';
import { processUsersHandler as topicsProcessUsersHandler } from './workflows/topics/processUsers';
const app = new Hono().basePath('/api/workflows/memory-user-memory');
// ─── External cron entry ───────────────────────────────────────────────
app.post('/cron/hourly', serve(hourlyCronHandler, { qstashClient: createWorkflowQstashClient() }));
// ─── Topics pipeline (3 layers + inner extract-topic workflow) ─────────
app.post(
'/call-cron-hourly-analysis',
serve(hourlyWorkflowHandler, {
...hourlyWorkflowOptions,
qstashClient: createWorkflowQstashClient(),
}),
'/topics/process-users',
serve(topicsProcessUsersHandler, { qstashClient: createWorkflowQstashClient() }),
);
app.post(
'/pipelines/persona/update-writing',
serve(personaUpdateHandler, { qstashClient: createWorkflowQstashClient() }),
'/topics/paginate-users',
serve(topicsPaginateUsersHandler, { qstashClient: createWorkflowQstashClient() }),
);
app.post(
'/pipelines/chat-topic/process-users',
serve(processUsersHandler, { qstashClient: createWorkflowQstashClient() }),
'/topics/execute-user',
serve(topicsExecuteUserHandler, { qstashClient: createWorkflowQstashClient() }),
);
// NOTICE: `context.invoke(extractTopicWorkflow)` in topics/execute-user rewrites the URL last
// segment to the workflowId (`extract-topic`); serveMany dispatches that to the right workflow.
app.post(
'/pipelines/chat-topic/process-user-topics',
serve(processUserTopicsHandler, { qstashClient: createWorkflowQstashClient() }),
);
app.post(
'/pipelines/chat-topic/process-topics',
serve(processTopicsHandler, { qstashClient: createWorkflowQstashClient() }),
);
// NOTICE: Must use serveMany here. The `context.invoke(processTopicWorkflow)` call in
// process-topics rewrites the URL last segment to the workflowId ("process-topic"). serveMany
// multiplexes by that final segment to dispatch to the right workflow.
app.post(
'/pipelines/chat-topic/process-topic',
'/topics/extract-topic',
serveMany(
{ 'process-topic': processTopicWorkflow },
{ 'extract-topic': extractTopicWorkflow },
{ qstashClient: createWorkflowQstashClient() },
),
);
// ─── Persona pipeline (3 layers) ───────────────────────────────────────
app.post(
'/persona/process-users',
serve(personaProcessUsersHandler, { qstashClient: createWorkflowQstashClient() }),
);
app.post(
'/persona/paginate-users',
serve(personaPaginateUsersHandler, { qstashClient: createWorkflowQstashClient() }),
);
app.post(
'/persona/execute-user',
serve(personaExecuteUserHandler, { qstashClient: createWorkflowQstashClient() }),
);
export default app;
@@ -10,6 +10,9 @@ const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
// https://upstash.com/docs/workflow/troubleshooting/vercel#step-2-pass-header-when-triggering
export const createWorkflowQstashClient = () =>
new Client({
// QStash Client reads QSTASH_URL/QSTASH_TOKEN from env automatically; explicit baseUrl here
// keeps local-dev behavior visible and avoids any config-shape confusion with `url`.
...(process.env.QSTASH_URL ? { baseUrl: process.env.QSTASH_URL } : {}),
headers: { ...upstashWorkflowExtraHeaders },
token: process.env.QSTASH_TOKEN!,
});
@@ -0,0 +1,56 @@
import { type WorkflowContext } from '@upstash/workflow';
import { appEnv } from '@/envs/app';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import {
type MemoryExtractionHourlyWorkflowPayload,
type MemoryExtractionPayloadInput,
MemoryExtractionWorkflowService,
} from '@/server/services/memory/userMemory/extract';
const { webhook, upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const resolveBaseUrl = () => webhook.baseUrl || appEnv.INTERNAL_APP_URL || appEnv.APP_URL;
/**
* External cron entry.
*
* QStash scheduled cron posts here hourly. Responsibility: dispatch both pipelines (topics + persona)
* L1 in parallel. Each pipeline then does its own pagination and filtering — this handler stays thin.
*/
export const hourlyCronHandler = async (
context: WorkflowContext<MemoryExtractionHourlyWorkflowPayload>,
) => {
const payload = context.requestPayload || ({} as MemoryExtractionHourlyWorkflowPayload);
const baseUrl = payload.baseUrl || resolveBaseUrl();
if (!baseUrl) {
throw new Error('Missing baseUrl for hourly cron dispatcher');
}
const dryRun = !!payload.dryRun;
await context.run('memory:cron:hourly:trigger-topics-process-users', () =>
MemoryExtractionWorkflowService.triggerTopicsProcessUsers(
// `dryRun` isn't part of MemoryExtractionPayloadInput but is forwarded verbatim in the JSON
// body; the topics L1 handler reads it off the raw request payload.
{ baseUrl, dryRun, mode: 'workflow' } as MemoryExtractionPayloadInput & { dryRun?: boolean },
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
await context.run('memory:cron:hourly:trigger-persona-process-users', () =>
MemoryExtractionWorkflowService.triggerPersonaProcessUsers(
{ baseUrl, dryRun },
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
return {
dispatched: { persona: true, topics: true },
dryRun,
message: dryRun
? '[DryRun] Dispatched both topics and persona pipelines in dry-run mode.'
: 'Dispatched both topics and persona pipelines.',
success: true,
};
};
@@ -1,105 +0,0 @@
import { MemorySourceType } from '@lobechat/types';
import { type WorkflowContext } from '@upstash/workflow';
import { chunk } from 'es-toolkit/compat';
import { appEnv } from '@/envs/app';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import {
buildWorkflowPayloadInput,
MemoryExtractionExecutor,
type MemoryExtractionHourlyWorkflowPayload,
MemoryExtractionWorkflowService,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
const USER_PAGE_SIZE = 200;
const USER_BATCH_SIZE = 20;
const { webhook, upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const resolveBaseUrl = () => webhook.baseUrl || appEnv.INTERNAL_APP_URL || appEnv.APP_URL;
export const hourlyWorkflowHandler = async (
context: WorkflowContext<MemoryExtractionHourlyWorkflowPayload>,
) => {
const { cursor, dryRun } = context.requestPayload || {};
const baseUrl = resolveBaseUrl();
if (!baseUrl) {
throw new Error('Missing baseUrl for hourly memory extraction workflow');
}
const parsedCursor = cursor
? { createdAt: new Date(cursor.createdAt), id: cursor.id }
: undefined;
if (parsedCursor && Number.isNaN(parsedCursor.createdAt.getTime())) {
throw new Error('Invalid cursor date for hourly memory extraction workflow');
}
const executor = await MemoryExtractionExecutor.create();
const userBatch = await context.run(
`memory:user-memory:hourly:list-users:${parsedCursor?.id || 'root'}`,
() => executor.getUsersForHourlyExtraction(USER_PAGE_SIZE, parsedCursor),
);
const userIds = userBatch.ids;
if (userIds.length === 0) {
return { message: 'No eligible users for hourly memory extraction.', processedUsers: 0 };
}
const nextCursor = userBatch.cursor
? {
createdAt: userBatch.cursor.createdAt.toISOString(),
id: userBatch.cursor.id,
}
: undefined;
if (!dryRun) {
const batches = chunk(userIds, USER_BATCH_SIZE);
await Promise.all(
batches.map((batchUserIds, index) =>
context.run(`memory:user-memory:hourly:trigger-users:${index}`, () =>
MemoryExtractionWorkflowService.triggerProcessUsers(
buildWorkflowPayloadInput(
normalizeMemoryExtractionPayload({
baseUrl,
mode: 'workflow',
sources: [MemorySourceType.ChatTopic],
userIds: batchUserIds,
}),
),
{ extraHeaders: upstashWorkflowExtraHeaders },
),
),
),
);
}
if (nextCursor) {
await context.run('memory:user-memory:hourly:schedule-next-page', () =>
MemoryExtractionWorkflowService.triggerHourly(
{
baseUrl,
cursor: nextCursor,
dryRun,
},
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
}
return {
dryRun: !!dryRun,
hasNextPage: !!nextCursor,
processedUsers: userIds.length,
scheduledBatches: dryRun ? 0 : chunk(userIds, USER_BATCH_SIZE).length,
};
};
export const hourlyWorkflowOptions = {
flowControl: {
key: 'memory-user-memory.call-cron-hourly-analysis',
parallelism: 1,
ratePerSecond: 1,
},
};
@@ -0,0 +1,45 @@
import { type WorkflowContext } from '@upstash/workflow';
import { z } from 'zod';
import { getServerDB } from '@/database/server';
import { type PersonaExecuteUserWorkflowPayload } from '@/server/services/memory/userMemory/extract';
import {
buildUserPersonaJobInput,
UserPersonaService,
} from '@/server/services/memory/userMemory/persona/service';
const payloadSchema = z.object({
userId: z.string(),
});
/**
* L3: Compose persona writing for ONE user.
*/
export const executeUserHandler = async (
context: WorkflowContext<PersonaExecuteUserWorkflowPayload>,
) => {
const payload = await context.run('memory:persona:execute-user:parse-payload', () =>
payloadSchema.parse(context.requestPayload || {}),
);
const { userId } = payload;
const db = await getServerDB();
const service = new UserPersonaService(db);
const result = await context.run(`memory:persona:execute-user:${userId}:compose`, async () => {
const jobInput = await buildUserPersonaJobInput(db, userId);
const composed = await service.composeWriting({ ...jobInput, userId });
return {
diffId: composed.diff?.id,
documentId: composed.document.id,
userId,
version: composed.document.version,
};
});
return {
message: 'Persona composed.',
success: true,
...result,
};
};
@@ -0,0 +1,109 @@
import { type WorkflowContext } from '@upstash/workflow';
import { chunk } from 'es-toolkit/compat';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import {
MemoryExtractionExecutor,
MemoryExtractionWorkflowService,
type PersonaProcessUsersWorkflowPayload,
} from '@/server/services/memory/userMemory/extract';
const USER_PAGE_SIZE = 50;
const CHUNK_SIZE = 20;
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const requireBaseUrl = (baseUrl?: string) => {
if (!baseUrl) throw new Error('Missing baseUrl for persona paginate-users');
return baseUrl;
};
/**
* L2: Paginate eligible users and fan-out to execute-user.
*
* Reuses `executor.getUsers` (same base set as topics) — persona eligibility finer-grained
* filtering lives inside `UserPersonaService.composeWriting`, which is a no-op for users whose
* persona is up-to-date.
*/
export const paginateUsersHandler = async (
context: WorkflowContext<PersonaProcessUsersWorkflowPayload>,
) => {
const payload = context.requestPayload || ({} as PersonaProcessUsersWorkflowPayload);
const baseUrl = requireBaseUrl(payload.baseUrl);
// Fan-out chunk path.
if (payload.userIds && payload.userIds.length > 0 && !payload.cursor) {
await Promise.all(
payload.userIds.map((userId) =>
context.run(`memory:persona:paginate-users:fanout:execute:${userId}`, () =>
MemoryExtractionWorkflowService.triggerPersonaExecuteUser(userId, baseUrl, {
extraHeaders: upstashWorkflowExtraHeaders,
}),
),
),
);
return { fannedOut: payload.userIds.length };
}
// Cursor pagination.
const cursor = payload.cursor
? { createdAt: new Date(payload.cursor.createdAt), id: payload.cursor.id }
: undefined;
if (cursor && Number.isNaN(cursor.createdAt.getTime())) {
throw new Error('Invalid cursor.createdAt for persona paginate-users');
}
const executor = await MemoryExtractionExecutor.create();
const batch = await context.run(
`memory:persona:paginate-users:list:${cursor?.id || 'root'}`,
() => executor.getUsersForHourlyExtraction(USER_PAGE_SIZE, cursor),
);
const userIds = batch.ids;
const nextCursor = 'cursor' in batch ? batch.cursor : undefined;
if (userIds.length === 0) {
return { message: 'No users in page, pagination complete.' };
}
if (userIds.length > CHUNK_SIZE) {
const chunks = chunk(userIds, CHUNK_SIZE);
await Promise.all(
chunks.map((chunkIds, idx) =>
context.run(`memory:persona:paginate-users:fanout:${idx + 1}/${chunks.length}`, () =>
MemoryExtractionWorkflowService.triggerPersonaPaginateUsers(
{ baseUrl, userIds: chunkIds },
{ extraHeaders: upstashWorkflowExtraHeaders },
),
),
),
);
} else {
await Promise.all(
userIds.map((userId) =>
context.run(`memory:persona:paginate-users:execute:${userId}`, () =>
MemoryExtractionWorkflowService.triggerPersonaExecuteUser(userId, baseUrl, {
extraHeaders: upstashWorkflowExtraHeaders,
}),
),
),
);
}
if (nextCursor) {
await context.run('memory:persona:paginate-users:schedule-next-page', () =>
MemoryExtractionWorkflowService.triggerPersonaPaginateUsers(
{
baseUrl,
cursor: { createdAt: nextCursor.createdAt.toISOString(), id: nextCursor.id },
},
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
}
return {
nextCursor: nextCursor ? nextCursor.id : null,
processedUsers: userIds.length,
};
};
@@ -0,0 +1,86 @@
import { type WorkflowContext } from '@upstash/workflow';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import {
MemoryExtractionExecutor,
MemoryExtractionWorkflowService,
type PersonaProcessUsersWorkflowPayload,
} from '@/server/services/memory/userMemory/extract';
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const requireBaseUrl = (baseUrl?: string) => {
if (!baseUrl) throw new Error('Missing baseUrl for persona process-users');
return baseUrl;
};
/**
* L1: Entry for the persona update pipeline.
*
* - If `userIds` provided, skip eligibility query and fan out directly via L2.
* - Else materialise the full eligible user list in one step so dry-run can report the exact total.
* - When not dry-run: trigger L2 (paginate-users) to walk all users via cursor pagination.
*/
export const processUsersHandler = async (
context: WorkflowContext<PersonaProcessUsersWorkflowPayload>,
) => {
const payload = context.requestPayload || ({} as PersonaProcessUsersWorkflowPayload);
const baseUrl = requireBaseUrl(payload.baseUrl);
const dryRun = !!payload.dryRun;
if (payload.userIds && payload.userIds.length > 0) {
if (dryRun) {
return {
dryRun: true,
message: `[DryRun] Would fan out ${payload.userIds.length} pre-specified users.`,
success: true,
targetUsers: payload.userIds.length,
};
}
await context.run('memory:persona:process-users:trigger-paginate-fanout', () =>
MemoryExtractionWorkflowService.triggerPersonaPaginateUsers(
{ baseUrl, userIds: payload.userIds },
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
return { success: true, triggeredFanout: payload.userIds.length };
}
// Count-only query: cheap COUNT(*) with the hourly-extraction filter.
const executor = await MemoryExtractionExecutor.create();
const totalEligible = await context.run('memory:persona:process-users:count-eligible-users', () =>
executor.countUsersForHourlyExtraction(),
);
if (totalEligible === 0) {
return {
message: 'No eligible users for persona update.',
success: true,
totalEligible: 0,
};
}
if (dryRun) {
return {
dryRun: true,
message: `[DryRun] Would process ${totalEligible} users.`,
success: true,
totalEligible,
};
}
await context.run('memory:persona:process-users:trigger-paginate', () =>
MemoryExtractionWorkflowService.triggerPersonaPaginateUsers(
{ baseUrl },
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
return {
message: `Triggered paginate-users for ${totalEligible} eligible users.`,
success: true,
totalEligible,
};
};
@@ -1,46 +0,0 @@
import { type WorkflowContext } from '@upstash/workflow';
import { z } from 'zod';
import { getServerDB } from '@/database/server';
import {
buildUserPersonaJobInput,
UserPersonaService,
} from '@/server/services/memory/userMemory/persona/service';
const workflowPayloadSchema = z.object({
userIds: z.array(z.string()).optional(),
});
export const personaUpdateHandler = async (context: WorkflowContext) => {
const payload = await context.run('memory:pipelines:persona:update-writing:parse-payload', () =>
workflowPayloadSchema.parse(context.requestPayload || {}),
);
const db = await getServerDB();
const userIds = Array.from(new Set(payload.userIds || [])).filter(Boolean);
if (userIds.length === 0) {
throw new Error('No user IDs provided for persona update.');
}
const service = new UserPersonaService(db);
await Promise.all(
userIds.map(async (userId) =>
context.run(`memory:pipelines:persona:update-writing:users:${userId}`, async () => {
const jobInput = await buildUserPersonaJobInput(db, userId);
const result = await service.composeWriting({ ...jobInput, userId });
return {
diffId: result.diff?.id,
documentId: result.document.id,
userId,
version: result.document.version,
};
}),
),
);
return {
message: 'User persona processed via workflow.',
processedUsers: userIds.length,
};
};
@@ -1,163 +0,0 @@
import { SpanStatusCode } from '@lobechat/observability-otel/api';
import {
buildUpstashWorkflowAttributes,
tracer as upstashWorkflowTracer,
} from '@lobechat/observability-otel/modules/upstash-workflow';
import { LayersEnum, MemorySourceType } from '@lobechat/types';
import { type WorkflowContext } from '@upstash/workflow';
import { WorkflowAbort } from '@upstash/workflow';
import { AsyncTaskModel } from '@/database/models/asyncTask';
import { getServerDB } from '@/database/server';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
import {
MemoryExtractionWorkflowService,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
import { processTopicWorkflow } from './processTopic';
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const CEPA_LAYERS: LayersEnum[] = [
LayersEnum.Context,
LayersEnum.Experience,
LayersEnum.Preference,
LayersEnum.Activity,
];
const IDENTITY_LAYERS: LayersEnum[] = [LayersEnum.Identity];
export const processTopicsHandler = (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
upstashWorkflowTracer.startActiveSpan(
'workflow:memory-user-memory:process-topics',
async (span) => {
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
span.setAttributes({
...buildUpstashWorkflowAttributes(context),
'workflow.memory_user_memory.force_all': payload.forceAll,
'workflow.memory_user_memory.force_topics': payload.forceTopics,
'workflow.memory_user_memory.layers': payload.layers.join(','),
'workflow.memory_user_memory.source': payload.sources.join(','),
'workflow.memory_user_memory.topic_count': payload.topicIds.length,
'workflow.memory_user_memory.user_count': payload.userIds.length,
'workflow.name': 'memory-user-memory:process-topics',
});
try {
if (!payload.userIds.length) {
span.setStatus({ code: SpanStatusCode.OK });
return {
message: 'No user id provided for topic batch.',
processedTopics: 0,
processedUsers: 0,
};
}
if (!payload.topicIds.length) {
span.setStatus({ code: SpanStatusCode.OK });
return {
message: 'No topic ids provided for extraction.',
processedTopics: 0,
processedUsers: 0,
};
}
if (!payload.sources.includes(MemorySourceType.ChatTopic)) {
span.setStatus({ code: SpanStatusCode.OK });
return {
message: 'Source not supported in topic batch.',
processedTopics: 0,
processedUsers: 0,
};
}
const userId = payload.userIds[0];
if (payload.asyncTaskId && userId) {
// NOTICE: Cooperative cascading cancellation for the workflow tree.
// If cancelled, stop before fan-out into per-topic child workflows.
const cancelled = await context.run(
`memory:user-memory:extract:users:${userId}:cancel-check`,
() =>
getServerDB().then((db) =>
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
payload.asyncTaskId!,
),
),
);
if (cancelled) {
span.setStatus({ code: SpanStatusCode.OK });
return {
message: 'Memory extraction task cancellation requested, skip topic batch.',
processedTopics: 0,
processedUsers: 0,
};
}
}
// Delegate per-topic extraction to dedicated workflow for better isolation
await Promise.all(
payload.topicIds.map(async (topicId, index) => {
await context.invoke(
`memory:user-memory:extract:users:${userId}:topics:${topicId}:invoke:${index}`,
{
body: {
...payload,
layers: payload.layers.length
? payload.layers
: [...CEPA_LAYERS, ...IDENTITY_LAYERS],
topicIds: [topicId],
userId,
userIds: [userId],
},
// CEPA: run in parallel across the batch
//
// NOTICE: if modified the parallelism of CEPA_LAYERS
// or added new memory layer, make sure to update the number below.
//
// Currently, CEPA (context, experience, preference, activity) + identity = 5 layers.
// and since identity requires sequential processing, we set parallelism to 5.
flowControl: {
key: `memory-user-memory.pipelines.chat-topic.process-topic.user.${userId}.topic.${topicId}`,
parallelism: 5,
},
headers: upstashWorkflowExtraHeaders,
workflow: processTopicWorkflow,
},
);
}),
);
// Trigger user persona update after topic processing using the workflow client.
await context.run(`memory:user-memory:users:${userId}`, async () => {
await MemoryExtractionWorkflowService.triggerPersonaUpdate(userId, payload.baseUrl, {
extraHeaders: upstashWorkflowExtraHeaders,
});
});
span.setStatus({ code: SpanStatusCode.OK });
return {
processedTopics: payload.topicIds.length,
processedUsers: payload.userIds.length,
};
} catch (error) {
// NOTICE: Let WorkflowAbort bubble up (used internally by Upstash); record others
if (error instanceof WorkflowAbort) {
console.warn('workflow aborted:', error.message);
throw error;
}
span.recordException(error as Error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error instanceof Error ? error.message : 'process-topics workflow failed',
});
throw error;
} finally {
span.end();
}
},
);
@@ -1,159 +0,0 @@
import { MemorySourceType } from '@lobechat/types';
import { type WorkflowContext } from '@upstash/workflow';
import { AsyncTaskModel } from '@/database/models/asyncTask';
import { type ListTopicsForMemoryExtractorCursor } from '@/database/models/topic';
import { getServerDB } from '@/database/server';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
import {
buildWorkflowPayloadInput,
MemoryExtractionExecutor,
MemoryExtractionWorkflowService,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
import { forEachBatchSequential } from '@/server/services/memory/userMemory/topicBatching';
const TOPIC_PAGE_SIZE = 50;
const TOPIC_BATCH_SIZE = 4;
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
export const processUserTopicsHandler = async (
context: WorkflowContext<MemoryExtractionPayloadInput>,
) => {
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
if (!params.userIds.length) {
return { message: 'No user ids provided for topic processing.' };
}
if (!params.sources.includes(MemorySourceType.ChatTopic)) {
return { message: 'No supported sources requested, skip topic processing.' };
}
const executor = await MemoryExtractionExecutor.create();
const scheduleNextPage = async (userId: string, cursorCreatedAt: Date, cursorId: string) => {
await MemoryExtractionWorkflowService.triggerProcessUserTopics(
{
...buildWorkflowPayloadInput({
...params,
topicCursor: {
createdAt: cursorCreatedAt.toISOString(),
id: cursorId,
userId,
},
topicIds: [],
userId,
userIds: [userId],
}),
},
{ extraHeaders: upstashWorkflowExtraHeaders },
);
};
for (const userId of params.userIds) {
if (params.asyncTaskId) {
// NOTICE: Cooperative cascading cancellation for the workflow tree.
// A cancelled root task should stop at user-topic pagination and avoid enqueuing topic batches.
const cancelled = await context.run(
`memory:user-memory:extract:users:${userId}:cancel-check`,
() =>
getServerDB().then((db) =>
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
params.asyncTaskId!,
),
),
);
if (cancelled) {
continue;
}
}
const topicCursor =
params.topicCursor && params.topicCursor.userId === userId
? {
createdAt: new Date(params.topicCursor.createdAt),
id: params.topicCursor.id,
}
: undefined;
const topicsFromPayload =
params.topicIds && params.topicIds.length > 0
? await context.run(
`memory:user-memory:extract:users:${userId}:filter-topic-ids`,
async () => {
const filtered = await executor.filterTopicIdsForUser(userId, params.topicIds);
return filtered.length > 0 ? filtered : undefined;
},
)
: undefined;
const topicBatch = await context.run<{
cursor?: ListTopicsForMemoryExtractorCursor;
ids: string[];
}>(`memory:user-memory:extract:users:${userId}:list-topics:${topicCursor?.id || 'root'}`, () =>
topicsFromPayload && topicsFromPayload.length > 0
? Promise.resolve({ ids: topicsFromPayload })
: executor.getTopicsForUser(
{
cursor: topicCursor,
forceAll: params.forceAll,
forceTopics: params.forceTopics,
from: params.from,
to: params.to,
userId,
},
TOPIC_PAGE_SIZE,
),
);
const ids = topicBatch.ids;
if (!ids.length) {
continue;
}
const cursor = 'cursor' in topicBatch ? topicBatch.cursor : undefined;
// TODO: follow the new pattern of process-topic
// remove the batch sequential, replace it with context.invoke(...) pattern
await forEachBatchSequential(ids, TOPIC_BATCH_SIZE, async (topicIds, batchIndex) => {
// NOTICE: We trigger via QStash instead of context.invoke because invoke only swaps the last path
// segment with the workflowId. If we invoked directly from /process-user-topics, child workflow
// URLs would inherit that base and lose the desired /process-topics/workflows prefix.
await context.run(
`memory:user-memory:extract:users:${userId}:process-topics-batch:${batchIndex}`,
() =>
MemoryExtractionWorkflowService.triggerProcessTopics(
userId,
{
...buildWorkflowPayloadInput(params),
topicCursor: undefined,
topicIds,
userId,
userIds: [userId],
},
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
});
if (!topicsFromPayload && cursor) {
await context.run(
`memory:user-memory:extract:users:${userId}:topics:${cursor.id}:schedule-next-batch`,
() => {
// NOTICE: Upstash Workflow only supports serializable data into plain JSON,
// this causes the Date object to be converted into string when passed as parameter from
// context to child workflow. So we need to convert it back to Date object here.
const createdAt = new Date(cursor.createdAt);
if (Number.isNaN(createdAt.getTime())) {
throw new Error('Invalid cursor date when scheduling next topic page');
}
return scheduleNextPage(userId, createdAt, cursor.id);
},
);
}
}
return { processedUsers: params.userIds.length };
};
@@ -1,100 +0,0 @@
import { type WorkflowContext } from '@upstash/workflow';
import { chunk } from 'es-toolkit/compat';
import { AsyncTaskModel } from '@/database/models/asyncTask';
import { getServerDB } from '@/database/server';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
import {
buildWorkflowPayloadInput,
MemoryExtractionExecutor,
MemoryExtractionWorkflowService,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
const USER_PAGE_SIZE = 50;
const USER_BATCH_SIZE = 10;
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
export const processUsersHandler = async (
context: WorkflowContext<MemoryExtractionPayloadInput>,
) => {
const params = normalizeMemoryExtractionPayload(context.requestPayload || {});
if (params.sources.length === 0) {
return { message: 'No sources provided, skip memory extraction.' };
}
if (params.asyncTaskId && params.userIds[0]) {
// NOTICE: Cooperative cascading cancellation for the workflow tree.
// If root task has cancelRequestedAt, this stage stops scheduling child workflows.
const cancelled = await context.run('memory:user-memory:extract:cancel-check:root', () =>
getServerDB().then((db) =>
new AsyncTaskModel(db, params.userIds[0]!).isUserMemoryExtractionCancellationRequested(
params.asyncTaskId!,
),
),
);
if (cancelled) {
return { message: 'Memory extraction task cancellation requested, skip processing users.' };
}
}
const executor = await MemoryExtractionExecutor.create();
// NOTICE: Upstash Workflow only supports serializable data into plain JSON,
// this causes the Date object to be converted into string when passed as parameter from
// context to child workflow. So we need to convert it back to Date object here.
const userCursor = params.userCursor
? { createdAt: new Date(params.userCursor.createdAt), id: params.userCursor.id }
: undefined;
const userBatch = await context.run('memory:user-memory:extract:get-users', () =>
params.userIds.length > 0
? { ids: params.userIds }
: executor.getUsers(USER_PAGE_SIZE, userCursor),
);
const ids = userBatch.ids;
if (ids.length === 0) {
return { message: 'No users to process for memory extraction.' };
}
const cursor = 'cursor' in userBatch ? userBatch.cursor : undefined;
const batches = chunk(ids, USER_BATCH_SIZE);
await Promise.all(
batches.map((userIds) =>
context.run(`memory:user-memory:extract:users:process-topic-batches`, () =>
MemoryExtractionWorkflowService.triggerProcessUserTopics(
{
...buildWorkflowPayloadInput(params),
topicCursor: undefined,
userId: userIds[0],
userIds,
},
{ extraHeaders: upstashWorkflowExtraHeaders },
),
),
),
);
if (params.userIds.length === 0 && cursor) {
await context.run('memory:user-memory:extract:users:schedule-next-user-batch', () =>
MemoryExtractionWorkflowService.triggerProcessUsers(
{
...buildWorkflowPayloadInput({
...params,
userCursor: { createdAt: cursor.createdAt.toISOString(), id: cursor.id },
}),
},
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
}
return {
batches: batches.length,
nextCursor: cursor ? cursor.id : null,
processedUsers: ids.length,
};
};
@@ -0,0 +1,194 @@
import { SpanStatusCode } from '@lobechat/observability-otel/api';
import {
buildUpstashWorkflowAttributes,
tracer as upstashWorkflowTracer,
} from '@lobechat/observability-otel/modules/upstash-workflow';
import { LayersEnum, MemorySourceType } from '@lobechat/types';
import { type WorkflowContext } from '@upstash/workflow';
import { WorkflowAbort } from '@upstash/workflow';
import { AsyncTaskModel } from '@/database/models/asyncTask';
import { getServerDB } from '@/database/server';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
import {
MemoryExtractionExecutor,
MemoryExtractionWorkflowService,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
import { extractTopicWorkflow } from './extractTopic';
const TOPIC_PAGE_SIZE = 200;
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
const CEPA_LAYERS: LayersEnum[] = [
LayersEnum.Context,
LayersEnum.Experience,
LayersEnum.Preference,
LayersEnum.Activity,
];
const IDENTITY_LAYERS: LayersEnum[] = [LayersEnum.Identity];
/**
* L3: Process topic extraction for ONE user.
*
* - Load user's eligible topics (respecting forceAll/forceTopics filters) in a single pass.
* - Fan out per-topic via `context.invoke(extractTopicWorkflow)` — Upstash rewrites the URL last
* segment to the workflowId (`extract-topic`), which is mounted at /topics/extract-topic.
* - After topic extraction, schedule a persona update for this user (decoupled via persona pipeline).
*/
export const executeUserHandler = (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
upstashWorkflowTracer.startActiveSpan(
'workflow:memory-user-memory:topics:execute-user',
async (span) => {
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
const userId = payload.userId || payload.userIds[0];
span.setAttributes({
...buildUpstashWorkflowAttributes(context),
'workflow.memory_user_memory.force_all': payload.forceAll,
'workflow.memory_user_memory.force_topics': payload.forceTopics,
'workflow.memory_user_memory.layers': payload.layers.join(','),
'workflow.memory_user_memory.source': payload.sources.join(','),
'workflow.memory_user_memory.user_id': userId,
'workflow.name': 'memory-user-memory:topics:execute-user',
});
if (!userId) {
span.setStatus({ code: SpanStatusCode.OK });
return { message: 'Missing userId for execute-user workflow.' };
}
if (!payload.sources.includes(MemorySourceType.ChatTopic)) {
span.setStatus({ code: SpanStatusCode.OK });
return { message: 'Source not supported in execute-user workflow.' };
}
try {
// Early cancel check before any DB work.
if (payload.asyncTaskId) {
const cancelled = await context.run(
`memory:topics:execute-user:${userId}:cancel-check:before`,
() =>
getServerDB().then((db) =>
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
payload.asyncTaskId!,
),
),
);
if (cancelled) {
span.setStatus({ code: SpanStatusCode.OK });
return { cancelled: true, message: 'Cancelled before topic listing.' };
}
}
const executor = await MemoryExtractionExecutor.create();
// Either use explicitly-provided topicIds (filtered for ownership) or list all eligible topics.
const topicIds = await context.run(
`memory:topics:execute-user:${userId}:list-topics`,
() =>
payload.topicIds.length > 0
? executor.filterTopicIdsForUser(userId, payload.topicIds)
: executor
.getTopicsForUser(
{
forceAll: payload.forceAll,
forceTopics: payload.forceTopics,
from: payload.from,
to: payload.to,
userId,
},
TOPIC_PAGE_SIZE,
)
.then((res) => res.ids),
);
if (!topicIds.length) {
span.setStatus({ code: SpanStatusCode.OK });
return {
message: 'No eligible topics for user.',
processedTopics: 0,
userId,
};
}
// Cancel check before fan-out so cancelled tasks stop at the earliest safe boundary.
if (payload.asyncTaskId) {
const cancelled = await context.run(
`memory:topics:execute-user:${userId}:cancel-check:fanout`,
() =>
getServerDB().then((db) =>
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
payload.asyncTaskId!,
),
),
);
if (cancelled) {
span.setStatus({ code: SpanStatusCode.OK });
return { cancelled: true, message: 'Cancelled before topic fan-out.' };
}
}
// Fan out per topic via context.invoke. Each invocation runs as a standalone extract-topic workflow.
await Promise.all(
topicIds.map((topicId, index) =>
context.invoke(`memory:topics:execute-user:${userId}:invoke:${topicId}:${index}`, {
body: {
asyncTaskId: payload.asyncTaskId,
baseUrl: payload.baseUrl,
forceAll: payload.forceAll,
forceTopics: payload.forceTopics,
layers: payload.layers.length
? payload.layers
: [...CEPA_LAYERS, ...IDENTITY_LAYERS],
sources: payload.sources,
topicIds: [topicId],
userId,
userIds: [userId],
userInitiated: payload.userInitiated,
},
// CEPA (4 layers) + identity (1, sequential) → 5. Parallelism matches layer count.
flowControl: {
key: `memory-user-memory.topics.extract-topic.user.${userId}.topic.${topicId}`,
parallelism: 5,
},
headers: upstashWorkflowExtraHeaders,
workflow: extractTopicWorkflow,
}),
),
);
// Decoupled persona update — no longer tail-coupled to topic chain. Fires even when
// this user had no new topics (handled by the persona pipeline's own eligibility logic).
await context.run(`memory:topics:execute-user:${userId}:trigger-persona`, async () => {
await MemoryExtractionWorkflowService.triggerPersonaExecuteUser(userId, payload.baseUrl, {
extraHeaders: upstashWorkflowExtraHeaders,
});
});
span.setStatus({ code: SpanStatusCode.OK });
return {
processedTopics: topicIds.length,
userId,
};
} catch (error) {
if (error instanceof WorkflowAbort) {
console.warn('workflow aborted:', error.message);
throw error;
}
span.recordException(error as Error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: error instanceof Error ? error.message : 'execute-user workflow failed',
});
throw error;
} finally {
span.end();
}
},
);
@@ -17,7 +17,7 @@ import {
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
import { createWorkflowQstashClient } from '../qstashClient';
import { createWorkflowQstashClient } from '../../qstashClient';
const CEPA_LAYERS: LayersEnum[] = [
LayersEnum.Context,
@@ -28,9 +28,9 @@ const CEPA_LAYERS: LayersEnum[] = [
const IDENTITY_LAYERS: LayersEnum[] = [LayersEnum.Identity];
const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
const extractTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
upstashWorkflowTracer.startActiveSpan(
'workflow:memory-user-memory:process-topic',
'workflow:memory-user-memory:topics:extract-topic',
async (span) => {
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
@@ -40,7 +40,7 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
'workflow.memory_user_memory.source': payload.sources.join(','),
'workflow.memory_user_memory.topic_id': payload.topicIds[0],
'workflow.memory_user_memory.user_id': payload.userIds[0],
'workflow.name': 'memory-user-memory:process-topic',
'workflow.name': 'memory-user-memory:topics:extract-topic',
});
const topicId = payload.topicIds[0];
@@ -48,12 +48,12 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
if (!userId || !topicId) {
span.setStatus({ code: SpanStatusCode.OK });
return { message: 'Missing userId or topicId for topic workflow.' };
return { message: 'Missing userId or topicId for extract-topic workflow.' };
}
if (!payload.sources.includes(MemorySourceType.ChatTopic)) {
span.setStatus({ code: SpanStatusCode.OK });
return { message: 'Source not supported in topic workflow.' };
return { message: 'Source not supported in extract-topic workflow.' };
}
const executor = await MemoryExtractionExecutor.create();
@@ -63,7 +63,7 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
// NOTICE: Cooperative cascading cancellation for the workflow tree.
// Check before CEPA extraction so cancelled tasks stop at the earliest safe boundary.
const cancelled = await context.run(
`memory:user-memory:extract:users:${userId}:topics:${topicId}:cancel-check:before`,
`memory:topics:extract-topic:${userId}:${topicId}:cancel-check:before`,
() =>
getServerDB().then((db) =>
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
@@ -83,21 +83,19 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
layers = payload.layers.filter((layer) => CEPA_LAYERS.includes(layer));
}
await context.run(
`memory:user-memory:extract:users:${userId}:topics:${topicId}:cepa`,
() =>
executor.extractTopic({
asyncTaskId: payload.asyncTaskId,
forceAll: payload.forceAll,
forceTopics: payload.forceTopics,
from: payload.from,
layers,
source: MemorySourceType.ChatTopic,
to: payload.to,
topicId,
userId,
userInitiated: payload.userInitiated,
}),
await context.run(`memory:topics:extract-topic:${userId}:${topicId}:cepa`, () =>
executor.extractTopic({
asyncTaskId: payload.asyncTaskId,
forceAll: payload.forceAll,
forceTopics: payload.forceTopics,
from: payload.from,
layers,
source: MemorySourceType.ChatTopic,
to: payload.to,
topicId,
userId,
userInitiated: payload.userInitiated,
}),
);
}
{
@@ -105,7 +103,7 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
// NOTICE: Cooperative cascading cancellation for the workflow tree.
// Re-check before identity extraction to avoid running sequential identity step after cancel.
const cancelled = await context.run(
`memory:user-memory:extract:users:${userId}:topics:${topicId}:cancel-check:identity`,
`memory:topics:extract-topic:${userId}:${topicId}:cancel-check:identity`,
() =>
getServerDB().then((db) =>
new AsyncTaskModel(db, userId).isUserMemoryExtractionCancellationRequested(
@@ -126,21 +124,19 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
layers = payload.layers.filter((layer) => IDENTITY_LAYERS.includes(layer));
}
await context.run(
`memory:user-memory:extract:users:${userId}:topics:${topicId}:identity`,
() =>
executor.extractTopic({
asyncTaskId: payload.asyncTaskId,
forceAll: payload.forceAll,
forceTopics: payload.forceTopics,
from: payload.from,
layers,
source: MemorySourceType.ChatTopic,
to: payload.to,
topicId,
userId,
userInitiated: payload.userInitiated,
}),
await context.run(`memory:topics:extract-topic:${userId}:${topicId}:identity`, () =>
executor.extractTopic({
asyncTaskId: payload.asyncTaskId,
forceAll: payload.forceAll,
forceTopics: payload.forceTopics,
from: payload.from,
layers,
source: MemorySourceType.ChatTopic,
to: payload.to,
topicId,
userId,
userInitiated: payload.userInitiated,
}),
);
}
@@ -158,12 +154,11 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
span.recordException(error as Error);
span.setStatus({
code: SpanStatusCode.ERROR,
message: errorMessageFrom(error) || 'process-topic workflow failed',
message: errorMessageFrom(error) || 'extract-topic workflow failed',
});
// Avoid infinite retries on non-retry-able errors
throw new WorkflowNonRetryableError(
errorMessageFrom(error) || 'process-topic workflow failed',
errorMessageFrom(error) || 'extract-topic workflow failed',
);
} finally {
span.end();
@@ -171,8 +166,8 @@ const processTopicRoute = async (context: WorkflowContext<MemoryExtractionPayloa
},
);
export const processTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput, unknown>(
processTopicRoute,
export const extractTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput, unknown>(
extractTopicRoute,
{
failureFunction: async ({ context, failStatus, failResponse }) => {
try {
@@ -189,7 +184,7 @@ export const processTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput,
await asyncTaskModel.incrementUserMemoryExtractionProgress(payload.asyncTaskId);
console.error(
`[process-topic][failureFunction] marking async task as failed for user ${userId}, topic ${topicId}`,
`[extract-topic][failureFunction] marking async task as failed for user ${userId}, topic ${topicId}`,
{
failResponse,
failStatus,
@@ -198,7 +193,7 @@ export const processTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput,
return 'async-task-updated';
} catch (error) {
console.error('[process-topic][failureFunction] failed to record async task error', error);
console.error('[extract-topic][failureFunction] failed to record async task error', error);
return 'async-task-update-failed';
}
},
@@ -206,4 +201,6 @@ export const processTopicWorkflow = createWorkflow<MemoryExtractionPayloadInput,
},
);
processTopicWorkflow.workflowId = 'process-topic';
// NOTICE: workflowId must match the last path segment of WORKFLOW_PATHS.topicsExtractTopic,
// because Upstash `context.invoke` rewrites the URL last segment to the target workflowId.
extractTopicWorkflow.workflowId = 'extract-topic';
@@ -0,0 +1,177 @@
import { SpanStatusCode } from '@lobechat/observability-otel/api';
import {
buildUpstashWorkflowAttributes,
tracer as upstashWorkflowTracer,
} from '@lobechat/observability-otel/modules/upstash-workflow';
import { type WorkflowContext } from '@upstash/workflow';
import { chunk } from 'es-toolkit/compat';
import { AsyncTaskModel } from '@/database/models/asyncTask';
import { getServerDB } from '@/database/server';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
import {
buildWorkflowPayloadInput,
MemoryExtractionExecutor,
MemoryExtractionWorkflowService,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
const USER_PAGE_SIZE = 50;
const CHUNK_SIZE = 20;
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
/**
* L2: Paginate users + fan-out.
*
* - If `userIds` provided (fan-out chunk from a prior call), trigger execute-user for each and stop.
* - Else paginate via cursor (PAGE_SIZE=50); if batch > CHUNK_SIZE=20, split into chunks and
* recursively re-trigger this L2 with each chunk. Otherwise trigger execute-user directly.
* - Always schedule the next page if a cursor exists.
*/
export const paginateUsersHandler = (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
upstashWorkflowTracer.startActiveSpan(
'workflow:memory-user-memory:topics:paginate-users',
async (span) => {
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
span.setAttributes({
...buildUpstashWorkflowAttributes(context),
'workflow.memory_user_memory.payload_user_count': payload.userIds.length,
'workflow.name': 'memory-user-memory:topics:paginate-users',
});
try {
// Fan-out chunk path: specific userIds were provided by an upstream paginate call.
if (payload.userIds.length > 0 && !payload.userCursor) {
await Promise.all(
payload.userIds.map((userId) =>
context.run(`memory:topics:paginate-users:fanout:execute:${userId}`, () =>
MemoryExtractionWorkflowService.triggerTopicsExecuteUser(
userId,
buildWorkflowPayloadInput({
...payload,
topicCursor: undefined,
topicIds: [],
userCursor: undefined,
userId,
userIds: [userId],
}),
{ extraHeaders: upstashWorkflowExtraHeaders },
),
),
),
);
span.setStatus({ code: SpanStatusCode.OK });
return { fannedOut: payload.userIds.length };
}
// Cursor pagination path.
const userCursor = payload.userCursor
? { createdAt: new Date(payload.userCursor.createdAt), id: payload.userCursor.id }
: undefined;
if (userCursor && Number.isNaN(userCursor.createdAt.getTime())) {
throw new Error('Invalid userCursor.createdAt for topics paginate-users');
}
const executor = await MemoryExtractionExecutor.create();
// Root-level cancel check (only when asyncTaskId + at least one userId known).
if (payload.asyncTaskId && payload.userIds[0]) {
const cancelled = await context.run(
'memory:topics:paginate-users:cancel-check:root',
() =>
getServerDB().then((db) =>
new AsyncTaskModel(
db,
payload.userIds[0]!,
).isUserMemoryExtractionCancellationRequested(payload.asyncTaskId!),
),
);
if (cancelled) {
span.setStatus({ code: SpanStatusCode.OK });
return { cancelled: true, message: 'Cancelled at paginate-users root.' };
}
}
const batch = await context.run(
`memory:topics:paginate-users:list:${userCursor?.id || 'root'}`,
() => executor.getUsersForHourlyExtraction(USER_PAGE_SIZE, userCursor),
);
const userIds = batch.ids;
const nextCursor = 'cursor' in batch ? batch.cursor : undefined;
if (userIds.length === 0) {
span.setStatus({ code: SpanStatusCode.OK });
return { message: 'No users in page, pagination complete.' };
}
// Fan-out if batch exceeds CHUNK_SIZE; else trigger execute-user directly.
if (userIds.length > CHUNK_SIZE) {
const chunks = chunk(userIds, CHUNK_SIZE);
await Promise.all(
chunks.map((chunkIds, idx) =>
context.run(`memory:topics:paginate-users:fanout:${idx + 1}/${chunks.length}`, () =>
MemoryExtractionWorkflowService.triggerTopicsPaginateUsers(
buildWorkflowPayloadInput({
...payload,
userCursor: undefined,
userId: chunkIds[0],
userIds: chunkIds,
}),
{ extraHeaders: upstashWorkflowExtraHeaders },
),
),
),
);
} else {
await Promise.all(
userIds.map((userId) =>
context.run(`memory:topics:paginate-users:execute:${userId}`, () =>
MemoryExtractionWorkflowService.triggerTopicsExecuteUser(
userId,
buildWorkflowPayloadInput({
...payload,
topicCursor: undefined,
topicIds: [],
userCursor: undefined,
userId,
userIds: [userId],
}),
{ extraHeaders: upstashWorkflowExtraHeaders },
),
),
),
);
}
// Schedule next page.
if (nextCursor) {
await context.run('memory:topics:paginate-users:schedule-next-page', () =>
MemoryExtractionWorkflowService.triggerTopicsPaginateUsers(
buildWorkflowPayloadInput({
...payload,
userCursor: {
createdAt: nextCursor.createdAt.toISOString(),
id: nextCursor.id,
},
userIds: [],
}),
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
}
span.setStatus({ code: SpanStatusCode.OK });
return {
nextCursor: nextCursor ? nextCursor.id : null,
processedUsers: userIds.length,
};
} finally {
span.end();
}
},
);
@@ -0,0 +1,121 @@
import { SpanStatusCode } from '@lobechat/observability-otel/api';
import {
buildUpstashWorkflowAttributes,
tracer as upstashWorkflowTracer,
} from '@lobechat/observability-otel/modules/upstash-workflow';
import { MemorySourceType } from '@lobechat/types';
import { type WorkflowContext } from '@upstash/workflow';
import { parseMemoryExtractionConfig } from '@/server/globalConfig/parseMemoryExtractionConfig';
import { type MemoryExtractionPayloadInput } from '@/server/services/memory/userMemory/extract';
import {
buildWorkflowPayloadInput,
MemoryExtractionExecutor,
MemoryExtractionWorkflowService,
normalizeMemoryExtractionPayload,
} from '@/server/services/memory/userMemory/extract';
const { upstashWorkflowExtraHeaders } = parseMemoryExtractionConfig();
/**
* L1: Entry for the topics extraction pipeline.
*
* - If `userIds` in payload, skip the eligibility query and fan out directly via L2.
* - Else materialise the full eligible user list in one step so dry-run can report the exact total.
* - When not dry-run: trigger L2 (paginate-users) to walk all users via cursor pagination.
*/
export const processUsersHandler = (context: WorkflowContext<MemoryExtractionPayloadInput>) =>
upstashWorkflowTracer.startActiveSpan(
'workflow:memory-user-memory:topics:process-users',
async (span) => {
const payload = normalizeMemoryExtractionPayload(context.requestPayload || {});
const dryRun = !!(context.requestPayload as { dryRun?: boolean } | null)?.dryRun;
span.setAttributes({
...buildUpstashWorkflowAttributes(context),
'workflow.memory_user_memory.dry_run': dryRun,
'workflow.memory_user_memory.payload_user_count': payload.userIds.length,
'workflow.name': 'memory-user-memory:topics:process-users',
});
// Ensure source defaults to ChatTopic when caller omitted it — topics pipeline is chat-topic-only.
const sources = payload.sources.length ? payload.sources : [MemorySourceType.ChatTopic];
if (!sources.includes(MemorySourceType.ChatTopic)) {
span.setStatus({ code: SpanStatusCode.OK });
return {
message: 'No supported sources requested, skip topics process-users.',
success: true,
};
}
// Explicit target userIds path: skip the eligibility query and go straight to fan-out via L2.
if (payload.userIds.length > 0) {
if (dryRun) {
span.setStatus({ code: SpanStatusCode.OK });
return {
dryRun: true,
message: `[DryRun] Would fan out ${payload.userIds.length} pre-specified users.`,
success: true,
targetUsers: payload.userIds.length,
};
}
await context.run('memory:topics:process-users:trigger-paginate-fanout', () =>
MemoryExtractionWorkflowService.triggerTopicsPaginateUsers(
buildWorkflowPayloadInput({ ...payload, sources, userCursor: undefined }),
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
span.setStatus({ code: SpanStatusCode.OK });
return { success: true, triggeredFanout: payload.userIds.length };
}
// Count-only query: cheap COUNT(*) with the hourly-extraction filter (memory-enabled + has
// at least one user message). Gives an exact total for dry-run.
const executor = await MemoryExtractionExecutor.create();
const totalEligible = await context.run(
'memory:topics:process-users:count-eligible-users',
() => executor.countUsersForHourlyExtraction(),
);
if (totalEligible === 0) {
span.setStatus({ code: SpanStatusCode.OK });
return {
message: 'No eligible users for topics extraction.',
success: true,
totalEligible: 0,
};
}
if (dryRun) {
span.setStatus({ code: SpanStatusCode.OK });
return {
dryRun: true,
message: `[DryRun] Would process ${totalEligible} users.`,
success: true,
totalEligible,
};
}
// Trigger L2 to walk all users via cursor pagination.
await context.run('memory:topics:process-users:trigger-paginate', () =>
MemoryExtractionWorkflowService.triggerTopicsPaginateUsers(
buildWorkflowPayloadInput({
...payload,
sources,
userCursor: undefined,
userIds: [],
}),
{ extraHeaders: upstashWorkflowExtraHeaders },
),
);
span.setStatus({ code: SpanStatusCode.OK });
return {
message: `Triggered paginate-users for ${totalEligible} eligible users.`,
success: true,
totalEligible,
};
},
);