diff --git a/backend/dev/user.clj b/backend/dev/user.clj index 1b02c6ea0..bbbcac780 100644 --- a/backend/dev/user.clj +++ b/backend/dev/user.clj @@ -19,10 +19,11 @@ [app.common.schema.generators :as sg] [app.common.spec :as us] [app.common.transit :as t] + [app.common.types.file :as ctf] [app.common.uuid :as uuid] [app.config :as cfg] [app.main :as main] - [app.srepl.helpers] + [app.srepl.helpers :as srepl.helpers] [app.srepl.main :as srepl] [app.util.blob :as blob] [app.util.json :as json] @@ -48,7 +49,8 @@ [malli.generator :as mg] [malli.registry :as mr] [malli.transform :as mt] - [malli.util :as mu])) + [malli.util :as mu] + [promesa.exec :as px])) (repl/disable-reload! (find-ns 'integrant.core)) (set! *warn-on-reflection* true) @@ -176,4 +178,3 @@ [:map [:type [:= :b]] [:b :int]]]]]]]]) - diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index d148ccc46..03f56b2bb 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -299,6 +299,10 @@ :hint "database object not found")) row)) +(defn plan + [ds sql] + (jdbc/plan ds sql sql/default-opts)) + (defn get-by-id [ds table id & {:as opts}] (get ds table {:id id} opts)) diff --git a/backend/src/app/srepl/helpers.clj b/backend/src/app/srepl/helpers.clj index 13b399f1b..96fbe6c4c 100644 --- a/backend/src/app/srepl/helpers.clj +++ b/backend/src/app/srepl/helpers.clj @@ -12,9 +12,9 @@ [app.common.data :as d] [app.common.exceptions :as ex] [app.common.files.features :as ffeat] + [app.common.files.migrations :as pmg] [app.common.logging :as l] [app.common.pages :as cp] - [app.common.files.migrations :as pmg] [app.common.pprint :refer [pprint]] [app.common.spec :as us] [app.common.uuid :as uuid] @@ -31,9 +31,13 @@ [clojure.stacktrace :as strace] [clojure.walk :as walk] [cuerdas.core :as str] - [expound.alpha :as expound])) + [expound.alpha :as expound] + [promesa.core :as p] + [promesa.exec :as px] + [promesa.exec.csp :as sp])) (def ^:dynamic *conn*) +(def ^:dynamic *pool*) (defn reset-password! "Reset a password to a specific one for a concrete user or all users @@ -146,6 +150,90 @@ (when (fn? on-end) (on-end)))) +(defn- println! + [& params] + (locking println + (apply println params))) + +(defn process-files! + "Apply a function to all files in the database, reading them in + batches." + + [{:keys [::db/pool] :as system} & {:keys [chunk-size + max-items + workers + start-at + on-file + on-error + on-end + on-init] + :or {chunk-size 10 + workers 1}}] + + (letfn [(get-chunk [conn cursor] + (let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])] + [(some->> rows peek :created-at) (seq rows)])) + + (get-candidates [conn] + (->> (d/iteration (partial get-chunk conn) + :vf second + :kf first + :initk (or start-at (dt/now))) + (take max-items))) + + + (on-error* [cause file] + (println! "unexpected exception happened on processing file: " (:id file)) + (strace/print-stack-trace cause)) + + (process-file [conn file] + (try + (binding [*conn* conn + pmap/*tracked* (atom {}) + pmap/*load-fn* (partial files/load-pointer conn (:id file)) + ffeat/*wrap-with-pointer-map-fn* + (if (contains? (:features file) "storage/pointer-map") pmap/wrap identity) + ffeat/*wrap-with-objects-map-fn* + (if (contains? (:features file) "storage/objectd-map") omap/wrap identity)] + (on-file file)) + (catch Throwable cause + ((or on-error on-error*) cause file)))) + + (run-worker [in index] + (db/with-atomic [conn pool] + (loop [i 0] + (when-let [file (sp/take! in)] + (println! "=> worker: index:" index "| loop:" i "| file:" (:id file) "|" (px/get-name)) + (process-file conn file) + (recur (inc i)))))) + + (run-producer [input] + (db/with-atomic [conn pool] + (doseq [file (get-candidates conn)] + (println! "=> producer:" (:id file) "|" (px/get-name)) + (sp/put! input file)) + (sp/close! input))) + + (start-worker [input index] + (px/thread + {:name (str "penpot/srepl/worker/" index)} + (run-worker input index))) + ] + + (when (fn? on-init) (on-init)) + + (let [input (sp/chan :buf chunk-size) + producer (px/thread + {:name "penpot/srepl/producer"} + (run-producer input)) + threads (->> (range workers) + (map (partial start-worker input)) + (cons producer) + (doall))] + + (run! p/await! threads) + (when (fn? on-end) (on-end))))) + (defn update-pages "Apply a function to all pages of one file. The function receives a page and returns an updated page." [data f]