mirror of
https://github.com/diced/zipline.git
synced 2025-03-28 23:11:22 -05:00
feat: supabase datasource & remove swift
This commit is contained in:
parent
d76e6444e0
commit
659868181d
8 changed files with 135 additions and 253 deletions
|
@ -11,10 +11,10 @@ export interface ConfigCore {
|
|||
}
|
||||
|
||||
export interface ConfigDatasource {
|
||||
type: 'local' | 's3' | 'swift';
|
||||
type: 'local' | 's3' | 'supabase';
|
||||
local: ConfigLocalDatasource;
|
||||
s3?: ConfigS3Datasource;
|
||||
swift?: ConfigSwiftDatasource;
|
||||
supabase?: ConfigSupabaseDatasource;
|
||||
}
|
||||
|
||||
export interface ConfigLocalDatasource {
|
||||
|
@ -42,6 +42,12 @@ export interface ConfigSwiftDatasource {
|
|||
region_id?: string;
|
||||
}
|
||||
|
||||
export interface ConfigSupabaseDatasource {
|
||||
url: string;
|
||||
key: string;
|
||||
bucket: string;
|
||||
}
|
||||
|
||||
export interface ConfigUploader {
|
||||
default_format: string;
|
||||
route: string;
|
||||
|
|
|
@ -77,13 +77,9 @@ export default function readConfig() {
|
|||
map('DATASOURCE_S3_REGION', 'string', 'datasource.s3.region'),
|
||||
map('DATASOURCE_S3_USE_SSL', 'boolean', 'datasource.s3.use_ssl'),
|
||||
|
||||
map('DATASOURCE_SWIFT_USERNAME', 'string', 'datasource.swift.username'),
|
||||
map('DATASOURCE_SWIFT_PASSWORD', 'string', 'datasource.swift.password'),
|
||||
map('DATASOURCE_SWIFT_AUTH_ENDPOINT', 'string', 'datasource.swift.auth_endpoint'),
|
||||
map('DATASOURCE_SWIFT_CONTAINER', 'string', 'datasource.swift.container'),
|
||||
map('DATASOURCE_SWIFT_PROJECT_ID', 'string', 'datasource.swift.project_id'),
|
||||
map('DATASOURCE_SWIFT_DOMAIN_ID', 'string', 'datasource.swift.domain_id'),
|
||||
map('DATASOURCE_SWIFT_REGION_ID', 'string', 'datasource.swift.region_id'),
|
||||
map('DATASOURCE_SUPABASE_URL', 'string', 'datasource.supabase.url'),
|
||||
map('DATASOURCE_SUPABASE_KEY', 'string', 'datasource.supabase.key'),
|
||||
map('DATASOURCE_SUPABASE_BUCKET', 'string', 'datasource.supabase.bucket'),
|
||||
|
||||
map('UPLOADER_DEFAULT_FORMAT', 'string', 'uploader.default_format'),
|
||||
map('UPLOADER_ROUTE', 'string', 'uploader.route'),
|
||||
|
|
|
@ -34,7 +34,7 @@ const validator = s.object({
|
|||
}),
|
||||
datasource: s
|
||||
.object({
|
||||
type: s.enum('local', 's3', 'swift').default('local'),
|
||||
type: s.enum('local', 's3', 'swift', 'supabase').default('local'),
|
||||
local: s
|
||||
.object({
|
||||
directory: s.string.default('./uploads'),
|
||||
|
@ -61,6 +61,11 @@ const validator = s.object({
|
|||
domain_id: s.string.default('default'),
|
||||
region_id: s.string.nullable,
|
||||
}).optional,
|
||||
supabase: s.object({
|
||||
url: s.string,
|
||||
key: s.string,
|
||||
bucket: s.string,
|
||||
}).optional,
|
||||
})
|
||||
.default({
|
||||
type: 'local',
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import config from './config';
|
||||
import { Datasource, Local, S3, Swift } from './datasources';
|
||||
import { Datasource, Local, S3, Supabase } from './datasources';
|
||||
import Logger from './logger';
|
||||
|
||||
const logger = Logger.get('datasource');
|
||||
|
@ -14,9 +14,9 @@ if (!global.datasource) {
|
|||
global.datasource = new Local(config.datasource.local.directory);
|
||||
logger.info(`using Local(${config.datasource.local.directory}) datasource`);
|
||||
break;
|
||||
case 'swift':
|
||||
global.datasource = new Swift(config.datasource.swift);
|
||||
logger.info(`using Swift(${config.datasource.swift.container}) datasource`);
|
||||
case 'supabase':
|
||||
global.datasource = new Supabase(config.datasource.supabase);
|
||||
logger.info(`using Supabase(${config.datasource.supabase.bucket}) datasource`);
|
||||
break;
|
||||
default:
|
||||
throw new Error('Invalid datasource type');
|
||||
|
|
104
src/lib/datasources/Supabase.ts
Normal file
104
src/lib/datasources/Supabase.ts
Normal file
|
@ -0,0 +1,104 @@
|
|||
import { Datasource } from '.';
|
||||
import { ConfigSupabaseDatasource } from 'lib/config/Config';
|
||||
import { guess } from '../mimes';
|
||||
import Logger from '../logger';
|
||||
import { Readable } from 'stream';
|
||||
|
||||
export class Supabase extends Datasource {
|
||||
public name: string = 'Supabase';
|
||||
public logger: Logger = Logger.get('datasource::supabase');
|
||||
|
||||
public constructor(public config: ConfigSupabaseDatasource) {
|
||||
super();
|
||||
}
|
||||
|
||||
public async save(file: string, data: Buffer): Promise<void> {
|
||||
const mimetype = await guess(file.split('.').pop());
|
||||
|
||||
const r = await fetch(`${this.config.url}/storage/v1/object/${this.config.bucket}/${file}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Authorization: `Bearer ${this.config.key}`,
|
||||
'Content-Type': mimetype,
|
||||
},
|
||||
body: data,
|
||||
});
|
||||
|
||||
const j = await r.json();
|
||||
if (j.error) this.logger.error(`${j.error}: ${j.message}`);
|
||||
}
|
||||
|
||||
public async delete(file: string): Promise<void> {
|
||||
await fetch(`${this.config.url}/storage/v1/object/${this.config.bucket}/${file}`, {
|
||||
method: 'DELETE',
|
||||
headers: {
|
||||
Authorization: `Bearer ${this.config.key}`,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
public async get(file: string): Promise<Readable> {
|
||||
// get a readable stream from the request
|
||||
const r = await fetch(`${this.config.url}/storage/v1/object/${this.config.bucket}/${file}`, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
Authorization: `Bearer ${this.config.key}`,
|
||||
},
|
||||
});
|
||||
|
||||
return Readable.fromWeb(r.body as any);
|
||||
}
|
||||
|
||||
public size(file: string): Promise<number> {
|
||||
return new Promise(async (res, rej) => {
|
||||
fetch(`${this.config.url}/storage/v1/object/list/${this.config.bucket}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Authorization: `Bearer ${this.config.key}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify({
|
||||
prefix: '',
|
||||
search: file,
|
||||
}),
|
||||
})
|
||||
.then((r) => r.json())
|
||||
.then((j) => {
|
||||
if (j.error) {
|
||||
this.logger.error(`${j.error}: ${j.message}`);
|
||||
res(0);
|
||||
}
|
||||
|
||||
if (j.length === 0) {
|
||||
res(0);
|
||||
} else {
|
||||
res(j[0].metadata.size);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public async fullSize(): Promise<number> {
|
||||
return new Promise((res, rej) => {
|
||||
fetch(`${this.config.url}/storage/v1/object/list/${this.config.bucket}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Authorization: `Bearer ${this.config.key}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify({
|
||||
prefix: '',
|
||||
}),
|
||||
})
|
||||
.then((r) => r.json())
|
||||
.then((j) => {
|
||||
if (j.error) {
|
||||
this.logger.error(`${j.error}: ${j.message}`);
|
||||
res(0);
|
||||
}
|
||||
|
||||
res(j.reduce((a, b) => a + b.metadata.size, 0));
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,236 +0,0 @@
|
|||
import { Datasource } from '.';
|
||||
import { Readable } from 'stream';
|
||||
import { ConfigSwiftDatasource } from 'lib/config/Config';
|
||||
|
||||
interface SwiftContainerOptions {
|
||||
auth_endpoint_url: string;
|
||||
credentials: {
|
||||
username: string;
|
||||
password: string;
|
||||
project_id: string;
|
||||
domain_id: string;
|
||||
container: string;
|
||||
interface?: string;
|
||||
region_id: string;
|
||||
};
|
||||
refreshMargin?: number;
|
||||
}
|
||||
|
||||
interface SwiftAuth {
|
||||
token: string;
|
||||
expires: Date;
|
||||
swiftURL: string;
|
||||
}
|
||||
|
||||
interface SwiftObject {
|
||||
bytes: number;
|
||||
content_type: string;
|
||||
hash: string;
|
||||
name: string;
|
||||
last_modified: string;
|
||||
}
|
||||
|
||||
class SwiftContainer {
|
||||
auth: SwiftAuth | null;
|
||||
|
||||
constructor(private options: SwiftContainerOptions) {
|
||||
this.auth = null;
|
||||
}
|
||||
|
||||
private findEndpointURL(catalog: any[], service: string): string | null {
|
||||
const catalogEntry = catalog.find((x) => x.name === service);
|
||||
if (!catalogEntry) return null;
|
||||
|
||||
const endpoint = catalogEntry.endpoints.find(
|
||||
(x: any) =>
|
||||
x.interface === (this.options.credentials.interface || 'public') &&
|
||||
(this.options.credentials.region_id ? x.region_id == this.options.credentials.region_id : true)
|
||||
);
|
||||
|
||||
return endpoint ? endpoint.url : null;
|
||||
}
|
||||
|
||||
private async getCredentials(): Promise<SwiftAuth> {
|
||||
const payload = {
|
||||
auth: {
|
||||
identity: {
|
||||
methods: ['password'],
|
||||
password: {
|
||||
user: {
|
||||
name: this.options.credentials.username,
|
||||
password: this.options.credentials.password,
|
||||
domain: {
|
||||
id: this.options.credentials.domain_id || 'default',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
scope: {
|
||||
project: {
|
||||
id: this.options.credentials.project_id,
|
||||
domain: {
|
||||
id: this.options.credentials.domain_id || 'default',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const { json, headers, error } = await fetch(`${this.options.auth_endpoint_url}/auth/tokens`, {
|
||||
body: JSON.stringify(payload),
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Accept: 'application/json',
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
}).then(async (e) => {
|
||||
try {
|
||||
const json = await e.json();
|
||||
return { json, headers: e.headers, error: null };
|
||||
} catch (e) {
|
||||
return { json: null, headers: null, error: e };
|
||||
}
|
||||
});
|
||||
|
||||
if (error || !json || !headers || json.error)
|
||||
throw new Error('Could not retrieve credentials from OpenStack, check your config file');
|
||||
|
||||
const catalog = json.token.catalog;
|
||||
// many Swift clouds use ceph radosgw to provide swift
|
||||
const swiftURL = this.findEndpointURL(catalog, 'swift') || this.findEndpointURL(catalog, 'radosgw-swift');
|
||||
if (!swiftURL) throw new Error('Couldn\'t find any "swift" or "radosgw-swift" service in the catalog');
|
||||
|
||||
return {
|
||||
token: headers.get('x-subject-token'),
|
||||
expires: new Date(json.token.expires_at),
|
||||
swiftURL,
|
||||
};
|
||||
}
|
||||
|
||||
private async authenticate() {
|
||||
if (!this.auth) this.auth = await this.getCredentials();
|
||||
const authExpiry = new Date(Date.now() + this.options.refreshMargin || 10_000);
|
||||
|
||||
if (authExpiry > this.auth.expires) this.auth = await this.getCredentials();
|
||||
const validAuth = this.auth;
|
||||
|
||||
return { swiftURL: validAuth.swiftURL, token: validAuth.token };
|
||||
}
|
||||
|
||||
private generateHeaders(token: string, extra?: any) {
|
||||
return { accept: 'application/json', 'x-auth-token': token, ...extra };
|
||||
}
|
||||
|
||||
public async listObjects(query?: string): Promise<SwiftObject[]> {
|
||||
const auth = await this.authenticate();
|
||||
return await fetch(
|
||||
`${auth.swiftURL}/${this.options.credentials.container}${
|
||||
query ? `${query.startsWith('?') ? '' : '?'}${query}` : ''
|
||||
}`,
|
||||
{
|
||||
method: 'GET',
|
||||
headers: this.generateHeaders(auth.token),
|
||||
}
|
||||
).then((e) => e.json());
|
||||
}
|
||||
|
||||
public async uploadObject(name: string, data: Buffer): Promise<any> {
|
||||
const auth = await this.authenticate();
|
||||
|
||||
return fetch(`${auth.swiftURL}/${this.options.credentials.container}/${name}`, {
|
||||
method: 'PUT',
|
||||
headers: this.generateHeaders(auth.token),
|
||||
body: data,
|
||||
});
|
||||
}
|
||||
|
||||
public async deleteObject(name: string): Promise<any> {
|
||||
const auth = await this.authenticate();
|
||||
|
||||
return fetch(`${auth.swiftURL}/${this.options.credentials.container}/${name}`, {
|
||||
method: 'DELETE',
|
||||
headers: this.generateHeaders(auth.token),
|
||||
});
|
||||
}
|
||||
|
||||
public async getObject(name: string): Promise<Readable> {
|
||||
const auth = await this.authenticate();
|
||||
|
||||
const arrayBuffer = await fetch(`${auth.swiftURL}/${this.options.credentials.container}/${name}`, {
|
||||
method: 'GET',
|
||||
headers: this.generateHeaders(auth.token, { Accept: '*/*' }),
|
||||
}).then((e) => e.arrayBuffer());
|
||||
|
||||
return Readable.from(Buffer.from(arrayBuffer));
|
||||
}
|
||||
|
||||
public async headObject(name: string): Promise<any> {
|
||||
const auth = await this.authenticate();
|
||||
|
||||
return fetch(`${auth.swiftURL}/${this.options.credentials.container}/${name}`, {
|
||||
method: 'HEAD',
|
||||
headers: this.generateHeaders(auth.token),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export class Swift extends Datasource {
|
||||
public name: string = 'Swift';
|
||||
container: SwiftContainer;
|
||||
|
||||
public constructor(public config: ConfigSwiftDatasource) {
|
||||
super();
|
||||
this.container = new SwiftContainer({
|
||||
auth_endpoint_url: config.auth_endpoint,
|
||||
credentials: {
|
||||
username: config.username,
|
||||
password: config.password,
|
||||
project_id: config.project_id,
|
||||
domain_id: config.domain_id || 'default',
|
||||
container: config.container,
|
||||
region_id: config.region_id,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
public async save(file: string, data: Buffer): Promise<void> {
|
||||
try {
|
||||
return this.container.uploadObject(file, data);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public async delete(file: string): Promise<void> {
|
||||
try {
|
||||
return this.container.deleteObject(file);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public get(file: string): Promise<Readable> | Readable {
|
||||
try {
|
||||
return this.container.getObject(file);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public async size(file: string): Promise<number> {
|
||||
try {
|
||||
const head = await this.container.headObject(file);
|
||||
|
||||
return head.headers.get('content-length') || 0;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
public async fullSize(): Promise<number> {
|
||||
return this.container
|
||||
.listObjects()
|
||||
.then((objects) => objects.reduce((acc, object) => acc + object.bytes, 0))
|
||||
.catch(() => 0);
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
export { Datasource } from './Datasource';
|
||||
export { Local } from './Local';
|
||||
export { S3 } from './S3';
|
||||
export { Swift } from './Swift';
|
||||
export { Supabase } from './Supabase';
|
||||
|
|
|
@ -236,7 +236,10 @@ async function rawFile(req: IncomingMessage, res: OutgoingMessage, nextServer: N
|
|||
res.setHeader('Content-Length', size);
|
||||
|
||||
data.pipe(res);
|
||||
data.on('error', () => nextServer.render404(req, res as ServerResponse));
|
||||
data.on('error', (e) => {
|
||||
logger.debug(`error while serving raw file ${id}: ${e}`);
|
||||
nextServer.render404(req, res as ServerResponse);
|
||||
});
|
||||
data.on('end', () => res.end());
|
||||
}
|
||||
|
||||
|
@ -257,8 +260,12 @@ async function fileDb(
|
|||
|
||||
res.setHeader('Content-Type', image.mimetype);
|
||||
res.setHeader('Content-Length', size);
|
||||
|
||||
data.pipe(res);
|
||||
data.on('error', () => nextServer.render404(req, res as ServerResponse));
|
||||
data.on('error', (e) => {
|
||||
logger.debug(`error while serving raw file ${image.file}: ${e}`);
|
||||
nextServer.render404(req, res as ServerResponse);
|
||||
});
|
||||
data.on('end', () => res.end());
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue