From fb907d707de8102ca29d1603d7f9b325cf5a8d2f Mon Sep 17 00:00:00 2001 From: Zack Pollard Date: Thu, 27 Feb 2025 14:22:02 +0000 Subject: [PATCH] refactor: use new updateId column for user CUD sync (#16384) --- server/src/db.d.ts | 1 + server/src/entities/user-audit.entity.ts | 10 +++--- ...740595460866-UsersAuditUuidv7PrimaryKey.ts | 26 ++++++++++++++ server/src/repositories/sync.repository.ts | 35 ++++--------------- server/src/services/sync.service.ts | 8 ++--- server/src/types.ts | 3 +- server/src/utils/sync.ts | 14 ++++---- 7 files changed, 50 insertions(+), 47 deletions(-) create mode 100644 server/src/migrations/1740595460866-UsersAuditUuidv7PrimaryKey.ts diff --git a/server/src/db.d.ts b/server/src/db.d.ts index ff4cb4a1d2..7fb073d8ce 100644 --- a/server/src/db.d.ts +++ b/server/src/db.d.ts @@ -415,6 +415,7 @@ export interface Users { } export interface UsersAudit { + id: Generated; userId: string; deletedAt: Generated; } diff --git a/server/src/entities/user-audit.entity.ts b/server/src/entities/user-audit.entity.ts index 305994a6d6..c29bc94d97 100644 --- a/server/src/entities/user-audit.entity.ts +++ b/server/src/entities/user-audit.entity.ts @@ -1,14 +1,14 @@ -import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm'; +import { Column, CreateDateColumn, Entity, Index, PrimaryColumn } from 'typeorm'; @Entity('users_audit') -@Index('IDX_users_audit_deleted_at_asc_user_id_asc', ['deletedAt', 'userId']) export class UserAuditEntity { - @PrimaryGeneratedColumn('increment') - id!: number; + @PrimaryColumn({ type: 'uuid', nullable: false, default: () => 'immich_uuid_v7()' }) + id!: string; @Column({ type: 'uuid' }) userId!: string; - @CreateDateColumn({ type: 'timestamptz' }) + @Index('IDX_users_audit_deleted_at') + @CreateDateColumn({ type: 'timestamptz', default: () => 'clock_timestamp()' }) deletedAt!: Date; } diff --git a/server/src/migrations/1740595460866-UsersAuditUuidv7PrimaryKey.ts b/server/src/migrations/1740595460866-UsersAuditUuidv7PrimaryKey.ts new file mode 100644 index 0000000000..997f718fd9 --- /dev/null +++ b/server/src/migrations/1740595460866-UsersAuditUuidv7PrimaryKey.ts @@ -0,0 +1,26 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class UsersAuditUuidv7PrimaryKey1740595460866 implements MigrationInterface { + name = 'UsersAuditUuidv7PrimaryKey1740595460866' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IDX_users_audit_deleted_at_asc_user_id_asc"`); + await queryRunner.query(`ALTER TABLE "users_audit" DROP CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180"`); + await queryRunner.query(`ALTER TABLE "users_audit" DROP COLUMN "id"`); + await queryRunner.query(`ALTER TABLE "users_audit" ADD "id" uuid NOT NULL DEFAULT immich_uuid_v7()`); + await queryRunner.query(`ALTER TABLE "users_audit" ADD CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180" PRIMARY KEY ("id")`); + await queryRunner.query(`ALTER TABLE "users_audit" ALTER COLUMN "deletedAt" SET DEFAULT clock_timestamp()`) + await queryRunner.query(`CREATE INDEX "IDX_users_audit_deleted_at" ON "users_audit" ("deletedAt")`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IDX_users_audit_deleted_at"`); + await queryRunner.query(`ALTER TABLE "users_audit" DROP CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180"`); + await queryRunner.query(`ALTER TABLE "users_audit" DROP COLUMN "id"`); + await queryRunner.query(`ALTER TABLE "users_audit" ADD "id" SERIAL NOT NULL`); + await queryRunner.query(`ALTER TABLE "users_audit" ADD CONSTRAINT "PK_e9b2bdfd90e7eb5961091175180" PRIMARY KEY ("id")`); + await queryRunner.query(`ALTER TABLE "users_audit" ALTER COLUMN "deletedAt" SET DEFAULT now()`); + await queryRunner.query(`CREATE INDEX "IDX_users_audit_deleted_at_asc_user_id_asc" ON "users_audit" ("userId", "deletedAt") `); + } + +} diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index 4023bf890e..d1d0e9b8ee 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -1,7 +1,6 @@ import { Injectable } from '@nestjs/common'; import { Insertable, Kysely, sql } from 'kysely'; import { InjectKysely } from 'nestjs-kysely'; -import { columns } from 'src/database'; import { DB, SessionSyncCheckpoints } from 'src/db'; import { SyncEntityType } from 'src/enum'; import { SyncAck } from 'src/types'; @@ -41,39 +40,19 @@ export class SyncRepository { getUserUpserts(ack?: SyncAck) { return this.db .selectFrom('users') - .select(['id', 'name', 'email', 'deletedAt']) - .select(columns.ackEpoch('updatedAt')) - .$if(!!ack, (qb) => - qb.where((eb) => - eb.or([ - eb(eb.fn('to_timestamp', [sql.val(ack!.ackEpoch)]), '<', eb.ref('updatedAt')), - eb.and([ - eb(eb.fn('to_timestamp', [sql.val(ack!.ackEpoch)]), '<=', eb.ref('updatedAt')), - eb('id', '>', ack!.ids[0]), - ]), - ]), - ), - ) - .orderBy(['updatedAt asc', 'id asc']) + .select(['id', 'name', 'email', 'deletedAt', 'updateId']) + .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .orderBy(['updateId asc']) .stream(); } getUserDeletes(ack?: SyncAck) { return this.db .selectFrom('users_audit') - .select(['userId']) - .select(columns.ackEpoch('deletedAt')) - .$if(!!ack, (qb) => - qb.where((eb) => - eb.or([ - eb(eb.fn('to_timestamp', [sql.val(ack!.ackEpoch)]), '<', eb.ref('deletedAt')), - eb.and([ - eb(eb.fn('to_timestamp', [sql.val(ack!.ackEpoch)]), '<=', eb.ref('deletedAt')), - eb('userId', '>', ack!.ids[0]), - ]), - ]), - ), - ) + .select(['id', 'userId']) + .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) + .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) .orderBy(['deletedAt asc', 'userId asc']) .stream(); } diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index 98e4d5fb09..b756c11ef4 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -87,13 +87,13 @@ export class SyncService extends BaseService { switch (type) { case SyncRequestType.UsersV1: { const deletes = this.syncRepository.getUserDeletes(checkpointMap[SyncEntityType.UserDeleteV1]); - for await (const { ackEpoch, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.UserDeleteV1, ackEpoch, ids: [data.userId], data })); + for await (const { id, ...data } of deletes) { + response.write(serialize({ type: SyncEntityType.UserDeleteV1, updateId: id, data })); } const upserts = this.syncRepository.getUserUpserts(checkpointMap[SyncEntityType.UserV1]); - for await (const { ackEpoch, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.UserV1, ackEpoch, ids: [data.id], data })); + for await (const { updateId, ...data } of upserts) { + response.write(serialize({ type: SyncEntityType.UserV1, updateId, data })); } break; diff --git a/server/src/types.ts b/server/src/types.ts index 3aa7a14add..5360e519bd 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -421,6 +421,5 @@ export interface IBulkAsset { export type SyncAck = { type: SyncEntityType; - ackEpoch: string; - ids: string[]; + updateId: string; }; diff --git a/server/src/utils/sync.ts b/server/src/utils/sync.ts index 8e426ab860..cfb6660bdc 100644 --- a/server/src/utils/sync.ts +++ b/server/src/utils/sync.ts @@ -9,22 +9,20 @@ type Impossible = { type Exact = U & Impossible>; export const fromAck = (ack: string): SyncAck => { - const [type, timestamp, ...ids] = ack.split('|'); - return { type: type as SyncEntityType, ackEpoch: timestamp, ids }; + const [type, updateId] = ack.split('|'); + return { type: type as SyncEntityType, updateId }; }; -export const toAck = ({ type, ackEpoch, ids }: SyncAck) => [type, ackEpoch, ...ids].join('|'); +export const toAck = ({ type, updateId }: SyncAck) => [type, updateId].join('|'); export const mapJsonLine = (object: unknown) => JSON.stringify(object) + '\n'; export const serialize = ({ type, - ackEpoch, - ids, + updateId, data, }: { type: T; - ackEpoch: string; - ids: string[]; + updateId: string; data: Exact; -}) => mapJsonLine({ type, data, ack: toAck({ type, ackEpoch, ids }) }); +}) => mapJsonLine({ type, data, ack: toAck({ type, updateId }) });