0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-03-11 02:23:09 -05:00

refactor: use new updateId column for user CUD sync (#16384)

This commit is contained in:
Zack Pollard 2025-02-27 14:22:02 +00:00 committed by GitHub
parent 7d6cfd09e6
commit fb907d707d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 50 additions and 47 deletions

1
server/src/db.d.ts vendored
View file

@ -415,6 +415,7 @@ export interface Users {
}
export interface UsersAudit {
id: Generated<string>;
userId: string;
deletedAt: Generated<Timestamp>;
}

View file

@ -1,14 +1,14 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';
import { Column, CreateDateColumn, Entity, Index, PrimaryColumn } from 'typeorm';
@Entity('users_audit')
@Index('IDX_users_audit_deleted_at_asc_user_id_asc', ['deletedAt', 'userId'])
export class UserAuditEntity {
@PrimaryGeneratedColumn('increment')
id!: number;
@PrimaryColumn({ type: 'uuid', nullable: false, default: () => 'immich_uuid_v7()' })
id!: string;
@Column({ type: 'uuid' })
userId!: string;
@CreateDateColumn({ type: 'timestamptz' })
@Index('IDX_users_audit_deleted_at')
@CreateDateColumn({ type: 'timestamptz', default: () => 'clock_timestamp()' })
deletedAt!: Date;
}

View file

@ -0,0 +1,26 @@
import { MigrationInterface, QueryRunner } from "typeorm";
export class UsersAuditUuidv7PrimaryKey1740595460866 implements MigrationInterface {
name = 'UsersAuditUuidv7PrimaryKey1740595460866'
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "public"."IDX_users_audit_deleted_at_asc_user_id_asc"`);
await queryRunner.query(`ALTER TABLE "users_audit" DROP CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180"`);
await queryRunner.query(`ALTER TABLE "users_audit" DROP COLUMN "id"`);
await queryRunner.query(`ALTER TABLE "users_audit" ADD "id" uuid NOT NULL DEFAULT immich_uuid_v7()`);
await queryRunner.query(`ALTER TABLE "users_audit" ADD CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180" PRIMARY KEY ("id")`);
await queryRunner.query(`ALTER TABLE "users_audit" ALTER COLUMN "deletedAt" SET DEFAULT clock_timestamp()`)
await queryRunner.query(`CREATE INDEX "IDX_users_audit_deleted_at" ON "users_audit" ("deletedAt")`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "public"."IDX_users_audit_deleted_at"`);
await queryRunner.query(`ALTER TABLE "users_audit" DROP CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180"`);
await queryRunner.query(`ALTER TABLE "users_audit" DROP COLUMN "id"`);
await queryRunner.query(`ALTER TABLE "users_audit" ADD "id" SERIAL NOT NULL`);
await queryRunner.query(`ALTER TABLE "users_audit" ADD CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180" PRIMARY KEY ("id")`);
await queryRunner.query(`ALTER TABLE "users_audit" ALTER COLUMN "deletedAt" SET DEFAULT now()`);
await queryRunner.query(`CREATE INDEX "IDX_users_audit_deleted_at_asc_user_id_asc" ON "users_audit" ("userId", "deletedAt") `);
}
}

View file

@ -1,7 +1,6 @@
import { Injectable } from '@nestjs/common';
import { Insertable, Kysely, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { columns } from 'src/database';
import { DB, SessionSyncCheckpoints } from 'src/db';
import { SyncEntityType } from 'src/enum';
import { SyncAck } from 'src/types';
@ -41,39 +40,19 @@ export class SyncRepository {
getUserUpserts(ack?: SyncAck) {
return this.db
.selectFrom('users')
.select(['id', 'name', 'email', 'deletedAt'])
.select(columns.ackEpoch('updatedAt'))
.$if(!!ack, (qb) =>
qb.where((eb) =>
eb.or([
eb(eb.fn<Date>('to_timestamp', [sql.val(ack!.ackEpoch)]), '<', eb.ref('updatedAt')),
eb.and([
eb(eb.fn<Date>('to_timestamp', [sql.val(ack!.ackEpoch)]), '<=', eb.ref('updatedAt')),
eb('id', '>', ack!.ids[0]),
]),
]),
),
)
.orderBy(['updatedAt asc', 'id asc'])
.select(['id', 'name', 'email', 'deletedAt', 'updateId'])
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['updateId asc'])
.stream();
}
getUserDeletes(ack?: SyncAck) {
return this.db
.selectFrom('users_audit')
.select(['userId'])
.select(columns.ackEpoch('deletedAt'))
.$if(!!ack, (qb) =>
qb.where((eb) =>
eb.or([
eb(eb.fn<Date>('to_timestamp', [sql.val(ack!.ackEpoch)]), '<', eb.ref('deletedAt')),
eb.and([
eb(eb.fn<Date>('to_timestamp', [sql.val(ack!.ackEpoch)]), '<=', eb.ref('deletedAt')),
eb('userId', '>', ack!.ids[0]),
]),
]),
),
)
.select(['id', 'userId'])
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['deletedAt asc', 'userId asc'])
.stream();
}

View file

@ -87,13 +87,13 @@ export class SyncService extends BaseService {
switch (type) {
case SyncRequestType.UsersV1: {
const deletes = this.syncRepository.getUserDeletes(checkpointMap[SyncEntityType.UserDeleteV1]);
for await (const { ackEpoch, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.UserDeleteV1, ackEpoch, ids: [data.userId], data }));
for await (const { id, ...data } of deletes) {
response.write(serialize({ type: SyncEntityType.UserDeleteV1, updateId: id, data }));
}
const upserts = this.syncRepository.getUserUpserts(checkpointMap[SyncEntityType.UserV1]);
for await (const { ackEpoch, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.UserV1, ackEpoch, ids: [data.id], data }));
for await (const { updateId, ...data } of upserts) {
response.write(serialize({ type: SyncEntityType.UserV1, updateId, data }));
}
break;

View file

@ -421,6 +421,5 @@ export interface IBulkAsset {
export type SyncAck = {
type: SyncEntityType;
ackEpoch: string;
ids: string[];
updateId: string;
};

View file

@ -9,22 +9,20 @@ type Impossible<K extends keyof any> = {
type Exact<T, U extends T = T> = U & Impossible<Exclude<keyof U, keyof T>>;
export const fromAck = (ack: string): SyncAck => {
const [type, timestamp, ...ids] = ack.split('|');
return { type: type as SyncEntityType, ackEpoch: timestamp, ids };
const [type, updateId] = ack.split('|');
return { type: type as SyncEntityType, updateId };
};
export const toAck = ({ type, ackEpoch, ids }: SyncAck) => [type, ackEpoch, ...ids].join('|');
export const toAck = ({ type, updateId }: SyncAck) => [type, updateId].join('|');
export const mapJsonLine = (object: unknown) => JSON.stringify(object) + '\n';
export const serialize = <T extends keyof SyncItem, D extends SyncItem[T]>({
type,
ackEpoch,
ids,
updateId,
data,
}: {
type: T;
ackEpoch: string;
ids: string[];
updateId: string;
data: Exact<SyncItem[T], D>;
}) => mapJsonLine({ type, data, ack: toAck({ type, ackEpoch, ids }) });
}) => mapJsonLine({ type, data, ack: toAck({ type, updateId }) });