0
Fork 0
mirror of https://github.com/logto-io/logto.git synced 2024-12-16 20:26:19 -05:00

feat(core): add redis cluster and tls extra options support (#5619)

* feat: add redis cluster and tls extra options support

* refactor(core): allow non-normative redis url

---------

Co-authored-by: Gao Sun <gao@silverhand.io>
This commit is contained in:
Alessandro Chitolina 2024-04-18 14:14:49 +02:00 committed by GitHub
parent 4aa6db99db
commit 8ef021fb35
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 160 additions and 31 deletions

View file

@ -0,0 +1,5 @@
---
"@logto/core": minor
---
add support for Redis Cluster and extra TLS options for Redis connections

View file

@ -18,19 +18,21 @@ mockEsm('redis', () => ({
disconnect: mockFunction,
on: mockFunction,
}),
createCluster: () => ({
set: mockFunction,
get: mockFunction,
del: mockFunction,
sendCommand: async () => 'PONG',
connect: mockFunction,
disconnect: mockFunction,
on: mockFunction,
}),
}));
const { RedisCache } = await import('./index.js');
const stubRedisUrl = (url?: string) =>
Sinon.stub(EnvSet, 'values').value({
...EnvSet.values,
redisUrl: url,
});
const { RedisCache, RedisClusterCache, redisCacheFactory } = await import('./index.js');
describe('RedisCache', () => {
it('should successfully construct with no REDIS_URL', async () => {
stubRedisUrl();
const cache = new RedisCache();
expect(cache.client).toBeUndefined();
@ -47,9 +49,13 @@ describe('RedisCache', () => {
it('should successfully construct with a Redis client', async () => {
for (const url of ['1', 'redis://url']) {
jest.clearAllMocks();
stubRedisUrl(url);
const cache = new RedisCache();
const stub = Sinon.stub(EnvSet, 'values').value({
...EnvSet.values,
redisUrl: url,
});
const cache = redisCacheFactory();
expect(cache instanceof RedisCache).toBeTruthy();
expect(cache.client).toBeTruthy();
// eslint-disable-next-line no-await-in-loop
@ -63,6 +69,34 @@ describe('RedisCache', () => {
// Do sanity check only
expect(mockFunction).toBeCalledTimes(6);
stub.restore();
}
});
it('should successfully construct with a Redis Cluster client', async () => {
for (const url of ['redis://url?cluster=1', 'redis:?host=h1&host=h2&host=h3&cluster=true']) {
jest.clearAllMocks();
const stub = Sinon.stub(EnvSet, 'values').value({
...EnvSet.values,
redisUrl: url,
});
const cache = redisCacheFactory();
expect(cache instanceof RedisClusterCache).toBeTruthy();
expect(cache.client).toBeTruthy();
// eslint-disable-next-line no-await-in-loop
await Promise.all([
cache.set('foo', 'bar'),
cache.get('foo'),
cache.delete('foo'),
cache.connect(),
cache.disconnect(),
]);
// Do sanity check only
expect(mockFunction).toBeCalledTimes(6);
stub.restore();
}
});
});

View file

@ -1,27 +1,16 @@
import fs from 'node:fs';
import { appInsights } from '@logto/app-insights/node';
import { type Optional, conditional, yes } from '@silverhand/essentials';
import { createClient, type RedisClientType } from 'redis';
import { type Optional, conditional, yes, trySafe } from '@silverhand/essentials';
import { createClient, createCluster, type RedisClientType, type RedisClusterType } from 'redis';
import { EnvSet } from '#src/env-set/index.js';
import { consoleLog } from '#src/utils/console.js';
import { type CacheStore } from './types.js';
export class RedisCache implements CacheStore {
readonly client?: RedisClientType;
constructor() {
const { redisUrl } = EnvSet.values;
if (redisUrl) {
this.client = createClient({
url: conditional(!yes(redisUrl) && redisUrl),
});
this.client.on('error', (error) => {
void appInsights.trackException(error);
});
}
}
abstract class RedisCacheBase implements CacheStore {
readonly client?: RedisClientType | RedisClusterType;
async set(key: string, value: string, expire: number = 30 * 60) {
await this.client?.set(key, value, {
@ -40,7 +29,7 @@ export class RedisCache implements CacheStore {
async connect() {
if (this.client) {
await this.client.connect();
const pong = await this.client.ping();
const pong = await this.ping();
if (pong === 'PONG') {
consoleLog.info('[CACHE] Connected to Redis');
@ -56,6 +45,107 @@ export class RedisCache implements CacheStore {
consoleLog.info('[CACHE] Disconnected from Redis');
}
}
protected getSocketOptions(url: URL) {
const certFile = url.searchParams.get('cert');
const keyFile = url.searchParams.get('key');
const caFile = url.searchParams.get('certroot');
return {
rejectUnauthorized: yes(url.searchParams.get('reject_unauthorized')),
tls: url.protocol === 'rediss',
cert: certFile ? fs.readFileSync(certFile).toString() : undefined,
key: keyFile ? fs.readFileSync(keyFile).toString() : undefined,
ca: caFile ? fs.readFileSync(caFile).toString() : undefined,
reconnectStrategy: (retries: number, cause: Error) => {
if ('code' in cause && cause.code === 'SELF_SIGNED_CERT_IN_CHAIN') {
// This will throw only if reject unauthorized is true (default).
// Doesn't make sense to retry.
return false;
}
return Math.min(retries * 50, 500);
},
};
}
protected abstract ping(): Promise<string | undefined>;
}
export const redisCache = new RedisCache();
export class RedisCache extends RedisCacheBase {
readonly client?: RedisClientType;
constructor(redisUrl?: string | undefined) {
super();
if (redisUrl) {
this.client = createClient({
url: conditional(!yes(redisUrl) && redisUrl),
socket: trySafe(() => this.getSocketOptions(new URL(redisUrl))),
});
this.client.on('error', (error) => {
void appInsights.trackException(error);
});
}
}
protected async ping(): Promise<string | undefined> {
return this.client?.ping();
}
}
export class RedisClusterCache extends RedisCacheBase {
readonly client?: RedisClusterType;
constructor(connectionUrl: URL) {
super();
/* eslint-disable @silverhand/fp/no-mutating-methods */
const hosts = [];
if (connectionUrl.host) {
hosts.push(connectionUrl.host);
}
hosts.push(...connectionUrl.searchParams.getAll('host'));
/* eslint-enable @silverhand/fp/no-mutating-methods */
const rootNodes = hosts.map((host) => {
return {
url: 'redis://' + host,
};
});
this.client = createCluster({
rootNodes,
useReplicas: true,
defaults: {
socket: this.getSocketOptions(connectionUrl),
username: connectionUrl.username,
password: connectionUrl.password,
},
});
this.client.on('error', (error) => {
void appInsights.trackException(error);
});
}
protected async ping(): Promise<string | undefined> {
return this.client?.sendCommand(undefined, true, ['PING']);
}
}
export const redisCacheFactory = (): RedisCacheBase => {
const { redisUrl } = EnvSet.values;
if (redisUrl) {
const url = trySafe(() => new URL(redisUrl));
if (url && yes(url.searchParams.get('cluster'))) {
return new RedisClusterCache(url);
}
}
return new RedisCache(redisUrl);
};
export const redisCache = redisCacheFactory();

View file

@ -7,7 +7,7 @@ import koaLogger from 'koa-logger';
import mount from 'koa-mount';
import type Provider from 'oidc-provider';
import { type RedisCache } from '#src/caches/index.js';
import { type CacheStore } from '#src/caches/types.js';
import { WellKnownCache } from '#src/caches/well-known.js';
import { AdminApps, EnvSet, UserApps } from '#src/env-set/index.js';
import { createCloudConnectionLibrary } from '#src/libraries/cloud-connection.js';
@ -35,7 +35,7 @@ import type TenantContext from './TenantContext.js';
import { getTenantDatabaseDsn } from './utils.js';
export default class Tenant implements TenantContext {
static async create(id: string, redisCache: RedisCache, customDomain?: string): Promise<Tenant> {
static async create(id: string, redisCache: CacheStore, customDomain?: string): Promise<Tenant> {
// Treat the default database URL as the management URL
const envSet = new EnvSet(id, await getTenantDatabaseDsn(id));
// Custom endpoint is used for building OIDC issuer URL when the request is a custom domain