Mirror of https://github.com/immich-app/immich.git
refactor: migrate map repository to kysely (#15348)
* chore: migrate map repository to kysely
* chore: add kysely codegen command, exclude from prettier and re-run it on latest migrations
* refactor: migrate map repository to kysely
* chore: dont log postgres notices
Parent: efbc0cb192
Commit: c821458e6c

6 changed files with 138 additions and 114 deletions
@@ -10,6 +10,7 @@ node_modules
 coverage
 dist
 **/migrations/**
+db.d.ts

 # Ignore files for PNPM, NPM and YARN
 pnpm-lock.yaml
@@ -29,6 +29,7 @@
     "typeorm:migrations:revert": "typeorm migration:revert -d ./dist/bin/database.js",
     "typeorm:schema:drop": "typeorm query -d ./dist/bin/database.js 'DROP schema public cascade; CREATE schema public;'",
     "typeorm:schema:reset": "npm run typeorm:schema:drop && npm run typeorm:migrations:run",
+    "kysely:codegen": "npx kysely-codegen --include-pattern=\"(public|vectors).*\" --dialect postgres --url postgres://postgres:postgres@localhost/immich --log-level debug --out-file=./src/db.d.ts",
     "sync:open-api": "node ./dist/bin/sync-open-api.js",
     "sync:sql": "node ./dist/bin/sync-sql.js",
     "email:dev": "email dev -p 3050 --dir src/emails"
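Note on the new script: kysely-codegen introspects a running Postgres database and emits a TypeScript declaration file, which is why src/db.d.ts is generated output and now excluded from prettier. A minimal sketch of consuming the generated types (the connection URL is a placeholder matching the script above, and the dialect mirrors the one this repo already uses):

import { Kysely } from 'kysely';
import { PostgresJSDialect } from 'kysely-postgres-js';
import postgres from 'postgres';
import { DB } from 'src/db';

// Queries built on this instance are type-checked against the introspected schema.
const db = new Kysely<DB>({
  dialect: new PostgresJSDialect({ postgres: postgres('postgres://postgres:postgres@localhost/immich') }),
});

const rows = await db.selectFrom('users').select(['id']).execute();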
server/src/db.d.ts (vendored): 23 changes
@@ -3,16 +3,21 @@
  * Please do not edit it manually.
  */

-import type { ColumnType } from 'kysely';
+import type { ColumnType } from "kysely";

-export type ArrayType<T> = ArrayTypeImpl<T> extends (infer U)[] ? U[] : ArrayTypeImpl<T>;
+export type ArrayType<T> = ArrayTypeImpl<T> extends (infer U)[]
+  ? U[]
+  : ArrayTypeImpl<T>;

-export type ArrayTypeImpl<T> = T extends ColumnType<infer S, infer I, infer U> ? ColumnType<S[], I[], U[]> : T[];
+export type ArrayTypeImpl<T> = T extends ColumnType<infer S, infer I, infer U>
+  ? ColumnType<S[], I[], U[]>
+  : T[];

-export type AssetsStatusEnum = 'active' | 'deleted' | 'trashed';
+export type AssetsStatusEnum = "active" | "deleted" | "trashed";

-export type Generated<T> =
-  T extends ColumnType<infer S, infer I, infer U> ? ColumnType<S, I | undefined, U> : ColumnType<T, T | undefined, T>;
+export type Generated<T> = T extends ColumnType<infer S, infer I, infer U>
+  ? ColumnType<S, I | undefined, U>
+  : ColumnType<T, T | undefined, T>;

 export type Int8 = ColumnType<string, bigint | number | string, bigint | number | string>;
@@ -28,7 +33,7 @@ export type JsonPrimitive = boolean | number | string | null;

 export type JsonValue = JsonArray | JsonObject | JsonPrimitive;

-export type Sourcetype = 'exif' | 'machine-learning';
+export type Sourcetype = "exif" | "machine-learning";

 export type Timestamp = ColumnType<Date, Date | string, Date | string>;
@@ -257,7 +262,7 @@ export interface NaturalearthCountries {
   admin: string;
   admin_a3: string;
   coordinates: string;
-  id: number;
+  id: Generated<number>;
   type: string;
 }
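Why id becomes Generated<number>: per the Generated<T> definition above, the select type stays number while the insert type widens to number | undefined, so rows can be inserted into naturalearth_countries without supplying the database-generated id, which the import code below relies on. A sketch, assuming a Kysely<DB> instance named db; the column values are illustrative only:

// id is omitted: Generated<number> makes it optional on insert,
// while selects still return a plain number.
await db
  .insertInto('naturalearth_countries')
  .values({ admin: 'France', admin_a3: 'FRA', type: 'Sovereign country', coordinates: '((0,0),(1,0),(1,1))' })
  .execute();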
@@ -433,6 +438,6 @@ export interface DB {
   tags_closure: TagsClosure;
   user_metadata: UserMetadata;
   users: Users;
-  'vectors.pg_vector_index_stat': VectorsPgVectorIndexStat;
+  "vectors.pg_vector_index_stat": VectorsPgVectorIndexStat;
   version_history: VersionHistory;
 }
@@ -83,7 +83,7 @@ describe('getEnv', () => {
       config: {
         kysely: {
           dialect: expect.any(PostgresJSDialect),
-          log: ['error'],
+          log: expect.any(Function),
         },
         typeorm: expect.objectContaining({
           type: 'postgres',
@@ -5,7 +5,7 @@ import { Request, Response } from 'express';
 import { PostgresJSDialect } from 'kysely-postgres-js';
 import { CLS_ID } from 'nestjs-cls';
 import { join, resolve } from 'node:path';
-import postgres from 'postgres';
+import postgres, { Notice } from 'postgres';
 import { citiesFile, excludePaths, IWorker } from 'src/constants';
 import { Telemetry } from 'src/decorators';
 import { EnvDto } from 'src/dtos/env.dto';
@@ -99,6 +99,11 @@ const getEnv = (): EnvData => {
   }

   const driverOptions = {
+    onnotice: (notice: Notice) => {
+      if (notice['severity'] !== 'NOTICE') {
+        console.warn('Postgres notice:', notice);
+      }
+    },
     max: 10,
     types: {
       date: {
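On the driver change: postgres.js routes server NOTICE-class messages through the onnotice callback (by default it prints them to the console), so the handler above silences routine notices while still surfacing anything with a higher severity. A standalone sketch of the same option; the connection URL is a placeholder:

import postgres, { Notice } from 'postgres';

const sql = postgres('postgres://postgres:postgres@localhost/immich', {
  // Drop routine NOTICEs; warn on anything more severe.
  onnotice: (notice: Notice) => {
    if (notice['severity'] !== 'NOTICE') {
      console.warn('Postgres notice:', notice);
    }
  },
});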
@@ -194,7 +199,16 @@ const getEnv = (): EnvData => {
       dialect: new PostgresJSDialect({
         postgres: databaseUrl ? postgres(databaseUrl, driverOptions) : postgres({ ...parts, ...driverOptions }),
       }),
-      log: ['error'] as const,
+      log(event) {
+        if (event.level === 'error') {
+          console.error('Query failed :', {
+            durationMs: event.queryDurationMillis,
+            error: event.error,
+            sql: event.query.sql,
+            params: event.query.parameters,
+          });
+        }
+      },
     },
   },
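Replacing log: ['error'] with a function keeps error-level logging but takes over the output format: Kysely invokes the function once per log event instead of writing to the console itself. A sketch of the hook in isolation (LogEvent is exported by kysely, and the event narrows to its error variant inside the if):

import { LogEvent } from 'kysely';

// Pass this as the `log` option of the Kysely config.
const log = (event: LogEvent) => {
  if (event.level === 'error') {
    // Error events carry the thrown error plus the compiled query.
    console.error('Query failed :', {
      durationMs: event.queryDurationMillis,
      error: event.error,
      sql: event.query.sql,
      params: event.query.parameters,
    });
  }
};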
@@ -1,17 +1,15 @@
 import { Inject, Injectable } from '@nestjs/common';
-import { InjectDataSource, InjectRepository } from '@nestjs/typeorm';
 import { getName } from 'i18n-iso-countries';
+import { Expression, Kysely, sql, SqlBool } from 'kysely';
+import { InjectKysely } from 'nestjs-kysely';
 import { randomUUID } from 'node:crypto';
 import { createReadStream, existsSync } from 'node:fs';
 import { readFile } from 'node:fs/promises';
 import readLine from 'node:readline';
 import { citiesFile } from 'src/constants';
-import { AssetEntity } from 'src/entities/asset.entity';
-import { GeodataPlacesEntity, GeodataPlacesTempEntity } from 'src/entities/geodata-places.entity';
-import {
-  NaturalEarthCountriesEntity,
-  NaturalEarthCountriesTempEntity,
-} from 'src/entities/natural-earth-countries.entity';
+import { DB, GeodataPlaces, NaturalearthCountries } from 'src/db';
+import { AssetEntity, withExif } from 'src/entities/asset.entity';
+import { NaturalEarthCountriesTempEntity } from 'src/entities/natural-earth-countries.entity';
 import { LogLevel, SystemMetadataKey } from 'src/enum';
 import { IConfigRepository } from 'src/interfaces/config.interface';
 import { ILoggerRepository } from 'src/interfaces/logger.interface';
@@ -23,21 +21,19 @@ import {
   ReverseGeocodeResult,
 } from 'src/interfaces/map.interface';
 import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
-import { OptionalBetween } from 'src/utils/database';
-import { DataSource, In, IsNull, Not, Repository } from 'typeorm';
-import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity.js';
+
+interface MapDB extends DB {
+  geodata_places_tmp: GeodataPlaces;
+  naturalearth_countries_tmp: NaturalearthCountries;
+}

 @Injectable()
 export class MapRepository implements IMapRepository {
   constructor(
-    @InjectRepository(AssetEntity) private assetRepository: Repository<AssetEntity>,
-    @InjectRepository(GeodataPlacesEntity) private geodataPlacesRepository: Repository<GeodataPlacesEntity>,
-    @InjectRepository(NaturalEarthCountriesEntity)
-    private naturalEarthCountriesRepository: Repository<NaturalEarthCountriesEntity>,
-    @InjectDataSource() private dataSource: DataSource,
     @Inject(IConfigRepository) private configRepository: IConfigRepository,
     @Inject(ISystemMetadataRepository) private metadataRepository: ISystemMetadataRepository,
     @Inject(ILoggerRepository) private logger: ILoggerRepository,
+    @InjectKysely() private db: Kysely<MapDB>,
   ) {
     this.logger.setContext(MapRepository.name);
   }
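The MapDB interface is what lets the temp tables type-check: they exist only mid-import, so kysely-codegen never sees them, and extending the generated DB adds them to the schema the injected Kysely instance knows about. A sketch of what this buys, assuming db is the injected Kysely<MapDB>:

// Both the generated tables and the runtime-only temp tables compile.
async function peek(db: Kysely<MapDB>) {
  await db.selectFrom('geodata_places').select('id').limit(1).execute();
  await db.selectFrom('geodata_places_tmp').select('id').limit(1).execute();
}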
@@ -70,39 +66,34 @@ export class MapRepository implements IMapRepository {
   ): Promise<MapMarker[]> {
     const { isArchived, isFavorite, fileCreatedAfter, fileCreatedBefore } = options;

-    const where = {
-      isVisible: true,
-      isArchived,
-      exifInfo: {
-        latitude: Not(IsNull()),
-        longitude: Not(IsNull()),
-      },
-      isFavorite,
-      fileCreatedAt: OptionalBetween(fileCreatedAfter, fileCreatedBefore),
-    };
-
-    const assets = await this.assetRepository.find({
-      select: {
-        id: true,
-        exifInfo: {
-          city: true,
-          state: true,
-          country: true,
-          latitude: true,
-          longitude: true,
-        },
-      },
-      where: [
-        { ...where, ownerId: In([...ownerIds]) },
-        { ...where, albums: { id: In([...albumIds]) } },
-      ],
-      relations: {
-        exifInfo: true,
-      },
-      order: {
-        fileCreatedAt: 'DESC',
-      },
-    });
+    const assets = (await this.db
+      .selectFrom('assets')
+      .$call(withExif)
+      .select('id')
+      .leftJoin('albums_assets_assets', (join) => join.onRef('assets.id', '=', 'albums_assets_assets.assetsId'))
+      .where('isVisible', '=', true)
+      .$if(isArchived !== undefined, (q) => q.where('isArchived', '=', isArchived!))
+      .$if(isFavorite !== undefined, (q) => q.where('isFavorite', '=', isFavorite!))
+      .$if(fileCreatedAfter !== undefined, (q) => q.where('fileCreatedAt', '>=', fileCreatedAfter!))
+      .$if(fileCreatedBefore !== undefined, (q) => q.where('fileCreatedAt', '<=', fileCreatedBefore!))
+      .where('deletedAt', 'is', null)
+      .where('exif.latitude', 'is not', null)
+      .where('exif.longitude', 'is not', null)
+      .where((eb) => {
+        const ors: Expression<SqlBool>[] = [];
+
+        if (ownerIds.length > 0) {
+          ors.push(eb('ownerId', 'in', ownerIds));
+        }
+
+        if (albumIds.length > 0) {
+          ors.push(eb('albums_assets_assets.albumsId', 'in', albumIds));
+        }
+
+        return eb.or(ors);
+      })
+      .orderBy('fileCreatedAt', 'desc')
+      .execute()) as any as AssetEntity[];

     return assets.map((asset) => ({
       id: asset.id,
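The optional filters ride on two Kysely helpers: $call(withExif) applies a reusable query modifier, and $if(condition, cb) applies cb to the builder only when the condition holds, replacing the old TypeORM where-object whose undefined keys were simply ignored. A reduced sketch of $if, with db as above:

// The favorite filter is appended only when the caller passed a value.
const markers = (isFavorite?: boolean) =>
  db
    .selectFrom('assets')
    .select('id')
    .$if(isFavorite !== undefined, (qb) => qb.where('isFavorite', '=', isFavorite!))
    .execute();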
@@ -117,15 +108,19 @@ export class MapRepository implements IMapRepository {
   async reverseGeocode(point: GeoPoint): Promise<ReverseGeocodeResult> {
     this.logger.debug(`Request: ${point.latitude},${point.longitude}`);

-    const response = await this.geodataPlacesRepository
-      .createQueryBuilder('geoplaces')
+    const response = await this.db
+      .selectFrom('geodata_places')
+      .selectAll()
       .where(
-        'earth_box(ll_to_earth_public(:latitude, :longitude), 25000) @> ll_to_earth_public(latitude, longitude)',
-        point,
+        sql`earth_box(ll_to_earth_public(${point.latitude}, ${point.longitude}), 25000)`,
+        '@>',
+        sql`ll_to_earth_public(latitude, longitude)`,
       )
-      .orderBy('earth_distance(ll_to_earth_public(:latitude, :longitude), ll_to_earth_public(latitude, longitude))')
+      .orderBy(
+        sql`(earth_distance(ll_to_earth_public(${point.latitude}, ${point.longitude}), ll_to_earth_public(latitude, longitude)))`,
+      )
       .limit(1)
-      .getOne();
+      .executeTakeFirst();

     if (response) {
       if (this.logger.isLevelEnabled(LogLevel.VERBOSE)) {
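Kysely's where also accepts a raw expression on each side of an operator, which is how the earthdistance helpers survive the migration; note that ${point.latitude} inside the sql tagged template becomes a bound parameter, not string concatenation. A sketch of the three-argument form, with db as above and 25000 being the search radius in meters as in the original query:

import { sql } from 'kysely';

const nearest = (lat: number, lon: number) =>
  db
    .selectFrom('geodata_places')
    .selectAll()
    // earth_box(...) @> ll_to_earth_public(...), with lat/lon sent as parameters
    .where(sql`earth_box(ll_to_earth_public(${lat}, ${lon}), 25000)`, '@>', sql`ll_to_earth_public(latitude, longitude)`)
    .executeTakeFirst();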
@@ -143,11 +138,12 @@ export class MapRepository implements IMapRepository {
       `Response from database for reverse geocoding latitude: ${point.latitude}, longitude: ${point.longitude} was null`,
     );

-    const ne_response = await this.naturalEarthCountriesRepository
-      .createQueryBuilder('naturalearth_countries')
-      .where('coordinates @> point (:longitude, :latitude)', point)
+    const ne_response = await this.db
+      .selectFrom('naturalearth_countries')
+      .selectAll()
+      .where('coordinates', '@>', sql<string>`point(${point.longitude}, ${point.latitude})`)
       .limit(1)
-      .getOne();
+      .executeTakeFirst();

     if (!ne_response) {
       this.logger.warn(
@@ -176,10 +172,11 @@ export class MapRepository implements IMapRepository {
       return;
     }

-    await this.dataSource.query('DROP TABLE IF EXISTS naturalearth_countries_tmp');
-    await this.dataSource.query(
-      'CREATE TABLE naturalearth_countries_tmp (LIKE naturalearth_countries INCLUDING ALL EXCLUDING INDEXES)',
+    await this.db.schema.dropTable('naturalearth_countries_tmp').ifExists().execute();
+    await sql`CREATE TABLE naturalearth_countries_tmp (LIKE naturalearth_countries INCLUDING ALL EXCLUDING INDEXES)`.execute(
+      this.db,
     );

     const entities: Omit<NaturalEarthCountriesTempEntity, 'id'>[] = [];
     for (const feature of geoJSONData.features) {
       for (const entry of feature.geometry.coordinates) {
@@ -196,14 +193,14 @@ export class MapRepository implements IMapRepository {
         }
       }
     }
-    await this.dataSource.manager.insert(NaturalEarthCountriesTempEntity, entities);
+    await this.db.insertInto('naturalearth_countries_tmp').values(entities).execute();

-    await this.dataSource.query(`ALTER TABLE naturalearth_countries_tmp ADD PRIMARY KEY (id) WITH (FILLFACTOR = 100)`);
+    await sql`ALTER TABLE naturalearth_countries_tmp ADD PRIMARY KEY (id) WITH (FILLFACTOR = 100)`.execute(this.db);

-    await this.dataSource.transaction(async (manager) => {
-      await manager.query('ALTER TABLE naturalearth_countries RENAME TO naturalearth_countries_old');
-      await manager.query('ALTER TABLE naturalearth_countries_tmp RENAME TO naturalearth_countries');
-      await manager.query('DROP TABLE naturalearth_countries_old');
+    await this.db.transaction().execute(async (manager) => {
+      await manager.schema.alterTable('naturalearth_countries').renameTo('naturalearth_countries_old').execute();
+      await manager.schema.alterTable('naturalearth_countries_tmp').renameTo('naturalearth_countries').execute();
+      await manager.schema.dropTable('naturalearth_countries_old').execute();
     });
   }
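The refresh itself is a shadow-table swap: the tmp table is built and indexed offline, then renamed into place inside one transaction so readers never observe a half-loaded table. With Kysely the transaction callback receives a transaction-scoped instance whose schema builder mirrors db.schema. A generic sketch of the swap, with db as above and the table name t purely illustrative:

await db.transaction().execute(async (trx) => {
  await trx.schema.alterTable('t').renameTo('t_old').execute();
  await trx.schema.alterTable('t_tmp').renameTo('t').execute();
  await trx.schema.dropTable('t_old').execute();
});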
|
@ -214,17 +211,15 @@ export class MapRepository implements IMapRepository {
|
|||
this.loadAdmin(resourcePaths.geodata.admin2),
|
||||
]);
|
||||
|
||||
await this.dataSource.query('DROP TABLE IF EXISTS geodata_places_tmp');
|
||||
await this.dataSource.query(
|
||||
'CREATE TABLE geodata_places_tmp (LIKE geodata_places INCLUDING ALL EXCLUDING INDEXES)',
|
||||
);
|
||||
await this.db.schema.dropTable('geodata_places_tmp').ifExists().execute();
|
||||
await sql`CREATE TABLE geodata_places_tmp (LIKE geodata_places INCLUDING ALL EXCLUDING INDEXES)`.execute(this.db);
|
||||
await this.loadCities500(admin1, admin2);
|
||||
await this.createGeodataIndices();
|
||||
|
||||
await this.dataSource.transaction(async (manager) => {
|
||||
await manager.query('ALTER TABLE geodata_places RENAME TO geodata_places_old');
|
||||
await manager.query('ALTER TABLE geodata_places_tmp RENAME TO geodata_places');
|
||||
await manager.query('DROP TABLE geodata_places_old');
|
||||
await this.db.transaction().execute(async (manager) => {
|
||||
await manager.schema.alterTable('geodata_places').renameTo('geodata_places_old').execute();
|
||||
await manager.schema.alterTable('geodata_places_tmp').renameTo('geodata_places').execute();
|
||||
await manager.schema.dropTable('geodata_places_old').execute();
|
||||
});
|
||||
}
|
||||
|
||||
|
@@ -236,7 +231,7 @@ export class MapRepository implements IMapRepository {
     }

     const input = createReadStream(cities500, { highWaterMark: 512 * 1024 * 1024 });
-    let bufferGeodata: QueryDeepPartialEntity<GeodataPlacesTempEntity>[] = [];
+    let bufferGeodata = [];
     const lineReader = readLine.createInterface({ input });
     let count = 0;
@@ -257,19 +252,23 @@ export class MapRepository implements IMapRepository {
         admin1Code: lineSplit[10],
         admin2Code: lineSplit[11],
         modificationDate: lineSplit[18],
-        admin1Name: admin1Map.get(`${lineSplit[8]}.${lineSplit[10]}`),
-        admin2Name: admin2Map.get(`${lineSplit[8]}.${lineSplit[10]}.${lineSplit[11]}`),
+        admin1Name: admin1Map.get(`${lineSplit[8]}.${lineSplit[10]}`) ?? null,
+        admin2Name: admin2Map.get(`${lineSplit[8]}.${lineSplit[10]}.${lineSplit[11]}`) ?? null,
       };
       bufferGeodata.push(geoData);
       if (bufferGeodata.length >= 5000) {
         const curLength = bufferGeodata.length;
         futures.push(
-          this.dataSource.manager.insert(GeodataPlacesTempEntity, bufferGeodata).then(() => {
-            count += curLength;
-            if (count % 10_000 === 0) {
-              this.logger.log(`${count} geodata records imported`);
-            }
-          }),
+          this.db
+            .insertInto('geodata_places_tmp')
+            .values(bufferGeodata)
+            .execute()
+            .then(() => {
+              count += curLength;
+              if (count % 10_000 === 0) {
+                this.logger.log(`${count} geodata records imported`);
+              }
+            }),
         );
         bufferGeodata = [];
         // leave spare connection for other queries
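The 5,000-row batches serve two purposes: each multi-row INSERT stays comfortably under the Postgres wire protocol's 65,535 bind-parameter cap (presumably the motivation for the threshold), and pushing the promise into futures lets several batches be in flight while the file is still being read. curLength is captured before the buffer is handed off so the progress counter stays accurate. A stripped-down sketch of the pattern, with db as above and the Insertable typing an assumption based on the generated schema:

import { Insertable } from 'kysely';
import { GeodataPlaces } from 'src/db';

let buffer: Insertable<GeodataPlaces>[] = [];
const futures: Promise<void>[] = [];
let count = 0;

const flush = () => {
  const rows = buffer; // capture before reset, like curLength above
  buffer = [];
  futures.push(
    db.insertInto('geodata_places_tmp').values(rows).execute().then(() => {
      count += rows.length;
    }),
  );
};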
@@ -280,7 +279,7 @@ export class MapRepository implements IMapRepository {
       }
     }

-    await this.dataSource.manager.insert(GeodataPlacesTempEntity, bufferGeodata);
+    await this.db.insertInto('geodata_places_tmp').values(bufferGeodata).execute();
   }

   private async loadAdmin(filePath: string) {
@@ -303,24 +302,28 @@ export class MapRepository implements IMapRepository {

   private createGeodataIndices() {
     return Promise.all([
-      this.dataSource.query(`ALTER TABLE geodata_places_tmp ADD PRIMARY KEY (id) WITH (FILLFACTOR = 100)`),
-      this.dataSource.query(`
-        CREATE INDEX IDX_geodata_gist_earthcoord_${randomUUID().replaceAll('-', '_')}
-          ON geodata_places_tmp
-          USING gist (ll_to_earth_public(latitude, longitude))
-          WITH (fillfactor = 100)`),
-      this.dataSource.query(`
-        CREATE INDEX idx_geodata_places_name_${randomUUID().replaceAll('-', '_')}
-          ON geodata_places_tmp
-          USING gin (f_unaccent(name) gin_trgm_ops)`),
-      this.dataSource.query(`
-        CREATE INDEX idx_geodata_places_admin1_name_${randomUUID().replaceAll('-', '_')}
-          ON geodata_places_tmp
-          USING gin (f_unaccent("admin1Name") gin_trgm_ops)`),
-      this.dataSource.query(`
-        CREATE INDEX idx_geodata_places_admin2_name_${randomUUID().replaceAll('-', '_')}
-          ON geodata_places_tmp
-          USING gin (f_unaccent("admin2Name") gin_trgm_ops)`),
+      sql`ALTER TABLE geodata_places_tmp ADD PRIMARY KEY (id) WITH (FILLFACTOR = 100)`.execute(this.db),
+      sql`
+        CREATE INDEX IDX_geodata_gist_earthcoord_${sql.raw(randomUUID().replaceAll('-', '_'))}
+          ON geodata_places_tmp
+          USING gist (ll_to_earth_public(latitude, longitude))
+          WITH (fillfactor = 100)
+      `.execute(this.db),
+      this.db.schema
+        .createIndex(`idx_geodata_places_country_code_${randomUUID().replaceAll('-', '_')}`)
+        .on('geodata_places_tmp')
+        .using('gin (f_unaccent(name) gin_trgm_ops)')
+        .execute(),
+      this.db.schema
+        .createIndex(`idx_geodata_places_country_code_${randomUUID().replaceAll('-', '_')}`)
+        .on('geodata_places_tmp')
+        .using('gin (f_unaccent("admin1Name") gin_trgm_ops)')
+        .execute(),
+      this.db.schema
+        .createIndex(`idx_geodata_places_admin2_name_${randomUUID().replaceAll('-', '_')}`)
+        .on('geodata_places_tmp')
+        .using('gin (f_unaccent("admin2Name") gin_trgm_ops)')
+        .execute(),
     ]);
   }
 }
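One subtlety in the rewritten indices: the random suffix has to land in the statement as an identifier, and identifiers cannot be bound parameters, so inside the sql template it is spliced with sql.raw(...) rather than plain ${...} interpolation, which would bind it as a value and yield invalid DDL. A sketch of the distinction, with db as above and the index name purely illustrative:

import { sql } from 'kysely';
import { randomUUID } from 'node:crypto';

const suffix = randomUUID().replaceAll('-', '_');

// sql.raw splices text verbatim:  CREATE INDEX idx_demo_<suffix> ...
// Plain ${suffix} would compile to CREATE INDEX idx_demo_$1 ..., which Postgres rejects.
await sql`CREATE INDEX idx_demo_${sql.raw(suffix)} ON geodata_places_tmp (id)`.execute(db);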