Mirror of https://github.com/diced/zipline.git, synced 2025-04-11 23:31:17 -05:00
feat(datasource): s3 path styles
parent 300430b3ec
commit 13b0ac737b
5 changed files with 18 additions and 16 deletions
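The diff below replaces the S3 datasource's four positional constructor arguments with a single ConfigS3Datasource object and exposes aws-sdk's s3ForcePathStyle option as a force_s3_path config field. As orientation, here is a minimal sketch of what path-style addressing means, assuming aws-sdk v2 (the library this file already imports); the credentials and endpoint are hypothetical:

import AWS from 'aws-sdk';

// Virtual-hosted style (the default) addresses objects as
//   https://my-bucket.s3.amazonaws.com/key
// Path style keeps the bucket in the URL path instead:
//   https://minio.example.com/my-bucket/key
// Self-hosted S3-compatible stores such as MinIO typically require path style.
const s3 = new AWS.S3({
  accessKeyId: 'AKIA...',                // hypothetical credentials
  secretAccessKey: 'secret',
  endpoint: 'https://minio.example.com', // hypothetical endpoint
  s3ForcePathStyle: true,                // the option this commit exposes as force_s3_path
});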
@@ -1,29 +1,28 @@
 import { Datasource } from './datasource';
 import AWS from 'aws-sdk';
 import { Readable } from 'stream';
+import { ConfigS3Datasource } from 'lib/types';
 
 export class S3 extends Datasource {
   public name: string = 'S3';
   public s3: AWS.S3;
 
   public constructor(
-    public accessKey: string,
-    public secretKey: string,
-    public endpoint: string,
-    public bucket: string,
+    public config: ConfigS3Datasource,
   ) {
     super();
     this.s3 = new AWS.S3({
-      accessKeyId: accessKey,
-      endpoint: endpoint,
-      secretAccessKey: secretKey,
+      accessKeyId: config.access_key_id,
+      endpoint: config.endpoint || null,
+      s3ForcePathStyle: config.force_s3_path,
+      secretAccessKey: config.secret_access_key,
     });
   }
 
   public async save(file: string, data: Buffer): Promise<void> {
     return new Promise((resolve, reject) => {
       this.s3.upload({
-        Bucket: this.bucket,
+        Bucket: this.config.bucket,
         Key: file,
         Body: data,
       }, err => {
@@ -39,7 +38,7 @@ export class S3 extends Datasource {
   public async delete(file: string): Promise<void> {
     return new Promise((resolve, reject) => {
       this.s3.deleteObject({
-        Bucket: this.bucket,
+        Bucket: this.config.bucket,
         Key: file,
       }, err => {
         if (err) {
@@ -54,7 +53,7 @@ export class S3 extends Datasource {
   public get(file: string): Readable {
     // Unfortunately, aws-sdk is bad and the stream still loads everything into memory.
     return this.s3.getObject({
-      Bucket: this.bucket,
+      Bucket: this.config.bucket,
       Key: file,
     }).createReadStream();
   }
@@ -62,7 +61,7 @@ export class S3 extends Datasource {
   public async size(): Promise<number> {
     return new Promise((resolve, reject) => {
       this.s3.listObjects({
-        Bucket: this.bucket,
+        Bucket: this.config.bucket,
       }, (err, data) => {
         if (err) {
           reject(err);
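A hedged usage sketch of the refactored class follows; the import path and config values are assumptions for illustration, but the shape of the object matches the ConfigS3Datasource interface changed later in this commit:

import { S3 } from './S3';               // assumed location of the class above
import { ConfigS3Datasource } from 'lib/types';

const config: ConfigS3Datasource = {
  access_key_id: 'AKIA...',              // hypothetical values
  secret_access_key: 'secret',
  endpoint: 'https://minio.example.com',
  bucket: 'uploads',
  force_s3_path: true,
};

// One object in place of the old four positional strings:
const datasource = new S3(config);
await datasource.save('file.png', Buffer.from('hello'));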
@@ -6,7 +6,7 @@ if (!global.datasource) {
   switch (config.datasource.type) {
     case 's3':
       Logger.get('datasource').info(`Using S3(${config.datasource.s3.bucket}) datasource`);
-      global.datasource = new S3(config.datasource.s3.access_key_id, config.datasource.s3.secret_access_key, config.datasource.s3.endpoint, config.datasource.s3.bucket);
+      global.datasource = new S3(config.datasource.s3);
       break;
     case 'local':
       Logger.get('datasource').info(`Using local(${config.datasource.local.directory}) datasource`);
@@ -19,7 +19,8 @@ const envValues = [
   e('DATASOURCE_LOCAL_DIRECTORY', 'string', (c, v) => c.datasource.local.directory = v),
   e('DATASOURCE_S3_ACCESS_KEY_ID', 'string', (c, v) => c.datasource.s3.access_key_id = v),
   e('DATASOURCE_S3_SECRET_ACCESS_KEY', 'string', (c, v) => c.datasource.s3.secret_access_key = v),
-  e('DATASOURCE_S3_ENDPOINT', 'string', (c, v) => c.datasource.s3.endpoint = v),
+  e('DATASOURCE_S3_ENDPOINT', 'string', (c, v) => c.datasource.s3.endpoint = v ?? null),
+  e('DATASOURCE_S3_FORCE_S3_PATH', 'string', (c, v) => c.datasource.s3.force_s3_path = v ?? false),
   e('DATASOURCE_S3_BUCKET', 'string', (c, v) => c.datasource.s3.bucket = v),
 
   e('UPLOADER_ROUTE', 'string', (c, v) => c.uploader.route = v),
@@ -71,6 +72,7 @@ function tryReadEnv(): Config {
         secret_access_key: undefined,
         endpoint: undefined,
         bucket: undefined,
+        force_s3_path: undefined,
       },
     },
     uploader: {
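One behavioral note on the new env mappings, sketched below: ?? only falls back when its left side is null or undefined, and process.env values are always strings when set, so DATASOURCE_S3_FORCE_S3_PATH arrives as a string rather than a boolean. This snippet only illustrates the operator, not the project's parsing code:

const v = process.env.DATASOURCE_S3_FORCE_S3_PATH; // string | undefined

const fallback = v ?? false;    // unset -> false; 'true' -> 'true'; '' -> '' (empty string is not nullish)
const asBoolean = v === 'true'; // a hypothetical way to coerce it to a real boolean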
@@ -38,8 +38,9 @@ export interface ConfigLocalDatasource {
 export interface ConfigS3Datasource {
   access_key_id: string;
   secret_access_key: string;
-  endpoint: string;
+  endpoint?: string;
   bucket: string;
+  force_s3_path: boolean;
 }
 
 export interface ConfigUploader {
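Since endpoint is now optional on the interface, the constructor's config.endpoint || null normalizes both an omitted endpoint and an empty string to null; a small sketch, assuming the interface above and hypothetical values:

const config: ConfigS3Datasource = {
  access_key_id: 'id',           // hypothetical values
  secret_access_key: 'secret',
  bucket: 'uploads',
  force_s3_path: false,
  // endpoint omitted; the validator below defaults it to null for plain AWS
};

const endpoint = config.endpoint || null; // undefined (or '') becomes null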
@@ -19,8 +19,9 @@ const validator = object({
     s3: object({
       access_key_id: string(),
      secret_access_key: string(),
-      endpoint: string(),
+      endpoint: string().notRequired().default(null),
       bucket: string(),
+      force_s3_path: boolean().default(false),
     }).notRequired(),
   }).required(),
   uploader: object({
@@ -49,7 +50,6 @@ export default function validate(config): Config {
   const errors = [];
   if (!validated.datasource.s3.access_key_id) errors.push('datasource.s3.access_key_id is a required field');
   if (!validated.datasource.s3.secret_access_key) errors.push('datasource.s3.secret_access_key is a required field');
-  if (!validated.datasource.s3.endpoint) errors.push('datasource.s3.endpoint is a required field');
   if (!validated.datasource.s3.bucket) errors.push('datasource.s3.bucket is a required field');
   if (errors.length) throw { errors };
 }
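A small sketch of how the new schema rules behave, assuming the object/string/boolean builders in this file come from the yup library:

import { object, string, boolean } from 'yup';

const s3Schema = object({
  endpoint: string().notRequired().default(null), // may be absent; defaults to null
  force_s3_path: boolean().default(false),        // absent -> false
});

// cast() fills in defaults without validating:
const out = s3Schema.cast({});
// out.endpoint === null, out.force_s3_path === false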