mirror of
https://github.com/penpot/penpot.git
synced 2025-04-12 23:11:23 -05:00
Merge pull request #4455 from penpot/niwinz-staging-bugfix-2
✨ Make some storage operations asynchronous
This commit is contained in:
commit
c975e0bcee
8 changed files with 107 additions and 15 deletions
|
@ -349,6 +349,8 @@
|
|||
:audit-log-archive (ig/ref :app.loggers.audit.archive-task/handler)
|
||||
:audit-log-gc (ig/ref :app.loggers.audit.gc-task/handler)
|
||||
|
||||
:object-update
|
||||
(ig/ref :app.tasks.object-update/handler)
|
||||
:process-webhook-event
|
||||
(ig/ref ::webhooks/process-event-handler)
|
||||
:run-webhook
|
||||
|
@ -376,7 +378,10 @@
|
|||
::sto/storage (ig/ref ::sto/storage)}
|
||||
|
||||
:app.tasks.orphan-teams-gc/handler
|
||||
{::db/pool (ig/ref ::db/pool)}
|
||||
{::db/pool (ig/ref ::db/pool)}
|
||||
|
||||
:app.tasks.object-update/handler
|
||||
{::db/pool (ig/ref ::db/pool)}
|
||||
|
||||
:app.tasks.file-gc/handler
|
||||
{::db/pool (ig/ref ::db/pool)
|
||||
|
|
|
@ -271,7 +271,7 @@
|
|||
(when (and (some? th1)
|
||||
(not= (:media-id th1)
|
||||
(:media-id th2)))
|
||||
(sto/touch-object! storage (:media-id th1)))
|
||||
(sto/touch-object! storage (:media-id th1) :async true))
|
||||
|
||||
th2))
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
[app.storage.impl :as impl]
|
||||
[app.storage.s3 :as ss3]
|
||||
[app.util.time :as dt]
|
||||
[app.worker :as wrk]
|
||||
[clojure.spec.alpha :as s]
|
||||
[datoteka.fs :as fs]
|
||||
[integrant.core :as ig]
|
||||
|
@ -170,15 +171,28 @@
|
|||
(impl/put-object object content))
|
||||
object)))
|
||||
|
||||
(def ^:private default-touch-delay
|
||||
"A default delay for the asynchronous touch operation"
|
||||
(dt/duration "5m"))
|
||||
|
||||
(defn touch-object!
|
||||
"Mark object as touched."
|
||||
[{:keys [::db/pool-or-conn] :as storage} object-or-id]
|
||||
[{:keys [::db/pool-or-conn] :as storage} object-or-id & {:keys [async]}]
|
||||
(us/assert! ::storage storage)
|
||||
(let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)
|
||||
rs (db/update! pool-or-conn :storage-object
|
||||
{:touched-at (dt/now)}
|
||||
{:id id})]
|
||||
(pos? (db/get-update-count rs))))
|
||||
(let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)]
|
||||
(if async
|
||||
(wrk/submit! ::wrk/conn pool-or-conn
|
||||
::wrk/task :object-update
|
||||
::wrk/delay default-touch-delay
|
||||
:object :storage-object
|
||||
:id id
|
||||
:key :touched-at
|
||||
:val (dt/now))
|
||||
(-> (db/update! pool-or-conn :storage-object
|
||||
{:touched-at (dt/now)}
|
||||
{:id id})
|
||||
(db/get-update-count)
|
||||
(pos?)))))
|
||||
|
||||
(defn get-object-data
|
||||
"Return an input stream instance of the object content."
|
||||
|
|
32
backend/src/app/tasks/object_update.clj
Normal file
32
backend/src/app/tasks/object_update.clj
Normal file
|
@ -0,0 +1,32 @@
|
|||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; Copyright (c) KALEIDOS INC
|
||||
|
||||
(ns app.tasks.object-update
|
||||
"A task used for perform simple object properties update
|
||||
in an asynchronous flow."
|
||||
(:require
|
||||
[app.common.data :as d]
|
||||
[app.common.logging :as l]
|
||||
[app.db :as db]
|
||||
[clojure.spec.alpha :as s]
|
||||
[integrant.core :as ig]))
|
||||
|
||||
(defn- update-object
|
||||
[{:keys [::db/conn] :as cfg} {:keys [id object key val] :as props}]
|
||||
(l/trc :hint "update object prop"
|
||||
:id (str id)
|
||||
:object (d/name object)
|
||||
:key (d/name key)
|
||||
:val val)
|
||||
(db/update! conn object {key val} {:id id} {::db/return-keys false}))
|
||||
|
||||
(defmethod ig/pre-init-spec ::handler [_]
|
||||
(s/keys :req [::db/pool]))
|
||||
|
||||
(defmethod ig/init-key ::handler
|
||||
[_ cfg]
|
||||
(fn [{:keys [props] :as params}]
|
||||
(db/tx-run! cfg update-object props)))
|
|
@ -119,11 +119,13 @@
|
|||
:next.jdbc/update-count))]
|
||||
(l/trc :hint "submit task"
|
||||
:name task
|
||||
:task-id (str id)
|
||||
:queue queue
|
||||
:label label
|
||||
:dedupe (boolean dedupe)
|
||||
:deleted (or deleted 0)
|
||||
:in (dt/format-duration duration))
|
||||
:delay (dt/format-duration duration)
|
||||
:replace (or deleted 0))
|
||||
|
||||
|
||||
(db/exec-one! conn [sql:insert-new-task id task props queue
|
||||
label priority max-retries interval])
|
||||
|
|
|
@ -92,7 +92,7 @@
|
|||
(let [ts (ms-until-valid cron)
|
||||
ft (px/schedule! ts (partial execute-cron-task cfg task))]
|
||||
|
||||
(l/dbg :hint "schedule task" :id id
|
||||
(l/dbg :hint "schedule" :id id
|
||||
:ts (dt/format-duration ts)
|
||||
:at (dt/format-instant (dt/in-future ts)))
|
||||
|
||||
|
|
|
@ -139,7 +139,7 @@
|
|||
|
||||
:else
|
||||
(try
|
||||
(l/trc :hint "start task"
|
||||
(l/trc :hint "start"
|
||||
:name (:name task)
|
||||
:task-id (str task-id)
|
||||
:queue queue
|
||||
|
@ -149,7 +149,7 @@
|
|||
result (handle-task task)
|
||||
elapsed (dt/format-duration (tpoint))]
|
||||
|
||||
(l/trc :hint "end task"
|
||||
(l/trc :hint "end"
|
||||
:name (:name task)
|
||||
:task-id (str task-id)
|
||||
:queue queue
|
||||
|
|
|
@ -277,8 +277,6 @@
|
|||
(t/is (thrown? org.postgresql.util.PSQLException
|
||||
(th/db-delete! :storage-object {:id (:media-id row1)}))))))
|
||||
|
||||
|
||||
|
||||
(t/deftest get-file-object-thumbnail
|
||||
(let [storage (::sto/storage th/*system*)
|
||||
profile (th/create-profile* 1)
|
||||
|
@ -317,3 +315,44 @@
|
|||
|
||||
(let [result (:result out)]
|
||||
(t/is (contains? result "test-key-2"))))))
|
||||
|
||||
(t/deftest create-file-object-thumbnail
|
||||
(th/db-delete! :task {:name "object-update"})
|
||||
(let [storage (::sto/storage th/*system*)
|
||||
profile (th/create-profile* 1)
|
||||
file (th/create-file* 1 {:profile-id (:id profile)
|
||||
:project-id (:default-project-id profile)
|
||||
:is-shared false})
|
||||
data {::th/type :create-file-object-thumbnail
|
||||
::rpc/profile-id (:id profile)
|
||||
:file-id (:id file)
|
||||
:object-id "test-key-2"
|
||||
:media {:filename "sample.jpg"
|
||||
:mtype "image/jpeg"}}]
|
||||
|
||||
(let [data (update data :media
|
||||
(fn [media]
|
||||
(-> media
|
||||
(assoc :path (th/tempfile "backend_tests/test_files/sample2.jpg"))
|
||||
(assoc :size 7923))))
|
||||
out (th/command! data)]
|
||||
(t/is (nil? (:error out)))
|
||||
(t/is (map? (:result out))))
|
||||
|
||||
(let [data (update data :media
|
||||
(fn [media]
|
||||
(-> media
|
||||
(assoc :path (th/tempfile "backend_tests/test_files/sample.jpg"))
|
||||
(assoc :size 312043))))
|
||||
out (th/command! data)]
|
||||
(t/is (nil? (:error out)))
|
||||
(t/is (map? (:result out))))
|
||||
|
||||
(let [[row1 :as rows]
|
||||
(->> (th/db-query :task {:name "object-update"})
|
||||
(map #(update % :props db/decode-transit-pgobject)))]
|
||||
|
||||
;; (app.common.pprint/pprint rows)
|
||||
(t/is (= 1 (count rows)))
|
||||
(t/is (> (inst-ms (dt/diff (:created-at row1) (:scheduled-at row1)))
|
||||
(inst-ms (dt/duration "4m")))))))
|
||||
|
|
Loading…
Add table
Reference in a new issue