diff --git a/backend/src/app/srepl/helpers.clj b/backend/src/app/srepl/helpers.clj index 307ccbc54..ecb25981d 100644 --- a/backend/src/app/srepl/helpers.clj +++ b/backend/src/app/srepl/helpers.clj @@ -25,6 +25,7 @@ [app.util.blob :as blob] [app.util.time :as dt] [clojure.spec.alpha :as s] + [clojure.stacktrace :as strace] [clojure.walk :as walk] [cuerdas.core :as str] [expound.alpha :as expound])) @@ -78,7 +79,7 @@ (update file :data blob/decode)))) (def ^:private sql:retrieve-files-chunk - "SELECT id, name, modified_at, data FROM file + "SELECT id, name, created_at, data FROM file WHERE created_at < ? AND deleted_at is NULL ORDER BY created_at desc LIMIT ?") @@ -88,26 +89,39 @@ The `on-file` parameter should be a function that receives the file and the previous state and returns the new state." - [system & {:keys [chunk-size on-file] :or {chunk-size 10}}] + [system & {:keys [chunk-size max-chunks start-at on-file on-error on-end] + :or {chunk-size 10 max-chunks Long/MAX_VALUE}}] (letfn [(get-chunk [conn cursor] (let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])] [(some->> rows peek :created-at) (seq rows)])) (get-candidates [conn] (->> (d/iteration (partial get-chunk conn) - :vf second - :kf first - :initk (dt/now)) - (sequence cat) - (map #(update % :data blob/decode))))] + :vf second + :kf first + :initk (or start-at (dt/now))) + (take max-chunks) + (mapcat identity) + (map #(update % :data blob/decode)))) + + (on-error* [file cause] + (println "unexpected exception happened on processing file: " (:id file)) + (strace/print-stack-trace cause))] (db/with-atomic [conn (:app.db/pool system)] (loop [state {} files (get-candidates conn)] (if-let [file (first files)] - (let [state (on-file file state)] - (recur state (rest files))) - state))))) + (let [state' (try + (on-file file state) + (catch Throwable cause + (let [on-error (or on-error on-error*)] + (on-error file cause))))] + (recur (or state' state) (rest files))) + + (if (fn? on-end) + (on-end state) + state)))))) (defn analyze-file-data