0
Fork 0
mirror of https://github.com/logto-io/logto.git synced 2025-03-17 22:31:28 -05:00

feat(core,schemas): record daily active users (#4113)

This commit is contained in:
wangsijie 2023-07-07 15:14:29 +08:00 committed by GitHub
parent 04cf242e48
commit 5ccdd7f31a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 183 additions and 14 deletions

View file

@ -0,0 +1,6 @@
---
"@logto/core": minor
"@logto/schemas": minor
---
Record daily active users

View file

@ -19,10 +19,15 @@ const setExcluded = (...fields: IdentifierSqlToken[]) =>
sql`, `
);
type OnConflict = {
fields: IdentifierSqlToken[];
setExcludedFields: IdentifierSqlToken[];
};
type OnConflict =
| {
fields: IdentifierSqlToken[];
setExcludedFields: IdentifierSqlToken[];
ignore?: false;
}
| {
ignore: true;
};
type InsertIntoConfigReturning = {
returning: true;
@ -70,12 +75,15 @@ export const buildInsertIntoWithPool =
insertingKeys.map((key) => convertToPrimitiveOrSql(key, data[key] ?? null)),
sql`, `
)})
${conditionalSql(
onConflict,
({ fields, setExcludedFields }) => sql`
on conflict (${sql.join(fields, sql`, `)}) do update
set ${setExcluded(...setExcludedFields)}
`
${conditionalSql(onConflict, (config) =>
config.ignore
? sql`
on conflict do nothing
`
: sql`
on conflict (${sql.join(config.fields, sql`, `)}) do update
set ${setExcluded(...config.setExcludedFields)}
`
)}
${conditionalSql(returning, () => sql`returning *`)}
`);

View file

@ -0,0 +1,25 @@
import { MockQueries } from '#src/test-utils/tenant.js';
import { accessTokenIssuedListener } from './access-token.js';
const { jest } = import.meta;
const insertActiveUser = jest.fn();
const queries = new MockQueries({
dailyActiveUsers: { insertActiveUser },
});
describe('accessTokenIssuedListener()', () => {
afterEach(() => {
insertActiveUser.mockClear();
});
it('should call insertActiveUser if accountId exists', async () => {
await accessTokenIssuedListener({ accountId: 'accountId' }, queries);
expect(insertActiveUser).toHaveBeenCalled();
});
it('should not call insertActiveUser if no accountId', async () => {
await accessTokenIssuedListener({ accountId: '' }, queries);
expect(insertActiveUser).not.toHaveBeenCalled();
});
});

View file

@ -0,0 +1,24 @@
import { generateStandardId } from '@logto/shared';
import { getUtcStartOfToday } from '#src/oidc/utils.js';
import type Queries from '#src/tenants/Queries.js';
export const accessTokenIssuedListener = async (
accessToken: { accountId: string },
queries: Queries
) => {
const { accountId } = accessToken;
const { insertActiveUser } = queries.dailyActiveUsers;
if (!accountId) {
// Some kind of tokens may not have accountId, for example, the one issued for application
return;
}
// Mark this user as active today
await insertActiveUser({
id: generateStandardId(),
userId: accountId,
date: getUtcStartOfToday().getTime(),
});
};

View file

@ -1,4 +1,5 @@
import { createMockProvider } from '#src/test-utils/oidc-provider.js';
import { MockQueries } from '#src/test-utils/tenant.js';
import { grantListener, grantRevocationListener } from './grant.js';
import { addOidcEventListeners } from './index.js';
@ -14,7 +15,7 @@ describe('addOidcEventListeners', () => {
it('should add proper listeners', () => {
const provider = createMockProvider();
const addListener = jest.spyOn(provider, 'addListener');
addOidcEventListeners(provider);
addOidcEventListeners(provider, new MockQueries());
expect(addListener).toHaveBeenCalledWith('grant.success', grantListener);
expect(addListener).toHaveBeenCalledWith('grant.error', grantListener);
expect(addListener).toHaveBeenCalledWith('grant.revoked', grantRevocationListener);

View file

@ -1,7 +1,9 @@
import type Provider from 'oidc-provider';
import type Queries from '#src/tenants/Queries.js';
import { consoleLog } from '#src/utils/console.js';
import { accessTokenIssuedListener } from './access-token.js';
import { grantListener, grantRevocationListener } from './grant.js';
import { interactionEndedListener, interactionStartedListener } from './interaction.js';
@ -9,10 +11,13 @@ import { interactionEndedListener, interactionStartedListener } from './interact
* @see {@link https://github.com/panva/node-oidc-provider/blob/v7.x/docs/README.md#im-getting-a-client-authentication-failed-error-with-no-details Getting auth error with no details?}
* @see {@link https://github.com/panva/node-oidc-provider/blob/v7.x/docs/events.md OIDC Provider events}
*/
export const addOidcEventListeners = (provider: Provider) => {
export const addOidcEventListeners = (provider: Provider, queries: Queries) => {
provider.addListener('grant.success', grantListener);
provider.addListener('grant.error', grantListener);
provider.addListener('grant.revoked', grantRevocationListener);
provider.addListener('access_token.issued', async (token) => {
return accessTokenIssuedListener(token, queries);
});
provider.addListener('interaction.started', interactionStartedListener);
provider.addListener('interaction.ended', interactionEndedListener);
provider.addListener('server_error', (_, error) => {

View file

@ -47,6 +47,7 @@ export default function initOidc(
const {
resources: { findResourceByIndicator, findDefaultResource },
users: { findUserById },
dailyActiveUsers: { insertActiveUser },
} = queries;
const { findUserScopesForResourceIndicator } = libraries.users;
const { findApplicationScopesForResourceIndicator } = libraries.applications;
@ -300,7 +301,7 @@ export default function initOidc(
},
});
addOidcEventListeners(oidc);
addOidcEventListeners(oidc, queries);
// Provide audit log context for event listeners
oidc.use(koaAuditLog(queries));

View file

@ -64,3 +64,9 @@ export const isOriginAllowed = (
return [...corsAllowedOrigins, ...redirectUriOrigins].includes(origin);
};
export const getUtcStartOfToday = () => {
const now = new Date();
return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate(), 0, 0, 0, 0));
};

View file

@ -0,0 +1,14 @@
import { DailyActiveUsers } from '@logto/schemas';
import type { CommonQueryMethods } from 'slonik';
import { buildInsertIntoWithPool } from '#src/database/insert-into.js';
export const createDailyActiveUsersQueries = (pool: CommonQueryMethods) => {
const insertActiveUser = buildInsertIntoWithPool(pool)(DailyActiveUsers, {
onConflict: { ignore: true },
});
return {
insertActiveUser,
};
};

View file

@ -5,6 +5,7 @@ import { createApplicationQueries } from '#src/queries/application.js';
import { createApplicationsRolesQueries } from '#src/queries/applications-roles.js';
import { createConnectorQueries } from '#src/queries/connector.js';
import { createCustomPhraseQueries } from '#src/queries/custom-phrase.js';
import { createDailyActiveUsersQueries } from '#src/queries/daily-active-user.js';
import { createDomainsQueries } from '#src/queries/domains.js';
import { createHooksQueries } from '#src/queries/hooks.js';
import { createLogQueries } from '#src/queries/log.js';
@ -39,6 +40,7 @@ export default class Queries {
verificationStatuses = createVerificationStatusQueries(this.pool);
hooks = createHooksQueries(this.pool);
domains = createDomainsQueries(this.pool);
dailyActiveUsers = createDailyActiveUsersQueries(this.pool);
constructor(
public readonly pool: CommonQueryMethods,

View file

@ -0,0 +1,61 @@
import { type CommonQueryMethods, sql } from 'slonik';
import type { AlterationScript } from '../lib/types/alteration.js';
const getId = (value: string) => sql.identifier([value]);
const getDatabaseName = async (pool: CommonQueryMethods) => {
const { currentDatabase } = await pool.one<{ currentDatabase: string }>(sql`
select current_database();
`);
return currentDatabase.replaceAll('-', '_');
};
const alteration: AlterationScript = {
up: async (pool) => {
const database = await getDatabaseName(pool);
const baseRoleId = getId(`logto_tenant_${database}`);
await pool.query(sql`
create table daily_active_users (
id varchar(21) not null,
tenant_id varchar(21) not null
references tenants (id) on update cascade on delete cascade,
user_id varchar(21) not null,
date timestamptz not null,
primary key (id),
constraint daily_active_users__user_id_date
unique (user_id, date)
);
create index daily_active_users__id
on daily_active_users (tenant_id, id);
create trigger set_tenant_id before insert on daily_active_users
for each row execute procedure set_tenant_id();
alter table daily_active_users enable row level security;
create policy daily_active_users_tenant_id on daily_active_users
as restrictive
using (tenant_id = (select id from tenants where db_user = current_user));
create policy daily_active_users_modification on daily_active_users
using (true);
grant select, insert, update, delete on daily_active_users to ${baseRoleId};
`);
},
down: async (pool) => {
await pool.query(sql`
drop policy daily_active_users_tenant_id on daily_active_users;
drop policy daily_active_users_modification on daily_active_users;
alter table daily_active_users disable row level security;
drop table daily_active_users;
`);
},
};
export default alteration;

View file

@ -0,0 +1,13 @@
create table daily_active_users (
id varchar(21) not null,
tenant_id varchar(21) not null
references tenants (id) on update cascade on delete cascade,
user_id varchar(21) not null,
date timestamptz not null,
primary key (id),
constraint daily_active_users__user_id_date
unique (user_id, date)
);
create index daily_active_users__id
on daily_active_users (tenant_id, id);

View file

@ -46,7 +46,10 @@ export const convertToPrimitiveOrSql = (
return JSON.stringify(value);
}
if (['_at', 'At'].some((value) => key.endsWith(value)) && typeof value === 'number') {
if (
(['_at', 'At'].some((value) => key.endsWith(value)) || key === 'date') &&
typeof value === 'number'
) {
return sql`to_timestamp(${value}::double precision / 1000)`;
}