0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-11 15:21:18 -05:00

Merge pull request #4621 from penpot/niwinz-objects-gc-locking

 Reduce locking on objects-gc task
This commit is contained in:
Alejandro 2024-05-24 09:04:11 +02:00 committed by GitHub
commit 5ab4ed9a05
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 370 additions and 302 deletions

View file

@ -349,8 +349,8 @@
:audit-log-archive (ig/ref :app.loggers.audit.archive-task/handler)
:audit-log-gc (ig/ref :app.loggers.audit.gc-task/handler)
:object-update
(ig/ref :app.tasks.object-update/handler)
:delete-object
(ig/ref :app.tasks.delete-object/handler)
:process-webhook-event
(ig/ref ::webhooks/process-event-handler)
:run-webhook
@ -380,7 +380,7 @@
:app.tasks.orphan-teams-gc/handler
{::db/pool (ig/ref ::db/pool)}
:app.tasks.object-update/handler
:app.tasks.delete-object/handler
{::db/pool (ig/ref ::db/pool)}
:app.tasks.file-gc/handler

View file

@ -35,6 +35,7 @@
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]))
@ -916,7 +917,8 @@
(db/update! conn :file
{:deleted-at (dt/now)}
{:id file-id}
{::db/return-keys [:id :name :is-shared :project-id :created-at :modified-at]}))
{::db/return-keys [:id :name :is-shared :deleted-at
:project-id :created-at :modified-at]}))
(def ^:private
schema:delete-file
@ -929,6 +931,13 @@
(check-edition-permissions! conn profile-id id)
(let [file (mark-file-deleted! conn id)]
(wrk/submit! {::wrk/task :delete-object
::wrk/delay (dt/duration "1m")
::wrk/conn conn
:object :file
:deleted-at (:deleted-at file)
:id id})
;; NOTE: when a file is a shared library, then we proceed to load
;; the whole file, proceed with feature checking and properly execute
;; the absorb-library procedure

View file

@ -271,7 +271,7 @@
(when (and (some? th1)
(not= (:media-id th1)
(:media-id th2)))
(sto/touch-object! storage (:media-id th1) :async true))
(sto/touch-object! storage (:media-id th1)))
th2))

View file

@ -20,6 +20,7 @@
[app.rpc.quotes :as quotes]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]))
(s/def ::id ::us/uuid)
@ -262,10 +263,16 @@
{:deleted-at (dt/now)}
{:id id :is-default false}
{::db/return-keys true})]
(wrk/submit! {::wrk/task :delete-object
::wrk/delay (dt/duration "1m")
::wrk/conn conn
:object :project
:deleted-at (:deleted-at project)
:id id})
(rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project)
:name (:name project)
:created-at (:created-at project)
:modified-at (:modified-at project)}}))))

View file

@ -31,6 +31,7 @@
[app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]))
@ -528,14 +529,23 @@
{::doc/added "1.17"}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(db/with-atomic [conn pool]
(let [perms (get-permissions conn profile-id id)]
(let [perms (get-permissions conn profile-id id)
deleted-at (dt/now)]
(when-not (:is-owner perms)
(ex/raise :type :validation
:code :only-owner-can-delete-team))
(db/update! conn :team
{:deleted-at (dt/now)}
{:deleted-at deleted-at}
{:id id :is-default false})
(wrk/submit! {::wrk/task :delete-object
::wrk/delay (dt/duration "1m")
::wrk/conn conn
:object :team
:deleted-at deleted-at
:id id})
nil)))

View file

@ -192,7 +192,6 @@
;; NOTIFICATIONS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn notify!
[{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level]
:or {code :generic level :info}
@ -474,6 +473,83 @@
:rollback rollback?
:elapsed elapsed))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RESTORE DELETED OBJECTS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn restore-deleted-team!
"Mark a team and all related objects as not deleted"
[team-id]
(let [team-id (h/parse-uuid team-id)]
(db/tx-run! main/system
(fn [{:keys [::db/conn]}]
(db/update! conn :team-font-variant
{:deleted-at nil}
{:team-id team-id})
(doseq [project (db/update! conn :project
{:deleted-at nil}
{:team-id team-id}
{::db/return-keys [:id]
::db/many true})]
(doseq [file (db/update! conn :file
{:deleted-at nil
:has-media-trimmed false}
{:project-id (:id project)}
{::db/return-keys [:id]
::db/many true})]
;; Fragments are not handled here because they
;; use the database cascade operation and they
;; are not marked for deletion with objects-gc
;; task
(db/update! conn :file-media-object
{:deleted-at nil}
{:file-id (:id file)})
;; Mark thumbnails to be deleted
(db/update! conn :file-thumbnail
{:deleted-at nil}
{:file-id (:id file)})
(db/update! conn :file-tagged-object-thumbnail
{:deleted-at nil}
{:file-id (:id file)})))))))
(defn restore-deleted-project!
"Mark a project and all related objects as not deleted"
[project-id]
(let [project-id (h/parse-uuid project-id)]
(db/tx-run! main/system
(fn [{:keys [::db/conn]}]
(doseq [file (db/update! conn :file
{:deleted-at nil
:has-media-trimmed false}
{:project-id project-id}
{::db/return-keys [:id]
::db/many true})]
;; Fragments are not handled here because they use
;; the database cascade operation and they are not
;; marked for deletion with objects-gc task
(db/update! conn :file-media-object
{:deleted-at nil}
{:file-id (:id file)})
;; Mark thumbnails to be deleted
(db/update! conn :file-thumbnail
{:deleted-at nil}
{:file-id (:id file)})
(db/update! conn :file-tagged-object-thumbnail
{:deleted-at nil}
{:file-id (:id file)}))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; MISC
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View file

@ -16,7 +16,6 @@
[app.storage.impl :as impl]
[app.storage.s3 :as ss3]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[datoteka.fs :as fs]
[integrant.core :as ig]
@ -171,28 +170,16 @@
(impl/put-object object content))
object)))
(def ^:private default-touch-delay
"A default delay for the asynchronous touch operation"
(dt/duration "5m"))
(defn touch-object!
"Mark object as touched."
[{:keys [::db/pool-or-conn] :as storage} object-or-id & {:keys [async]}]
[{:keys [::db/pool-or-conn] :as storage} object-or-id]
(us/assert! ::storage storage)
(let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)]
(if async
(wrk/submit! ::wrk/conn pool-or-conn
::wrk/task :object-update
::wrk/delay default-touch-delay
:object :storage-object
:id id
:key :touched-at
:val (dt/now))
(-> (db/update! pool-or-conn :storage-object
{:touched-at (dt/now)}
{:id id})
(db/get-update-count)
(pos?)))))
(-> (db/update! pool-or-conn :storage-object
{:touched-at (dt/now)}
{:id id})
(db/get-update-count)
(pos?))))
(defn get-object-data
"Return an input stream instance of the object content."

View file

@ -0,0 +1,69 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.tasks.delete-object
"A generic task for object deletion cascade handling"
(:require
[app.common.logging :as l]
[app.db :as db]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(defmulti delete-object
(fn [_ props] (:object props)))
(defmethod delete-object :file
[{:keys [::db/conn]} {:keys [id deleted-at]}]
(l/trc :hint "marking for deletion" :rel "file" :id id)
;; Mark file media objects to be deleted
(db/update! conn :file-media-object
{:deleted-at deleted-at}
{:file-id id})
;; Mark thumbnails to be deleted
(db/update! conn :file-thumbnail
{:deleted-at deleted-at}
{:file-id id})
(db/update! conn :file-tagged-object-thumbnail
{:deleted-at deleted-at}
{:file-id id}))
(defmethod delete-object :project
[{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}]
(l/trc :hint "marking for deletion" :rel "project" :id id)
(doseq [file (db/update! conn :file
{:deleted-at deleted-at}
{:project-id id}
{::db/return-keys [:id :deleted-at]
::db/many true})]
(delete-object cfg (assoc file :object :file))))
(defmethod delete-object :team
[{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}]
(l/trc :hint "marking for deletion" :rel "team" :id id)
(db/update! conn :team-font-variant
{:deleted-at deleted-at}
{:team-id id})
(doseq [project (db/update! conn :project
{:deleted-at deleted-at}
{:team-id id}
{::db/return-keys [:id :deleted-at]
::db/many true})]
(delete-object cfg (assoc project :object :project))))
(defmethod delete-object :default
[_cfg props]
(l/wrn :hint "not implementation found" :rel (:object props)))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [props] :as params}]
(db/tx-run! cfg delete-object props)))

View file

@ -1,32 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.tasks.object-update
"A task used for perform simple object properties update
in an asynchronous flow."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.db :as db]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(defn- update-object
[{:keys [::db/conn] :as cfg} {:keys [id object key val] :as props}]
(l/trc :hint "update object prop"
:id (str id)
:object (d/name object)
:key (d/name key)
:val val)
(db/update! conn object {key val} {:id id} {::db/return-keys false}))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [props] :as params}]
(db/tx-run! cfg update-object props)))

View file

@ -17,67 +17,18 @@
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(declare ^:private delete-file-data-fragments!)
(declare ^:private delete-file-media-objects!)
(declare ^:private delete-file-object-thumbnails!)
(declare ^:private delete-file-thumbnails!)
(declare ^:private delete-files!)
(declare ^:private delete-fonts!)
(declare ^:private delete-profiles!)
(declare ^:private delete-projects!)
(declare ^:private delete-teams!)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool ::sto/storage]))
(defmethod ig/prep-key ::handler
[_ cfg]
(assoc cfg ::min-age cf/deletion-delay))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [params]
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
;; Disable deletion protection for the current transaction
(db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"])
(db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"])
(let [min-age (dt/duration (or (:min-age params) (::min-age cfg)))
cfg (-> cfg
(assoc ::min-age (db/interval min-age))
(update ::sto/storage media/configure-assets-storage conn))
total (reduce + 0
[(delete-profiles! cfg)
(delete-teams! cfg)
(delete-fonts! cfg)
(delete-projects! cfg)
(delete-files! cfg)
(delete-file-thumbnails! cfg)
(delete-file-object-thumbnails! cfg)
(delete-file-data-fragments! cfg)
(delete-file-media-objects! cfg)])]
(l/info :hint "task finished"
:deleted total
:rollback? (boolean (:rollback? params)))
(when (:rollback? params)
(db/rollback! conn))
{:processed total})))))
(def ^:private sql:get-profiles
"SELECT id, photo_id FROM profile
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-profiles!
[{:keys [::db/conn ::min-age ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-profiles min-age])
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-profiles min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id photo-id]}]
(l/trc :hint "permanently delete" :rel "profile" :id (str id))
@ -99,13 +50,13 @@
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-teams!
[{:keys [::db/conn ::min-age ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-teams min-age])
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-teams min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id photo-id deleted-at]}]
(l/trc :hint "permanently delete"
:rel "team"
@ -118,15 +69,6 @@
;; And finally, permanently delete the team.
(db/delete! conn :team {:id id})
;; Mark for deletion in cascade
(db/update! conn :team-font-variant
{:deleted-at deleted-at}
{:team-id id})
(db/update! conn :project
{:deleted-at deleted-at}
{:team-id id})
(inc total))
0)))
@ -136,12 +78,13 @@
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-fonts!
[{:keys [::db/conn ::min-age ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-fonts min-age])
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-fonts min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id team-id deleted-at] :as font}]
(l/trc :hint "permanently delete"
:rel "team-font-variant"
@ -167,12 +110,13 @@
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-projects!
[{:keys [::db/conn ::min-age] :as cfg}]
(->> (db/cursor conn [sql:get-projects min-age])
[{:keys [::db/conn ::min-age ::chunk-size] :as cfg}]
(->> (db/cursor conn [sql:get-projects min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id team-id deleted-at]}]
(l/trc :hint "permanently delete"
:rel "project"
@ -183,11 +127,6 @@
;; And finally, permanently delete the project.
(db/delete! conn :project {:id id})
;; Mark files to be deleted
(db/update! conn :file
{:deleted-at deleted-at}
{:project-id id})
(inc total))
0)))
@ -197,12 +136,13 @@
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-files!
[{:keys [::db/conn ::min-age] :as cfg}]
(->> (db/cursor conn [sql:get-files min-age])
[{:keys [::db/conn ::min-age ::chunk-size] :as cfg}]
(->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id deleted-at project-id]}]
(l/trc :hint "permanently delete"
:rel "file"
@ -210,26 +150,9 @@
:project-id (str project-id)
:deleted-at (dt/format-instant deleted-at))
;; NOTE: fragments not handled here because they have
;; cascade.
;; And finally, permanently delete the file.
(db/delete! conn :file {:id id})
;; Mark file media objects to be deleted
(db/update! conn :file-media-object
{:deleted-at deleted-at}
{:file-id id})
;; Mark thumbnails to be deleted
(db/update! conn :file-thumbnail
{:deleted-at deleted-at}
{:file-id id})
(db/update! conn :file-tagged-object-thumbnail
{:deleted-at deleted-at}
{:file-id id})
(inc total))
0)))
@ -239,12 +162,13 @@
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn delete-file-thumbnails!
[{:keys [::db/conn ::min-age ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-file-thumbnails min-age])
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-file-thumbnails min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [file-id revn media-id deleted-at]}]
(l/trc :hint "permanently delete"
:rel "file-thumbnail"
@ -267,12 +191,13 @@
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn delete-file-object-thumbnails!
[{:keys [::db/conn ::min-age ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-file-object-thumbnails min-age])
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-file-object-thumbnails min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [file-id object-id media-id deleted-at]}]
(l/trc :hint "permanently delete"
:rel "file-tagged-object-thumbnail"
@ -295,12 +220,13 @@
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-file-data-fragments!
[{:keys [::db/conn ::min-age] :as cfg}]
(->> (db/cursor conn [sql:get-file-data-fragments min-age])
[{:keys [::db/conn ::min-age ::chunk-size] :as cfg}]
(->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [file-id id deleted-at]}]
(l/trc :hint "permanently delete"
:rel "file-data-fragment"
@ -319,12 +245,13 @@
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
ORDER BY deleted_at ASC
LIMIT ?
FOR UPDATE
SKIP LOCKED")
(defn- delete-file-media-objects!
[{:keys [::db/conn ::min-age ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-file-media-objects min-age])
[{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}]
(->> (db/cursor conn [sql:get-file-media-objects min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id file-id deleted-at] :as fmo}]
(l/trc :hint "permanently delete"
:rel "file-media-object"
@ -340,3 +267,53 @@
(inc total))
0)))
(def ^:private deletion-proc-vars
[#'delete-file-media-objects!
#'delete-file-data-fragments!
#'delete-file-object-thumbnails!
#'delete-file-thumbnails!
#'delete-files!
#'delete-projects!
#'delete-fonts!
#'delete-teams!
#'delete-profiles!])
(defn- execute-proc!
"A generic function that executes the specified proc iterativelly
until 0 results is returned"
[cfg proc-fn]
(loop [total 0]
(let [result (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"])
(proc-fn cfg)))]
(if (pos? result)
(recur (+ total result))
total))))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool ::sto/storage]))
(defmethod ig/prep-key ::handler
[_ cfg]
(assoc cfg
::min-age cf/deletion-delay
::chunk-size 10))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [params]
(let [min-age (dt/duration (or (:min-age params) (::min-age cfg)))
cfg (-> cfg
(assoc ::min-age (db/interval min-age))
(update ::sto/storage media/configure-assets-storage))]
(loop [procs (map deref deletion-proc-vars)
total 0]
(if-let [proc-fn (first procs)]
(let [result (execute-proc! cfg proc-fn)]
(recur (rest procs)
(+ total result)))
(do
(l/inf :hint "task finished" :deleted total)
{:processed total}))))))

View file

@ -35,8 +35,92 @@
[_ item]
{:params item})
(defn- get-task
[{:keys [::db/pool]} task-id]
(ex/try!
(some-> (db/get* pool :task {:id task-id})
(decode-task-row))))
(defn- run-task
[{:keys [::wrk/registry ::id ::queue] :as cfg} task]
(try
(l/dbg :hint "start"
:name (:name task)
:task-id (str (:id task))
:queue queue
:runner-id id
:retry (:retry-num task))
(let [tpoint (dt/tpoint)
task-fn (get registry (:name task))
result (if task-fn
(task-fn task)
{:status :completed :task task})
elapsed (dt/format-duration (tpoint))]
(when-not task-fn
(l/wrn :hint "no task handler found" :name (:name task)))
(l/dbg :hint "end"
:name (:name task)
:task-id (str (:id task))
:queue queue
:runner-id id
:retry (:retry-num task)
:elapsed elapsed)
result)
(catch InterruptedException cause
(throw cause))
(catch Throwable cause
(let [edata (ex-data cause)]
(if (and (< (:retry-num task)
(:max-retries task))
(= ::retry (:type edata)))
(cond-> {:status :retry :task task :error cause}
(dt/duration? (:delay edata))
(assoc :delay (:delay edata))
(= ::noop (:strategy edata))
(assoc :inc-by 0))
(do
(l/err :hint "unhandled exception on task"
::l/context (get-error-context cause task)
:cause cause)
(if (>= (:retry-num task) (:max-retries task))
{:status :failed :task task :error cause}
{:status :retry :task task :error cause})))))))
(defn- run-task!
[{:keys [::rds/rconn ::id] :as cfg} task-id]
(loop [task (get-task cfg task-id)]
(cond
(ex/exception? task)
(if (or (db/connection-error? task)
(db/serialization-error? task))
(do
(l/wrn :hint "connection error on retrieving task from database (retrying in some instants)"
:id id
:cause task)
(px/sleep (::rds/timeout rconn))
(recur (get-task cfg task-id)))
(do
(l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)"
:id id
:cause task)
(px/sleep (::rds/timeout rconn))
(recur (get-task cfg task-id))))
(nil? task)
(l/wrn :hint "no task found on the database"
:id id
:task-id task-id)
:else
(run-task cfg task))))
(defn- run-worker-loop!
[{:keys [::db/pool ::rds/rconn ::wrk/registry ::timeout ::queue ::id]}]
[{:keys [::db/pool ::rds/rconn ::timeout ::queue] :as cfg}]
(letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}]
(let [explain (ex-message error)
nretry (+ (:retry-num task) inc-by)
@ -82,88 +166,6 @@
:length (alength payload)
:cause cause))))
(handle-task [{:keys [name] :as task}]
(let [task-fn (get registry name)]
(if task-fn
(task-fn task)
(l/wrn :hint "no task handler found" :name name))
{:status :completed :task task}))
(handle-task-exception [cause task]
(let [edata (ex-data cause)]
(if (and (< (:retry-num task)
(:max-retries task))
(= ::retry (:type edata)))
(cond-> {:status :retry :task task :error cause}
(dt/duration? (:delay edata))
(assoc :delay (:delay edata))
(= ::noop (:strategy edata))
(assoc :inc-by 0))
(do
(l/err :hint "unhandled exception on task"
::l/context (get-error-context cause task)
:cause cause)
(if (>= (:retry-num task) (:max-retries task))
{:status :failed :task task :error cause}
{:status :retry :task task :error cause})))))
(get-task [task-id]
(ex/try!
(some-> (db/get* pool :task {:id task-id})
(decode-task-row))))
(run-task [task-id]
(loop [task (get-task task-id)]
(cond
(ex/exception? task)
(if (or (db/connection-error? task)
(db/serialization-error? task))
(do
(l/wrn :hint "connection error on retrieving task from database (retrying in some instants)"
:id id
:cause task)
(px/sleep (::rds/timeout rconn))
(recur (get-task task-id)))
(do
(l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)"
:id id
:cause task)
(px/sleep (::rds/timeout rconn))
(recur (get-task task-id))))
(nil? task)
(l/wrn :hint "no task found on the database"
:id id
:task-id task-id)
:else
(try
(l/dbg :hint "start"
:name (:name task)
:task-id (str task-id)
:queue queue
:runner-id id
:retry (:retry-num task))
(let [tpoint (dt/tpoint)
result (handle-task task)
elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "end"
:name (:name task)
:task-id (str task-id)
:queue queue
:runner-id id
:retry (:retry-num task)
:elapsed elapsed)
result)
(catch InterruptedException cause
(throw cause))
(catch Throwable cause
(handle-task-exception cause task))))))
(process-result [{:keys [status] :as result}]
(ex/try!
(case status
@ -173,7 +175,7 @@
nil)))
(run-task-loop [task-id]
(loop [result (run-task task-id)]
(loop [result (run-task! cfg task-id)]
(when-let [cause (process-result result)]
(if (or (db/connection-error? cause)
(db/serialization-error? cause))

View file

@ -34,6 +34,7 @@
[app.util.blob :as blob]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker.runner]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[clojure.test :as t]
@ -77,47 +78,6 @@
:enable-feature-components-v2
:disable-file-validation])
(def test-init-sql
["alter table project_profile_rel set unlogged;\n"
"alter table file_profile_rel set unlogged;\n"
"alter table presence set unlogged;\n"
"alter table presence set unlogged;\n"
"alter table http_session set unlogged;\n"
"alter table team_profile_rel set unlogged;\n"
"alter table team_project_profile_rel set unlogged;\n"
"alter table comment_thread_status set unlogged;\n"
"alter table comment set unlogged;\n"
"alter table comment_thread set unlogged;\n"
"alter table profile_complaint_report set unlogged;\n"
"alter table file_change set unlogged;\n"
"alter table team_font_variant set unlogged;\n"
"alter table share_link set unlogged;\n"
"alter table usage_quote set unlogged;\n"
"alter table access_token set unlogged;\n"
"alter table profile set unlogged;\n"
"alter table file_library_rel set unlogged;\n"
"alter table file_thumbnail set unlogged;\n"
"alter table file_object_thumbnail set unlogged;\n"
"alter table file_tagged_object_thumbnail set unlogged;\n"
"alter table file_media_object set unlogged;\n"
"alter table file_data_fragment set unlogged;\n"
"alter table file set unlogged;\n"
"alter table project set unlogged;\n"
"alter table team_invitation set unlogged;\n"
"alter table webhook_delivery set unlogged;\n"
"alter table webhook set unlogged;\n"
"alter table team set unlogged;\n"
;; For some reason, modifying the task realted tables is very very
;; slow (5s); so we just don't alter them
;; "alter table task set unlogged;\n"
;; "alter table task_default set unlogged;\n"
;; "alter table task_completed set unlogged;\n"
"alter table audit_log set unlogged ;\n"
"alter table storage_object set unlogged;\n"
"alter table server_error_report set unlogged;\n"
"alter table server_prop set unlogged;\n"
"alter table global_complaint_report set unlogged;\n"])
(defn state-init
[next]
(with-redefs [app.config/flags (flags/parse flags/default default-flags)
@ -164,9 +124,6 @@
(try
(binding [*system* system
*pool* (:app.db/pool system)]
(db/with-atomic [conn *pool*]
(doseq [sql test-init-sql]
(db/exec! conn [sql])))
(next))
(finally
(ig/halt! system))))))
@ -181,8 +138,7 @@
(db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"])
(db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"])
(let [result (->> (db/exec! conn [sql])
(map :table-name)
(remove #(= "task" %)))]
(map :table-name))]
(doseq [table result]
(db/exec! conn [(str "delete from " table ";")]))))
@ -425,6 +381,18 @@
(let [task-fn (get tasks (d/name name))]
(task-fn params)))))
(def sql:pending-tasks
"select t.* from task as t
where t.status = 'new'
order by t.priority desc, t.scheduled_at")
(defn run-pending-tasks!
[]
(db/tx-run! *system* (fn [{:keys [::db/conn] :as cfg}]
(let [tasks (->> (db/exec! conn [sql:pending-tasks])
(map #'app.worker.runner/decode-task-row))]
(run! (partial #'app.worker.runner/run-task cfg) tasks)))))
;; --- UTILS
(defn print-error!

View file

@ -1189,6 +1189,7 @@
(t/is (nil? error))
(t/is (map? result)))
;; insert another thumbnail with different revn
(let [data {::th/type :create-file-thumbnail
::rpc/profile-id (:id prof)
:file-id (:id file)
@ -1207,8 +1208,6 @@
(t/is (= 2 (count rows)))))
(t/testing "gc task"
;; make the file eligible for GC waiting 300ms (configured
;; timeout for testing)
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))

View file

@ -346,13 +346,5 @@
(assoc :size 312043))))
out (th/command! data)]
(t/is (nil? (:error out)))
(t/is (map? (:result out))))
(t/is (map? (:result out))))))
(let [[row1 :as rows]
(->> (th/db-query :task {:name "object-update"})
(map #(update % :props db/decode-transit-pgobject)))]
;; (app.common.pprint/pprint rows)
(t/is (= 1 (count rows)))
(t/is (> (inst-ms (dt/diff (:created-at row1) (:scheduled-at row1)))
(inst-ms (dt/duration "4m")))))))

View file

@ -391,6 +391,8 @@
(t/is (= 1 (count result)))
(t/is (= (:default-team-id profile1) (get-in result [0 :id])))))
(th/run-pending-tasks!)
;; run permanent deletion (should be noop)
(let [result (th/run-task! :objects-gc {:min-age (dt/duration {:minutes 1})})]
(t/is (= 0 (:processed result))))
@ -457,6 +459,8 @@
#_(th/print-result! out)
(t/is (nil? (:error out))))
(th/run-pending-tasks!)
(let [rows (th/db-exec! ["select * from team where id = ?" (:id team)])]
(t/is (= 1 (count rows)))
(t/is (dt/instant? (:deleted-at (first rows)))))