diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index c3eb47e92..7a4297ca9 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -431,25 +431,40 @@ process-file (fn [file-id idx tpoint] - (try - (l/trc :hint "process:file:start" :file-id (str file-id) :index idx) - (let [system (assoc main/system ::db/rollback rollback?)] - (db/tx-run! system (fn [system] - (binding [h/*system* system] - (h/process-file! system file-id update-fn opts))))) - - (catch Throwable cause - (l/wrn :hint "unexpected error on processing file (skiping)" + (let [thread-id (px/get-thread-id)] + (try + (l/trc :hint "process:file:start" + :tid thread-id :file-id (str file-id) - :index idx - :cause cause)) - (finally - (ps/release! sjobs) - (let [elapsed (dt/format-duration (tpoint))] - (l/trc :hint "process:file:end" + :index idx) + (let [system (assoc main/system ::db/rollback rollback?)] + (db/tx-run! system (fn [system] + (binding [h/*system* system] + (h/process-file! system file-id update-fn opts))))) + + (catch Throwable cause + (l/wrn :hint "unexpected error on processing file (skiping)" + :tid thread-id :file-id (str file-id) :index idx - :elapsed elapsed))))) + :cause cause)) + (finally + (when-let [pause (:pause opts)] + (Thread/sleep (int pause))) + + (ps/release! sjobs) + (let [elapsed (dt/format-duration (tpoint))] + (l/trc :hint "process:file:end" + :tid thread-id + :file-id (str file-id) + :index idx + :elapsed elapsed)))))) + + process-file* + (fn [idx file-id] + (ps/acquire! sjobs) + (px/run! executor (partial process-file file-id idx (dt/tpoint))) + (inc idx)) process-files (fn [{:keys [::db/conn] :as system}] @@ -457,14 +472,12 @@ (db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) (try - (reduce (fn [idx file-id] - (ps/acquire! sjobs) - (px/run! executor (partial process-file file-id idx (dt/tpoint))) - (inc idx)) - 0 - (->> (db/cursor conn [query] {:chunk-size 1}) - (take max-items) - (map :id))) + (->> (db/plan conn [query]) + (transduce (comp + (take max-items) + (map :id)) + (completing process-file*) + 0)) (finally ;; Close and await tasks (pu/close! executor))))]