Skip to content

Commit ff78cf7

Browse files
committed
Separate input from output
1 parent 37411c1 commit ff78cf7

File tree

3 files changed

+20
-17
lines changed

3 files changed

+20
-17
lines changed

.changeset/famous-moles-shave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@soundxyz/redis-pubsub": major
3+
---
4+
5+
Separate input from output of inputSchema and outputSchema

src/index.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export const EventCodes = {
5757
SUBSCRIPTION_ABORTED: "SUBSCRIPTION_ABORTED",
5858
} as const;
5959

60-
export type EventCodes = typeof EventCodes[keyof typeof EventCodes];
60+
export type EventCodes = (typeof EventCodes)[keyof typeof EventCodes];
6161

6262
export function RedisPubSub({
6363
publisher,
@@ -270,7 +270,7 @@ export function RedisPubSub({
270270
);
271271
}
272272

273-
function createChannel<Input, Output>({
273+
function createChannel<PublishInput, ChannelData, SubscriberData>({
274274
name,
275275
isLazy = true,
276276
...schemas
@@ -282,12 +282,12 @@ export function RedisPubSub({
282282
isLazy?: boolean;
283283
} & (
284284
| {
285-
inputSchema: ZodSchema<Input, ZodTypeDef, Input>;
286-
outputSchema: ZodSchema<Output, ZodTypeDef, Input>;
285+
inputSchema: ZodSchema<ChannelData, ZodTypeDef, PublishInput>;
286+
outputSchema: ZodSchema<SubscriberData, ZodTypeDef, ChannelData>;
287287
schema?: never;
288288
}
289289
| {
290-
schema: ZodSchema<Output, ZodTypeDef, Input>;
290+
schema: ZodSchema<SubscriberData, ZodTypeDef, PublishInput>;
291291
inputSchema?: never;
292292
outputSchema?: never;
293293
}
@@ -315,8 +315,6 @@ export function RedisPubSub({
315315
unsubscribe,
316316
publish,
317317
unsubscribeAll,
318-
inputSchema,
319-
outputSchema,
320318
};
321319

322320
function getSubscriptionValue({
@@ -339,23 +337,23 @@ export function RedisPubSub({
339337
});
340338
}
341339

342-
function subscribe<FilteredValue extends Output>(subscribeArguments: {
340+
function subscribe<FilteredValue extends SubscriberData>(subscribeArguments: {
343341
abortSignal?: AbortSignal;
344-
filter: (value: Output) => value is FilteredValue;
342+
filter: (value: SubscriberData) => value is FilteredValue;
345343
identifier?: string | number;
346344
}): AsyncGenerator<FilteredValue, void, unknown>;
347345
function subscribe(subscribeArguments?: {
348346
abortSignal?: AbortSignal;
349-
filter?: (value: Output) => unknown;
347+
filter?: (value: SubscriberData) => unknown;
350348
identifier?: string | number;
351-
}): AsyncGenerator<Output, void, unknown>;
349+
}): AsyncGenerator<SubscriberData, void, unknown>;
352350
async function* subscribe({
353351
abortSignal,
354352
filter,
355353
identifier,
356354
}: {
357355
abortSignal?: AbortSignal;
358-
filter?: (value: Output) => unknown;
356+
filter?: (value: SubscriberData) => unknown;
359357
identifier?: string | number;
360358
} = {}) {
361359
const channel = identifier ? name + identifier : name;
@@ -428,7 +426,7 @@ export function RedisPubSub({
428426
while (true) {
429427
await dataPromise.current.promise;
430428

431-
for (const value of dataPromise.current.values as Output[]) {
429+
for (const value of dataPromise.current.values as SubscriberData[]) {
432430
if (filter && !filter(value)) {
433431
if (enabledLogEvents?.SUBSCRIPTION_MESSAGE_FILTERED_OUT) {
434432
logMessage("SUBSCRIPTION_MESSAGE_FILTERED_OUT", {
@@ -498,15 +496,15 @@ export function RedisPubSub({
498496

499497
async function publish(
500498
...values: [
501-
{ value: Input; identifier?: string | number },
502-
...{ value: Input; identifier?: string | number }[]
499+
{ value: PublishInput; identifier?: string | number },
500+
...{ value: PublishInput; identifier?: string | number }[],
503501
]
504502
) {
505503
await Promise.all(
506504
values.map(async ({ value, identifier }) => {
507505
const tracing = enabledLogEvents?.PUBLISH_MESSAGE_EXECUTION_TIME ? getTracing() : null;
508506

509-
let parsedValue: Input | Output;
507+
let parsedValue: ChannelData | SubscriberData;
510508

511509
try {
512510
parsedValue = await inputSchema.parseAsync(value);

test/helpers.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type LoggedEvents, RedisPubSub,type RedisPubSubOptions } from "../src";
1+
import { type LoggedEvents, RedisPubSub, type RedisPubSubOptions } from "../src";
22
import Pino from "pino";
33
import Redis from "ioredis";
44

0 commit comments

Comments
 (0)