0
Fork 0
mirror of https://github.com/withastro/astro.git synced 2024-12-30 22:03:56 -05:00

Moves content layer sync to a queue and support selective sync (#11767)

* wip: allow integrations to refresh contel layer

* Use queue for sync jobs

* Remove integration-specific code

* Fix type

* changeset
This commit is contained in:
Matt Kane 2024-08-29 15:22:40 +01:00 committed by GitHub
parent 17f71278f4
commit d1bd1a11f7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 49 additions and 36 deletions

View file

@ -0,0 +1,5 @@
---
'astro': patch
---
Refactors content layer sync to use a queue

View file

@ -152,6 +152,7 @@
"esbuild": "^0.21.5", "esbuild": "^0.21.5",
"estree-walker": "^3.0.3", "estree-walker": "^3.0.3",
"fast-glob": "^3.3.2", "fast-glob": "^3.3.2",
"fastq": "^1.17.1",
"flattie": "^1.1.1", "flattie": "^1.1.1",
"github-slugger": "^2.0.0", "github-slugger": "^2.0.0",
"gray-matter": "^4.0.3", "gray-matter": "^4.0.3",

View file

@ -3242,6 +3242,11 @@ export interface SSRLoadedRenderer extends Pick<AstroRenderer, 'name' | 'clientE
ssr: SSRLoadedRendererValue; ssr: SSRLoadedRendererValue;
} }
export interface RefreshContentOptions {
loaders?: Array<string>;
context?: Record<string, any>;
}
export type HookParameters< export type HookParameters<
Hook extends keyof AstroIntegration['hooks'], Hook extends keyof AstroIntegration['hooks'],
Fn = AstroIntegration['hooks'][Hook], Fn = AstroIntegration['hooks'][Hook],

View file

@ -1,9 +1,10 @@
import { promises as fs, existsSync } from 'node:fs'; import { promises as fs, existsSync } from 'node:fs';
import { isAbsolute } from 'node:path'; import { isAbsolute } from 'node:path';
import { fileURLToPath } from 'node:url'; import { fileURLToPath } from 'node:url';
import * as fastq from 'fastq';
import type { FSWatcher } from 'vite'; import type { FSWatcher } from 'vite';
import xxhash from 'xxhash-wasm'; import xxhash from 'xxhash-wasm';
import type { AstroSettings, ContentEntryType } from '../@types/astro.js'; import type { AstroSettings, ContentEntryType, RefreshContentOptions } from '../@types/astro.js';
import { AstroUserError } from '../core/errors/errors.js'; import { AstroUserError } from '../core/errors/errors.js';
import type { Logger } from '../core/logger/core.js'; import type { Logger } from '../core/logger/core.js';
import { import {
@ -38,7 +39,8 @@ export class ContentLayer {
#generateDigest?: (data: Record<string, unknown> | string) => string; #generateDigest?: (data: Record<string, unknown> | string) => string;
#loading = false; #queue: fastq.queueAsPromised<RefreshContentOptions, void>;
constructor({ settings, logger, store, watcher }: ContentLayerOptions) { constructor({ settings, logger, store, watcher }: ContentLayerOptions) {
// The default max listeners is 10, which can be exceeded when using a lot of loaders // The default max listeners is 10, which can be exceeded when using a lot of loaders
watcher?.setMaxListeners(50); watcher?.setMaxListeners(50);
@ -47,13 +49,14 @@ export class ContentLayer {
this.#store = store; this.#store = store;
this.#settings = settings; this.#settings = settings;
this.#watcher = watcher; this.#watcher = watcher;
this.#queue = fastq.promise(this.#doSync.bind(this), 1);
} }
/** /**
* Whether the content layer is currently loading content * Whether the content layer is currently loading content
*/ */
get loading() { get loading() {
return this.#loading; return !this.#queue.idle();
} }
/** /**
@ -62,11 +65,7 @@ export class ContentLayer {
watchContentConfig() { watchContentConfig() {
this.#unsubscribe?.(); this.#unsubscribe?.();
this.#unsubscribe = globalContentConfigObserver.subscribe(async (ctx) => { this.#unsubscribe = globalContentConfigObserver.subscribe(async (ctx) => {
if ( if (ctx.status === 'loaded' && ctx.config.digest !== this.#lastConfigDigest) {
!this.#loading &&
ctx.status === 'loaded' &&
ctx.config.digest !== this.#lastConfigDigest
) {
this.sync(); this.sync();
} }
}); });
@ -76,23 +75,6 @@ export class ContentLayer {
this.#unsubscribe?.(); this.#unsubscribe?.();
} }
/**
* Run the `load()` method of each collection's loader, which will load the data and save it in the data store.
* The loader itself is responsible for deciding whether this will clear and reload the full collection, or
* perform an incremental update. After the data is loaded, the data store is written to disk.
*/
async sync() {
if (this.#loading) {
return;
}
this.#loading = true;
try {
await this.#doSync();
} finally {
this.#loading = false;
}
}
async #getGenerateDigest() { async #getGenerateDigest() {
if (this.#generateDigest) { if (this.#generateDigest) {
return this.#generateDigest; return this.#generateDigest;
@ -113,10 +95,12 @@ export class ContentLayer {
collectionName, collectionName,
loaderName = 'content', loaderName = 'content',
parseData, parseData,
refreshContextData,
}: { }: {
collectionName: string; collectionName: string;
loaderName: string; loaderName: string;
parseData: LoaderContext['parseData']; parseData: LoaderContext['parseData'];
refreshContextData?: Record<string, unknown>;
}): Promise<LoaderContext> { }): Promise<LoaderContext> {
return { return {
collection: collectionName, collection: collectionName,
@ -127,6 +111,7 @@ export class ContentLayer {
parseData, parseData,
generateDigest: await this.#getGenerateDigest(), generateDigest: await this.#getGenerateDigest(),
watcher: this.#watcher, watcher: this.#watcher,
refreshContextData,
entryTypes: getEntryConfigByExtMap([ entryTypes: getEntryConfigByExtMap([
...this.#settings.contentEntryTypes, ...this.#settings.contentEntryTypes,
...this.#settings.dataEntryTypes, ...this.#settings.dataEntryTypes,
@ -134,7 +119,18 @@ export class ContentLayer {
}; };
} }
async #doSync() { /**
* Enqueues a sync job that runs the `load()` method of each collection's loader, which will load the data and save it in the data store.
* The loader itself is responsible for deciding whether this will clear and reload the full collection, or
* perform an incremental update. After the data is loaded, the data store is written to disk. Jobs are queued,
* so that only one sync can run at a time. The function returns a promise that resolves when this sync job is complete.
*/
sync(options: RefreshContentOptions = {}): Promise<void> {
return this.#queue.push(options);
}
async #doSync(options: RefreshContentOptions) {
const contentConfig = globalContentConfigObserver.get(); const contentConfig = globalContentConfigObserver.get();
const logger = this.#logger.forkIntegrationLogger('content'); const logger = this.#logger.forkIntegrationLogger('content');
if (contentConfig?.status !== 'loaded') { if (contentConfig?.status !== 'loaded') {
@ -180,6 +176,15 @@ export class ContentLayer {
} }
} }
// If loaders are specified, only sync the specified loaders
if (
options?.loaders &&
(typeof collection.loader !== 'object' ||
!options.loaders.includes(collection.loader.name))
) {
return;
}
const collectionWithResolvedSchema = { ...collection, schema }; const collectionWithResolvedSchema = { ...collection, schema };
const parseData: LoaderContext['parseData'] = async ({ id, data, filePath = '' }) => { const parseData: LoaderContext['parseData'] = async ({ id, data, filePath = '' }) => {
@ -213,6 +218,7 @@ export class ContentLayer {
collectionName: name, collectionName: name,
parseData, parseData,
loaderName: collection.loader.name, loaderName: collection.loader.name,
refreshContextData: options?.context,
}); });
if (typeof collection.loader === 'function') { if (typeof collection.loader === 'function') {
@ -293,18 +299,12 @@ export async function simpleLoader<TData extends { id: string }>(
function contentLayerSingleton() { function contentLayerSingleton() {
let instance: ContentLayer | null = null; let instance: ContentLayer | null = null;
return { return {
initialized: () => Boolean(instance),
init: (options: ContentLayerOptions) => { init: (options: ContentLayerOptions) => {
instance?.unwatchContentConfig(); instance?.unwatchContentConfig();
instance = new ContentLayer(options); instance = new ContentLayer(options);
return instance; return instance;
}, },
get: () => { get: () => instance,
if (!instance) {
throw new Error('Content layer not initialized');
}
return instance;
},
dispose: () => { dispose: () => {
instance?.unwatchContentConfig(); instance?.unwatchContentConfig();
instance = null; instance = null;

View file

@ -31,6 +31,7 @@ export interface LoaderContext {
/** When running in dev, this is a filesystem watcher that can be used to trigger updates */ /** When running in dev, this is a filesystem watcher that can be used to trigger updates */
watcher?: FSWatcher; watcher?: FSWatcher;
refreshContextData?: Record<string, unknown>;
entryTypes: Map<string, ContentEntryType>; entryTypes: Map<string, ContentEntryType>;
} }

View file

@ -185,9 +185,7 @@ export async function createContainerWithAutomaticRestart({
key: 's', key: 's',
description: 'sync content layer', description: 'sync content layer',
action: () => { action: () => {
if (globalContentLayer.initialized()) { globalContentLayer.get()?.sync();
globalContentLayer.get().sync();
}
}, },
}); });
} }

View file

@ -645,6 +645,9 @@ importers:
fast-glob: fast-glob:
specifier: ^3.3.2 specifier: ^3.3.2
version: 3.3.2 version: 3.3.2
fastq:
specifier: ^1.17.1
version: 1.17.1
flattie: flattie:
specifier: ^1.1.1 specifier: ^1.1.1
version: 1.1.1 version: 1.1.1