From ec51e0c0d77025cbbe20f2e5a5ee0885d29a6677 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 8 Nov 2023 11:13:31 +0100 Subject: [PATCH] :sparkles: Add max-time constraint for migration --- backend/src/app/srepl/components_v2.clj | 35 ++++++++++++++++--------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj index b6cb138b4..af69bb79b 100644 --- a/backend/src/app/srepl/components_v2.clj +++ b/backend/src/app/srepl/components_v2.clj @@ -203,7 +203,7 @@ (defn migrate-teams! [{:keys [::db/pool] :as system} - & {:keys [chunk-size max-jobs max-items start-at rollback preset skip-on-error] + & {:keys [chunk-size max-jobs max-items start-at rollback preset skip-on-error max-time] :or {chunk-size 10000 rollback true skip-on-error true @@ -227,14 +227,29 @@ (map #(update % :features db/decode-pgarray #{})) (remove #(contains? (:features %) "ephimeral/v2-migration")) (take max-items) - (map :id)))] + (map :id))) + + (migrate-team [team-id] + (try + (-> (assoc system ::db/rollback rollback) + (feat/migrate-team! team-id)) + (catch Throwable cause + (l/err :hint "unexpected error on processing team" :team-id (dm/str team-id) :cause cause)))) + + (process-team [scope tpoint mtime team-id] + (ps/acquire! feat/*semaphore*) + (let [ts (tpoint)] + (if (and mtime (neg? (compare mtime ts))) + (l/trc :hint "max time constraint reached" :elapsed (dt/format-duration ts)) + (px/submit! scope (partial migrate-team team-id)))))] (l/dbg :hint "migrate:start") (let [sem (ps/create :permits max-jobs) total (get-total-teams pool) stats (atom {:total/teams (min total max-items)}) - tpoint (dt/tpoint)] + tpoint (dt/tpoint) + mtime (some-> max-time dt/duration)] (add-watch stats :progress-report (report-progress-teams tpoint)) @@ -244,17 +259,11 @@ (try (pu/with-open [scope (px/structured-task-scope :preset preset :factory :virtual)] - (run! (fn [team-id] - (l/trc :hint "scheduling task" :team-id (dm/str team-id)) - (ps/acquire! sem) - (px/submit! scope (fn [] - (try - (-> (assoc system ::db/rollback rollback) - (feat/migrate-team! team-id)) - (catch Throwable cause - (l/err :hint "unexpected error on processing team" :team-id (dm/str team-id) :cause cause)))))) + (loop [candidates (get-candidates)] + (when-let [team-id (first candidates)] + (when (process-team scope tpoint mtime team-id) + (recur (rest candidates))))) - (get-candidates)) (p/await! scope)) (print-stats!