From 2698944ec7c451c2f5d9bbd5fa59d7f9d812d51d Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 5 Jan 2024 11:45:05 +0100 Subject: [PATCH] :sparkles: Add proper file iteration on srepl helpers --- backend/src/app/srepl/helpers.clj | 118 +++++++++++++----------------- backend/src/app/srepl/main.clj | 108 +++++++++++++-------------- 2 files changed, 103 insertions(+), 123 deletions(-) diff --git a/backend/src/app/srepl/helpers.clj b/backend/src/app/srepl/helpers.clj index c135d3e76..f42fcf26c 100644 --- a/backend/src/app/srepl/helpers.clj +++ b/backend/src/app/srepl/helpers.clj @@ -24,7 +24,7 @@ [app.db :as db] [app.db.sql :as sql] [app.features.fdata :as feat.fdata] - [app.main :refer [system]] + [app.main :as main] [app.rpc.commands.files :as files] [app.rpc.commands.files-update :as files-update] [app.util.blob :as blob] @@ -55,16 +55,17 @@ (defn reset-file-data! "Hardcode replace of the data of one file." - [system id data] - (db/tx-run! system (fn [system] - (db/update! system :file - {:data data} - {:id id})))) + [id data] + (db/tx-run! main/system + (fn [system] + (db/update! system :file + {:data data} + {:id id})))) (defn get-file "Get the migrated data of one file." - [system id & {:keys [migrate?] :or {migrate? true}}] - (db/run! system + [id & {:keys [migrate?] :or {migrate? true}}] + (db/run! main/system (fn [system] (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] (-> (files/get-file system id :migrate? migrate?) @@ -73,8 +74,8 @@ (defn validate "Validate structure, referencial integrity and semantic coherence of all contents of a file. Returns a list of errors." - [system id] - (db/tx-run! system + [id] + (db/tx-run! main/system (fn [{:keys [::db/conn] :as system}] (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] (let [id (if (string? id) (parse-uuid id) id) @@ -90,8 +91,8 @@ (defn repair! "Repair the list of errors detected by validation." - [system id] - (db/tx-run! system + [id] + (db/tx-run! main/system (fn [{:keys [::db/conn] :as system}] (binding [pmap/*tracked* (pmap/create-tracked) pmap/*load-fn* (partial feat.fdata/load-pointer system id)] @@ -127,8 +128,8 @@ (defn update-file! "Apply a function to the data of one file. Optionally save the changes or not. The function receives the decoded and migrated file data." - [system & {:keys [update-fn id rollback? migrate? inc-revn?] - :or {rollback? true migrate? true inc-revn? true}}] + [& {:keys [update-fn id rollback? migrate? inc-revn?] + :or {rollback? true migrate? true inc-revn? true}}] (letfn [(process-file [{:keys [::db/conn] :as system} {:keys [features] :as file}] (binding [pmap/*tracked* (pmap/create-tracked) pmap/*load-fn* (partial feat.fdata/load-pointer system id) @@ -153,7 +154,7 @@ (dissoc file :data)))] - (db/tx-run! system + (db/tx-run! main/system (fn [system] (binding [*system* system] (try @@ -163,6 +164,12 @@ (when rollback? (db/rollback! system))))))))) + +(def ^:private sql:get-file-ids + "SELECT id FROM file + WHERE created_at < ? AND deleted_at is NULL + ORDER BY created_at DESC") + (defn analyze-files "Apply a function to all files in the database, reading them in batches. Do not change data. @@ -171,21 +178,11 @@ and the previous state and returns the new state. Emits rollback at the end of operation." - [system & {:keys [chunk-size max-items start-at on-file on-error on-end on-init with-libraries?] - :or {chunk-size 10 max-items Long/MAX_VALUE}}] - (letfn [(get-chunk [conn cursor] - (let [sql (str "SELECT id, created_at FROM file " - " WHERE created_at < ? AND deleted_at is NULL " - " ORDER BY created_at desc LIMIT ?") - rows (db/exec! conn [sql cursor chunk-size])] - [(some->> rows peek :created-at) (map :id rows)])) - - (get-candidates [conn] - (->> (d/iteration (partial get-chunk conn) - :vf second - :kf first - :initk (or start-at (dt/now))) - (take max-items))) + [& {:keys [max-items start-at on-file on-error on-end on-init with-libraries?]}] + (letfn [(get-candidates [conn] + (cond->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))]) + (some? max-items) + (take max-items))) (on-error* [cause file] (println "unexpected exception happened on processing file: " (:id file)) @@ -210,12 +207,13 @@ (catch Throwable cause ((or on-error on-error*) cause file)))))] - (db/tx-run! system + (db/tx-run! main/system (fn [{:keys [::db/conn] :as system}] (try (binding [*system* system] (when (fn? on-init) (on-init)) - (run! (partial process-file system) (get-candidates conn))) + (run! (partial process-file system) + (get-candidates conn))) (finally (when (fn? on-end) (ex/ignoring (on-end))) @@ -224,33 +222,20 @@ (defn process-files! "Apply a function to all files in the database, reading them in batches." - - [system & {:keys [chunk-size - max-items - workers - start-at - on-file - on-error - on-end - on-init - rollback?] - :or {chunk-size 10 - max-items Long/MAX_VALUE - workers 1 - rollback? true}}] - (letfn [(get-chunk [conn cursor] - (let [sql (str "SELECT id, created_at FROM file " - " WHERE created_at < ? AND deleted_at is NULL " - " ORDER BY created_at desc LIMIT ?") - rows (db/exec! conn [sql cursor chunk-size])] - [(some->> rows peek :created-at) (map :id rows)])) - - (get-candidates [conn] - (->> (d/iteration (partial get-chunk conn) - :vf second - :kf first - :initk (or start-at (dt/now))) - (take max-items))) + [& {:keys [max-items + workers + start-at + on-file + on-error + on-end + on-init + rollback?] + :or {workers 1 + rollback? true}}] + (letfn [(get-candidates [conn] + (cond->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))]) + (some? max-items) + (take max-items))) (on-error* [cause file] (println! "unexpected exception happened on processing file: " (:id file)) @@ -275,7 +260,7 @@ ((or on-error on-error*) cause file-id)))) (run-worker [in index] - (db/tx-run! system + (db/tx-run! main/system (fn [system] (binding [*system* system] (loop [i 0] @@ -288,15 +273,16 @@ (db/rollback! system))))) (run-producer [input] - (db/tx-run! system (fn [{:keys [::db/conn]}] - (doseq [file-id (get-candidates conn)] - (println! "=> producer:" file-id "|" (px/get-name)) - (sp/put! input file-id)) - (sp/close! input))))] + (db/tx-run! main/system + (fn [{:keys [::db/conn]}] + (doseq [file-id (get-candidates conn)] + (println! "=> producer:" file-id "|" (px/get-name)) + (sp/put! input file-id)) + (sp/close! input))))] (when (fn? on-init) (on-init)) - (let [input (sp/chan :buf chunk-size) + (let [input (sp/chan :buf 25) producer (px/thread {:name "penpot/srepl/producer"} (run-producer input)) diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index e7c3eb89d..d186bfe11 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -38,52 +38,45 @@ [cuerdas.core :as str])) (defn print-available-tasks - [system] - (let [tasks (:app.worker/registry system)] + [] + (let [tasks (:app.worker/registry main/system)] (p/pprint (keys tasks) :level 200))) (defn run-task! - ([system name] - (run-task! system name {})) - ([system name params] - (let [tasks (:app.worker/registry system)] - (if-let [task-fn (get tasks name)] + ([tname] + (run-task! tname {})) + ([tname params] + (let [tasks (:app.worker/registry main/system) + tname (if (keyword? tname) (name tname) name)] + (if-let [task-fn (get tasks tname)] (task-fn params) - (println (format "no task '%s' found" name)))))) + (println (format "no task '%s' found" tname)))))) (defn schedule-task! - ([system name] - (schedule-task! system name {})) - ([system name props] - (let [pool (:app.db/pool system)] + ([name] + (schedule-task! name {})) + ([name props] + (let [pool (:app.db/pool main/system)] (wrk/submit! ::wrk/conn pool ::wrk/task name ::wrk/props props)))) (defn send-test-email! - [system destination] - (us/verify! - :expr (some? system) - :hint "system should be provided") - + [destination] (us/verify! :expr (string? destination) :hint "destination should be provided") - (let [handler (:app.email/sendmail system)] + (let [handler (:app.email/sendmail main/system)] (handler {:body "test email" :subject "test email" :to [destination]}))) (defn resend-email-verification-email! - [system email] - (us/verify! - :expr (some? system) - :hint "system should be provided") - - (let [sprops (:app.setup/props system) - pool (:app.db/pool system) + [email] + (let [sprops (:app.setup/props main/system) + pool (:app.db/pool main/system) profile (profile/get-profile-by-email pool email)] (auth/send-email-verification! pool sprops profile) @@ -92,8 +85,8 @@ (defn mark-profile-as-active! "Mark the profile blocked and removes all the http sessiones associated with the profile-id." - [system email] - (db/with-atomic [conn (:app.db/pool system)] + [email] + (db/with-atomic [conn (:app.db/pool main/system)] (when-let [profile (db/get* conn :profile {:email (str/lower email)} {:columns [:id :email]})] @@ -104,8 +97,8 @@ (defn mark-profile-as-blocked! "Mark the profile blocked and removes all the http sessiones associated with the profile-id." - [system email] - (db/with-atomic [conn (:app.db/pool system)] + [email] + (db/with-atomic [conn (:app.db/pool main/system)] (when-let [profile (db/get* conn :profile {:email (str/lower email)} {:columns [:id :email]})] @@ -117,9 +110,9 @@ (defn reset-password! "Reset a password to a specific one for a concrete user or all users if email is `:all` keyword." - [system & {:keys [email password] :or {password "123123"} :as params}] + [& {:keys [email password] :or {password "123123"} :as params}] (us/verify! (contains? params :email) "`email` parameter is mandatory") - (db/with-atomic [conn (:app.db/pool system)] + (db/with-atomic [conn (:app.db/pool main/system)] (let [password (derive-password password)] (if (= email :all) (db/exec! conn ["update profile set password=?" password]) @@ -127,21 +120,21 @@ (db/exec! conn ["update profile set password=? where email=?" password email])))))) (defn enable-objects-map-feature-on-file! - [system & {:keys [save? id]}] - (h/update-file! system + [& {:keys [save? id]}] + (h/update-file! main/system :id id :update-fn features.fdata/enable-objects-map :save? save?)) (defn enable-pointer-map-feature-on-file! - [system & {:keys [save? id]}] - (h/update-file! system + [& {:keys [save? id]}] + (h/update-file! main/system :id id :update-fn features.fdata/enable-pointer-map :save? save?)) (defn enable-team-feature! - [system team-id feature] + [team-id feature] (dm/verify! "feature should be supported" (contains? cfeat/supported-features feature)) @@ -149,7 +142,7 @@ (let [team-id (if (string? team-id) (parse-uuid team-id) team-id)] - (db/tx-run! system + (db/tx-run! main/system (fn [{:keys [::db/conn]}] (let [team (-> (db/get conn :team {:id team-id}) (update :features db/decode-pgarray #{})) @@ -161,7 +154,7 @@ :enabled)))))) (defn disable-team-feature! - [system team-id feature] + [team-id feature] (dm/verify! "feature should be supported" (contains? cfeat/supported-features feature)) @@ -169,7 +162,7 @@ (let [team-id (if (string? team-id) (parse-uuid team-id) team-id)] - (db/tx-run! system + (db/tx-run! main/system (fn [{:keys [::db/conn]}] (let [team (-> (db/get conn :team {:id team-id}) (update :features db/decode-pgarray #{})) @@ -181,9 +174,9 @@ :disabled)))))) (defn enable-storage-features-on-file! - [system & {:as params}] - (enable-objects-map-feature-on-file! system params) - (enable-pointer-map-feature-on-file! system params)) + [& {:as params}] + (enable-objects-map-feature-on-file! main/system params) + (enable-pointer-map-feature-on-file! main/system params)) (defn instrument-var [var] @@ -207,13 +200,13 @@ (defn take-file-snapshot! "An internal helper that persist the file snapshot using non-gc collectable file-changes entry." - [system & {:keys [file-id label]}] + [& {:keys [file-id label]}] (let [file-id (h/parse-uuid file-id)] - (db/tx-run! system fsnap/take-file-snapshot! {:file-id file-id :label label}))) + (db/tx-run! main/system fsnap/take-file-snapshot! {:file-id file-id :label label}))) (defn restore-file-snapshot! - [system & {:keys [file-id id]}] - (db/tx-run! system + [& {:keys [file-id id]}] + (db/tx-run! main/system (fn [cfg] (let [file-id (h/parse-uuid file-id) id (h/parse-uuid id)] @@ -224,12 +217,13 @@ (defn list-file-snapshots! - [system & {:keys [file-id limit]}] - (db/tx-run! system (fn [system] - (let [params {:file-id (h/parse-uuid file-id) - :limit limit}] - (->> (fsnap/get-file-snapshots system (d/without-nils params)) - (print-table [:id :revn :created-at :label])))))) + [& {:keys [file-id limit]}] + (db/tx-run! main/system + (fn [system] + (let [params {:file-id (h/parse-uuid file-id) + :limit limit}] + (->> (fsnap/get-file-snapshots system (d/without-nils params)) + (print-table [:id :revn :created-at :label])))))) (defn notify! [{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level] @@ -334,12 +328,12 @@ (into #{}) (run! send)))) - (defn duplicate-team - [system team-id & {:keys [name]}] + [team-id & {:keys [name]}] (let [team-id (if (string? team-id) (parse-uuid team-id) team-id) name (or name (fn [prev-name] (str/ffmt "Cloned: % (%)" prev-name (dt/format-instant (dt/now)))))] - (db/tx-run! system (fn [cfg] - (db/exec-one! cfg ["SET CONSTRAINTS ALL DEFERRED"]) - (mgmt/duplicate-team cfg :team-id team-id :name name))))) + (db/tx-run! main/system + (fn [cfg] + (db/exec-one! cfg ["SET CONSTRAINTS ALL DEFERRED"]) + (mgmt/duplicate-team cfg :team-id team-id :name name)))))