Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(threads): handle custom data #1428

Merged
merged 7 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2754,20 +2754,25 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
*
* @returns {{ threads: Thread<StreamChatGenerics>[], next: string }} Returns the list of threads and the next cursor.
*/
async queryThreads(options?: QueryThreadsOptions) {
const opts = {
async queryThreads(options: QueryThreadsOptions = {}) {
const optionsWithDefaults = {
limit: 10,
participant_limit: 10,
reply_limit: 3,
watch: true,
...options,
};

const res = await this.post<QueryThreadsAPIResponse<StreamChatGenerics>>(this.baseURL + `/threads`, opts);
const response = await this.post<QueryThreadsAPIResponse<StreamChatGenerics>>(
`${this.baseURL}/threads`,
optionsWithDefaults,
);

return {
threads: res.threads.map((thread) => new Thread({ client: this, threadData: thread })),
next: res.next,
threads: response.threads.map(
(thread) => new Thread<StreamChatGenerics>({ client: this, threadData: thread }),
),
next: response.next,
};
}

Expand All @@ -2784,22 +2789,22 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
*/
async getThread(messageId: string, options: GetThreadOptions = {}) {
if (!messageId) {
throw Error('Please specify the message id when calling partialUpdateThread');
throw Error('Please specify the messageId when calling getThread');
}

const opts = {
const optionsWithDefaults = {
participant_limit: 100,
reply_limit: 3,
watch: true,
...options,
};

const res = await this.get<GetThreadAPIResponse<StreamChatGenerics>>(
this.baseURL + `/threads/${encodeURIComponent(messageId)}`,
opts,
const response = await this.get<GetThreadAPIResponse<StreamChatGenerics>>(
`${this.baseURL}/threads/${encodeURIComponent(messageId)}`,
optionsWithDefaults,
);

return new Thread<StreamChatGenerics>({ client: this, threadData: res.thread });
return new Thread<StreamChatGenerics>({ client: this, threadData: response.thread });
}

/**
Expand Down Expand Up @@ -2827,6 +2832,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
'reply_count',
'participants',
'channel',
'custom',
];

for (const key in { ...partialThreadObject.set, ...partialThreadObject.unset }) {
Expand All @@ -2838,7 +2844,7 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
}

return await this.patch<GetThreadAPIResponse<StreamChatGenerics>>(
this.baseURL + `/threads/${encodeURIComponent(messageId)}`,
`${this.baseURL}/threads/${encodeURIComponent(messageId)}`,
partialThreadObject,
);
}
Expand Down
1 change: 1 addition & 0 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const EVENT_MAP = {
'reaction.deleted': true,
'reaction.new': true,
'reaction.updated': true,
'thread.updated': true,
'typing.start': true,
'typing.stop': true,
'user.banned': true,
Expand Down
67 changes: 65 additions & 2 deletions src/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
MessageResponse,
ReadResponse,
ThreadResponse,
ThreadResponseCustomData,
UserResponse,
} from './types';
import { addToMessageList, findIndexInSortedArray, formatMessage, throttle } from './utils';
Expand All @@ -27,6 +28,7 @@ export type ThreadState<SCG extends ExtendableGenerics = DefaultGenerics> = {
active: boolean;
channel: Channel<SCG>;
createdAt: Date;
custom: ThreadResponseCustomData;
deletedAt: Date | null;
isLoading: boolean;
isStateStale: boolean;
Expand All @@ -40,6 +42,7 @@ export type ThreadState<SCG extends ExtendableGenerics = DefaultGenerics> = {
read: ThreadReadState;
replies: Array<FormatMessageResponse<SCG>>;
replyCount: number;
title: string;
updatedAt: Date | null;
};

Expand All @@ -65,6 +68,42 @@ export type ThreadReadState<SCG extends ExtendableGenerics = DefaultGenerics> =
const DEFAULT_PAGE_LIMIT = 50;
const DEFAULT_SORT: { created_at: AscDesc }[] = [{ created_at: -1 }];
const MARK_AS_READ_THROTTLE_TIMEOUT = 1000;
const THREAD_RESERVED_KEYS = [
'channel',
'channel_cid',
'created_at',
'created_by_user_id',
'parent_message_id',
'title',
'updated_at',
'latest_replies',
'active_participant_count',
'deleted_at',
'last_message_at',
'participant_count',
'reply_count',
'read',
'thread_participants',
'created_by',
'parent_message',
] as const;

// TODO: remove this once we move to API v2
const constructCustomDataObject = <SCG extends ExtendableGenerics>(threadData: ThreadResponse<SCG>) => {
const custom: ThreadResponseCustomData = {};

for (const key in threadData) {
if (THREAD_RESERVED_KEYS.includes(key as keyof ThreadResponse<SCG>)) {
continue;
}

const customKey = key as keyof ThreadResponseCustomData;

custom[customKey] = threadData[customKey];
}

return custom;
};

export class Thread<SCG extends ExtendableGenerics = DefaultGenerics> {
public readonly state: StateStore<ThreadState<SCG>>;
Expand All @@ -87,12 +126,15 @@ export class Thread<SCG extends ExtendableGenerics = DefaultGenerics> {
: [];

this.state = new StateStore<ThreadState<SCG>>({
// local only
active: false,
isLoading: false,
isStateStale: false,
// 99.9% should never change
channel,
createdAt: new Date(threadData.created_at),
// rest
deletedAt: threadData.deleted_at ? new Date(threadData.deleted_at) : null,
isLoading: false,
isStateStale: false,
pagination: repliesPaginationFromInitialThread(threadData),
parentMessage: formatMessage(threadData.parent_message),
participants: threadData.thread_participants,
Expand All @@ -102,6 +144,8 @@ export class Thread<SCG extends ExtendableGenerics = DefaultGenerics> {
replies: threadData.latest_replies.map(formatMessage),
replyCount: threadData.reply_count ?? 0,
updatedAt: threadData.updated_at ? new Date(threadData.updated_at) : null,
title: threadData.title,
custom: constructCustomDataObject<SCG>(threadData),
});

this.id = threadData.parent_message_id;
Expand Down Expand Up @@ -186,6 +230,7 @@ export class Thread<SCG extends ExtendableGenerics = DefaultGenerics> {
return;
}

this.unsubscribeFunctions.add(this.subscribeThreadUpdated());
this.unsubscribeFunctions.add(this.subscribeMarkActiveThreadRead());
this.unsubscribeFunctions.add(this.subscribeReloadActiveStaleThread());
this.unsubscribeFunctions.add(this.subscribeMarkThreadStale());
Expand All @@ -195,6 +240,24 @@ export class Thread<SCG extends ExtendableGenerics = DefaultGenerics> {
this.unsubscribeFunctions.add(this.subscribeMessageUpdated());
};

private subscribeThreadUpdated = () => {
return this.client.on('thread.updated', (event) => {
if (!event.thread || event.thread.parent_message_id !== this.id) {
return;
}

const threadData = event.thread;

this.state.partialNext({
title: threadData.title,
updatedAt: new Date(threadData.updated_at),
deletedAt: threadData.deleted_at ? new Date(threadData.deleted_at) : null,
// TODO: use threadData.custom once we move to API v2
custom: constructCustomDataObject<SCG>(threadData),
});
}).unsubscribe;
};

private subscribeMarkActiveThreadRead = () => {
return this.state.subscribeWithSelector(
(nextValue) => ({
Expand Down
8 changes: 7 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,10 @@ export type GetMessageAPIResponse<
StreamChatGenerics extends ExtendableGenerics = DefaultGenerics
> = SendMessageAPIResponse<StreamChatGenerics>;

export interface ThreadResponse<SCG extends ExtendableGenerics = DefaultGenerics> {
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface ThreadResponseCustomData {}

export interface ThreadResponse<SCG extends ExtendableGenerics = DefaultGenerics> extends ThreadResponseCustomData {
// FIXME: according to OpenAPI, `channel` could be undefined but since cid is provided I'll asume that it's wrong
channel: ChannelResponse<SCG>;
channel_cid: string;
Expand All @@ -531,6 +534,7 @@ export interface ThreadResponse<SCG extends ExtendableGenerics = DefaultGenerics
parent_message_id: string;
title: string;
updated_at: string;
active_participant_count?: number;
created_by?: UserResponse<SCG>;
deleted_at?: string;
last_message_at?: string;
Expand All @@ -547,6 +551,8 @@ export interface ThreadResponse<SCG extends ExtendableGenerics = DefaultGenerics
user?: UserResponse<SCG>;
user_id?: string;
}>;
// TODO: when moving to API v2 we should do this instead
// custom: ThreadResponseCustomData;
}

// TODO: Figure out a way to strongly type set and unset.
Expand Down
62 changes: 62 additions & 0 deletions test/unit/threads.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,67 @@ describe('Threads 2.0', () => {
thread.unregisterSubscriptions();
});

describe('Event: thread.updated', () => {
it('ignores incoming event if the data do not match (parent_message_id)', () => {
const thread = createTestThread({ title: 'A' });
thread.registerSubscriptions();

const stateBefore = thread.state.getLatestValue();
expect(stateBefore.title).to.eq('A');

client.dispatchEvent({
type: 'thread.updated',
thread: generateThreadResponse(channelResponse, generateMsg(), { title: 'B' }),
});

const stateAfter = thread.state.getLatestValue();
expect(stateAfter.title).to.eq('A');
});

it('correctly updates thread-level properties', () => {
const thread = createTestThread({ title: 'A' });
thread.registerSubscriptions();

const stateBefore = thread.state.getLatestValue();
expect(stateBefore.title).to.eq('A');

client.dispatchEvent({
type: 'thread.updated',
thread: generateThreadResponse(channelResponse, generateMsg({ id: parentMessageResponse.id }), {
title: 'B',
}),
});

const stateAfter = thread.state.getLatestValue();
expect(stateAfter.title).to.eq('B');
});

it('properly handles custom data', () => {
const customKey1 = uuidv4();
const customKey2 = uuidv4();

const thread = createTestThread({ [customKey1]: 1, [customKey2]: { key: 1 } });
thread.registerSubscriptions();

const stateBefore = thread.state.getLatestValue();

expect(stateBefore.custom).to.have.keys([customKey1, customKey2]);
expect(stateBefore.custom[customKey1]).to.equal(1);

client.dispatchEvent({
type: 'thread.updated',
thread: generateThreadResponse(channelResponse, generateMsg({ id: parentMessageResponse.id }), {
[customKey1]: 2,
}),
});

const stateAfter = thread.state.getLatestValue();

expect(stateAfter.custom).to.not.have.property(customKey2);
expect(stateAfter.custom[customKey1]).to.equal(2);
});
});

describe('Event: user.watching.stop', () => {
it('ignores incoming event if the data do not match (channel or user.id)', () => {
const thread = createTestThread();
Expand Down Expand Up @@ -1207,6 +1268,7 @@ describe('Threads 2.0', () => {
await threadManager.reload();
expect(stubbedQueryThreads.calledWithMatch({ limit: 25 })).to.be.true;
});

it('skips reload if there were no updates since the latest reload', async () => {
threadManager.state.partialNext({ ready: true });
await threadManager.reload();
Expand Down
Loading