Skip to content

Commit 2a25d6f

Browse files
committed
Added brain for memory
Added direct caching for cache_miss in Pusher
1 parent 576666b commit 2a25d6f

File tree

12 files changed

+135
-18
lines changed

12 files changed

+135
-18
lines changed

src/brain/brain.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import type * as FN from '@soketi/impl/types';
2+
3+
export abstract class Brain {
4+
abstract get(key: string): Promise<FN.Brain.BrainRecord['value']|null>;
5+
abstract getWithMetadata(key: string): Promise<FN.Brain.BrainRecord|null>;
6+
abstract set(key: string, value: FN.Brain.BrainRecord['value'], ttlSeconds?: number): Promise<void>;
7+
abstract has(key: string): Promise<boolean>;
8+
abstract delete(key: string): Promise<void>;
9+
10+
async cleanup(): Promise<void> {
11+
//
12+
}
13+
}

src/brain/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from './brain';
2+
export * from './local-brain';

src/brain/local-brain.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import type * as FN from '@soketi/impl/types';
2+
import { Brain } from './brain';
3+
4+
export class LocalBrain extends Brain {
5+
memory: Map<string, FN.Brain.BrainRecord> = new Map();
6+
7+
constructor() {
8+
super();
9+
10+
setInterval(() => {
11+
for (let [key, { ttlSeconds, setTime }] of [...this.memory]) {
12+
let currentTime = parseInt((new Date().getTime() / 1000) as unknown as string);
13+
14+
if (ttlSeconds > 0 && (setTime + ttlSeconds) <= currentTime) {
15+
this.memory.delete(key);
16+
}
17+
}
18+
}, 1_000);
19+
}
20+
21+
async get(key: string): Promise<FN.JSON.Value|null> {
22+
return (await this.getWithMetadata(key))?.value ?? null;
23+
}
24+
25+
async getWithMetadata(key: string): Promise<FN.Brain.BrainRecord|null> {
26+
return this.memory.get(key) ?? null;
27+
}
28+
29+
async set(key: string, value: FN.JSON.Value, ttlSeconds = -1): Promise<void> {
30+
this.memory.set(key, {
31+
value,
32+
ttlSeconds,
33+
setTime: parseInt((new Date().getTime() / 1000) as unknown as string),
34+
});
35+
}
36+
37+
async has(key: string): Promise<boolean> {
38+
return Boolean(this.memory.get(key));
39+
}
40+
41+
async delete(key: string): Promise<void> {
42+
this.memory.delete(key);
43+
}
44+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
export * as Brain from './brain';
12
export * as Gossiper from './gossiper';
23
export * as Pusher from './pusher';
34
export * as Webhooks from './webhooks';

src/pusher/ws/pusher-connections.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type * as FN from '@soketi/impl/types';
2+
import { Brain } from '../../brain';
23
import { Connections as BaseConnections } from '../../ws';
34
import { EncryptedPrivateChannelManager, PresenceChannelManager, PrivateChannelManager, PublicChannelManager } from '../channels';
45
import { PusherConnection, Utils } from '../';
@@ -10,6 +11,7 @@ export class PusherConnections extends BaseConnections implements FN.Pusher.Push
1011
constructor(
1112
protected app: FN.Pusher.PusherApps.App,
1213
protected readonly gossiper: Gossiper,
14+
protected readonly brain: Brain,
1315
) {
1416
super();
1517

@@ -508,16 +510,19 @@ export class PusherConnections extends BaseConnections implements FN.Pusher.Push
508510
}
509511

510512
async sendMissedCacheIfExists(conn: FN.Pusher.PusherWS.PusherConnection, channel: string) {
511-
// TODO: Caching module
512-
/* let cachedEvent = await this.env.APPS.get(
513+
let cachedEvent = await this.brain.get(
513514
`app_${this.app.id}_channel_${channel}_cache_miss`,
514515
);
515516

516517
if (cachedEvent) {
517-
conn.sendJson({ event: 'pusher:cache_miss', channel, data: cachedEvent });
518+
conn.sendJson({
519+
event: 'pusher:cache_miss',
520+
channel,
521+
data: cachedEvent,
522+
});
518523
} else {
519-
// TODO: this.webhooks.sendCacheMissed(channel);
520-
} */
524+
// TODO: Send webhook event.
525+
}
521526
}
522527

523528
async getChannelManagerFor(channel: string): Promise<FN.Pusher.Channels.ChannelManager> {

tests/brain/local-brain.test.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { LocalBrain } from '../../src/brain';
2+
import { describe, test, expect } from 'vitest';
3+
4+
describe('brain/local-brain', () => {
5+
test('basic storage', async () => {
6+
const brain = new LocalBrain();
7+
8+
await brain.set('test', { test: 'object' });
9+
10+
expect(await brain.has('test')).toBe(true);
11+
expect(await brain.get('test')).toEqual({ test: 'object' });
12+
13+
await brain.delete('test');
14+
15+
expect(await brain.has('test')).toBe(false);
16+
expect(await brain.get('test')).toBe(null);
17+
});
18+
19+
test('basic storage with ttl', async () => {
20+
const brain = new LocalBrain();
21+
22+
await brain.set('test', { test: 'object' }, 1);
23+
24+
await new Promise((resolve) => setTimeout(resolve, 1_100));
25+
26+
expect(await brain.has('test')).toBe(false);
27+
expect(await brain.get('test')).toBe(null);
28+
});
29+
});

tests/pusher/channels/presence.test.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,19 @@ import { PusherConnection, PusherConnections } from '../../../src/pusher/ws';
66
import { Router as WsRouter } from '../../../src/ws';
77
import { describe, test, expect, beforeEach } from 'vitest';
88
import { createHmac } from 'crypto';
9+
import { Brain, LocalBrain } from '../../../src/brain';
910

1011
const pusherUtil = require('pusher/lib/util');
1112
const Pusher = require('pusher');
1213

1314
let apps: TestAppsManager;
1415
let gossiper: NoGossiper;
16+
let brain: Brain;
1517

1618
beforeEach(() => {
1719
apps = new TestAppsManager();
1820
gossiper = new NoGossiper();
21+
brain = new LocalBrain();
1922

2023
AppsRegistry.registerDriver('default', apps);
2124

@@ -36,7 +39,7 @@ beforeEach(() => {
3639
describe('pusher/channels/presence', () => {
3740
test('join and leave', () => new Promise<void>(async (done) => {
3841
const app = await AppsRegistry.getById('app-id') as TestApp;
39-
const conns = new LocalConnections(app, gossiper);
42+
const conns = new LocalConnections(app, gossiper, brain);
4043
const user = {
4144
user_id: '1',
4245
user_info: {
@@ -90,7 +93,7 @@ describe('pusher/channels/presence', () => {
9093

9194
test('connect and disconnect', async () => new Promise<void>(async (done) => {
9295
const app = await AppsRegistry.getById('app-id') as TestApp;
93-
const conns = new LocalConnections(app, gossiper);
96+
const conns = new LocalConnections(app, gossiper, brain);
9497
const user = {
9598
user_id: '1',
9699
user_info: {
@@ -138,7 +141,7 @@ describe('pusher/channels/presence', () => {
138141

139142
test('connect but get unauthorized', async () => new Promise<void>(async (done) => {
140143
const app = await AppsRegistry.getById('app-id') as TestApp;
141-
const conns = new LocalConnections(app, gossiper);
144+
const conns = new LocalConnections(app, gossiper, brain);
142145
const user = {
143146
user_id: '1',
144147
user_info: {

tests/pusher/channels/private-encrypted.test.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,19 @@ import { PusherConnection, PusherConnections } from '../../../src/pusher/ws';
66
import { Router as WsRouter } from '../../../src/ws';
77
import { describe, test, expect, beforeEach } from 'vitest';
88
import { createHmac } from 'crypto';
9+
import { Brain, LocalBrain } from '../../../src/brain';
910

1011
const pusherUtil = require('pusher/lib/util');
1112
const Pusher = require('pusher');
1213

1314
let apps: TestAppsManager;
1415
let gossiper: NoGossiper;
16+
let brain: Brain;
1517

1618
beforeEach(() => {
1719
apps = new TestAppsManager();
1820
gossiper = new NoGossiper();
21+
brain = new LocalBrain();
1922

2023
AppsRegistry.registerDriver('default', apps);
2124
AppsRegistry.initializeApp({}).then((app: TestApp) => {
@@ -35,7 +38,7 @@ beforeEach(() => {
3538
describe('pusher/channels/private-encrypted', () => {
3639
test('join and leave', () => new Promise<void>(async (done) => {
3740
const app = await AppsRegistry.getById('app-id') as TestApp;
38-
const conns = new LocalConnections(app, gossiper);
41+
const conns = new LocalConnections(app, gossiper, brain);
3942

4043
const conn = new PusherConnection('test', {
4144
send: (message) => {
@@ -71,7 +74,7 @@ describe('pusher/channels/private-encrypted', () => {
7174

7275
test('connect and disconnect', async () => new Promise<void>(async (done) => {
7376
const app = await AppsRegistry.getById('app-id') as TestApp;
74-
const conns = new LocalConnections(app, gossiper);
77+
const conns = new LocalConnections(app, gossiper, brain);
7578

7679
WsRouter.onConnectionClosed(async (conn) => {
7780
await conns.unsubscribeFromAllChannels(conn);
@@ -109,7 +112,7 @@ describe('pusher/channels/private-encrypted', () => {
109112

110113
test('connect but get unauthorized', async () => new Promise<void>(async (done) => {
111114
const app = await AppsRegistry.getById('app-id') as TestApp;
112-
const conns = new LocalConnections(app, gossiper);
115+
const conns = new LocalConnections(app, gossiper, brain);
113116

114117
const conn = new PusherConnection('test', {
115118
send: (message) => {

tests/pusher/channels/private.test.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,19 @@ import { PusherConnection, PusherConnections } from '../../../src/pusher/ws';
66
import { Router as WsRouter } from '../../../src/ws';
77
import { describe, test, expect, beforeEach } from 'vitest';
88
import { createHmac } from 'crypto';
9+
import { Brain, LocalBrain } from '../../../src/brain';
910

1011
const pusherUtil = require('pusher/lib/util');
1112
const Pusher = require('pusher');
1213

1314
let apps: TestAppsManager;
1415
let gossiper: NoGossiper;
16+
let brain: Brain;
1517

1618
beforeEach(() => {
1719
apps = new TestAppsManager();
1820
gossiper = new NoGossiper();
21+
brain = new LocalBrain();
1922

2023
AppsRegistry.registerDriver('default', apps);
2124
AppsRegistry.initializeApp({}).then((app: TestApp) => {
@@ -35,7 +38,7 @@ beforeEach(() => {
3538
describe('pusher/channels/private', () => {
3639
test('join and leave', () => new Promise<void>(async (done) => {
3740
const app = await AppsRegistry.getById('app-id') as TestApp;
38-
const conns = new LocalConnections(app, gossiper);
41+
const conns = new LocalConnections(app, gossiper, brain);
3942

4043
const conn = new PusherConnection('test', {
4144
send: (message) => {
@@ -71,7 +74,7 @@ describe('pusher/channels/private', () => {
7174

7275
test('connect and disconnect', async () => new Promise<void>(async (done) => {
7376
const app = await AppsRegistry.getById('app-id') as TestApp;
74-
const conns = new LocalConnections(app, gossiper);
77+
const conns = new LocalConnections(app, gossiper, brain);
7578

7679
WsRouter.onConnectionClosed(async (conn) => {
7780
await conns.unsubscribeFromAllChannels(conn);
@@ -110,7 +113,7 @@ describe('pusher/channels/private', () => {
110113

111114
test('connect but get unauthorized', async () => new Promise<void>(async (done) => {
112115
const app = await AppsRegistry.getById('app-id') as TestApp;
113-
const conns = new LocalConnections(app, gossiper);
116+
const conns = new LocalConnections(app, gossiper, brain);
114117

115118
const conn = new PusherConnection('test', {
116119
send: (message) => {

tests/pusher/channels/public.test.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,19 @@ import { PusherConnection, PusherConnections } from '../../../src/pusher/ws';
66
import { Router as WsRouter } from '../../../src/ws';
77
import { describe, test, expect, beforeEach } from 'vitest';
88
import { createHmac } from 'crypto';
9+
import { Brain, LocalBrain } from '../../../src/brain';
910

1011
const pusherUtil = require('pusher/lib/util');
1112
const Pusher = require('pusher');
1213

1314
let apps: TestAppsManager;
1415
let gossiper: NoGossiper;
16+
let brain: Brain;
1517

1618
beforeEach(() => {
1719
apps = new TestAppsManager();
1820
gossiper = new NoGossiper();
21+
brain = new LocalBrain();
1922

2023
AppsRegistry.registerDriver('default', apps);
2124
AppsRegistry.initializeApp({}).then((app: TestApp) => {
@@ -35,7 +38,7 @@ beforeEach(() => {
3538
describe('pusher/channels/public', () => {
3639
test('join and leave', async () => {
3740
const app = await AppsRegistry.getById('app-id') as TestApp;
38-
const conns = new LocalConnections(app, gossiper);
41+
const conns = new LocalConnections(app, gossiper, brain);
3942

4043
const conn = new PusherConnection('test', {
4144
send: (message) => {
@@ -66,7 +69,7 @@ describe('pusher/channels/public', () => {
6669

6770
test('connect and disconnect', async () => new Promise<void>(async (done) => {
6871
const app = await AppsRegistry.getById('app-id') as TestApp;
69-
const conns = new LocalConnections(app, gossiper);
72+
const conns = new LocalConnections(app, gossiper, brain);
7073

7174
WsRouter.onConnectionClosed(async (conn) => {
7275
await conns.unsubscribeFromAllChannels(conn);

0 commit comments

Comments
 (0)