0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-16 01:31:22 -05:00

Initial work on remove-media and remove-demo-profile tasks.

This commit is contained in:
Andrey Antukh 2020-02-05 23:52:17 +01:00
parent 1ac6e466ce
commit 358136b840
8 changed files with 179 additions and 53 deletions

View file

@ -28,8 +28,10 @@
[uxbox.services.mutations :as sm] [uxbox.services.mutations :as sm]
[uxbox.services.util :as su] [uxbox.services.util :as su]
[uxbox.services.mutations.profile :as profile] [uxbox.services.mutations.profile :as profile]
[uxbox.tasks :as tasks]
[uxbox.util.blob :as blob] [uxbox.util.blob :as blob]
[uxbox.util.uuid :as uuid] [uxbox.util.uuid :as uuid]
[uxbox.util.time :as tm]
[vertx.core :as vc])) [vertx.core :as vc]))
(def sql:insert-user (def sql:insert-user
@ -41,7 +43,7 @@
values ($1, $2, true)") values ($1, $2, true)")
(sm/defmutation ::create-demo-profile (sm/defmutation ::create-demo-profile
[params] [_]
(let [id (uuid/next) (let [id (uuid/next)
sem (System/currentTimeMillis) sem (System/currentTimeMillis)
email (str "demo-" sem ".demo@nodomain.com") email (str "demo-" sem ".demo@nodomain.com")
@ -52,5 +54,10 @@
(db/with-atomic [conn db/pool] (db/with-atomic [conn db/pool]
(db/query-one conn [sql:insert-user id fullname email password']) (db/query-one conn [sql:insert-user id fullname email password'])
(db/query-one conn [sql:insert-email id email]) (db/query-one conn [sql:insert-email id email])
;; Schedule deletion of the demo profile
(tasks/schedule! conn {:name "remove-demo-profile"
:delay (tm/duration {:hours 48})
:props {:id id}})
{:email email {:email email
:password password}))) :password password})))

View file

@ -22,6 +22,7 @@
[uxbox.db :as db] [uxbox.db :as db]
[uxbox.emails :as emails] [uxbox.emails :as emails]
[uxbox.images :as images] [uxbox.images :as images]
[uxbox.tasks :as tasks]
[uxbox.media :as media] [uxbox.media :as media]
[uxbox.services.mutations :as sm] [uxbox.services.mutations :as sm]
[uxbox.services.mutations.images :as imgs] [uxbox.services.mutations.images :as imgs]
@ -46,7 +47,7 @@
;; --- Mutation: Login ;; --- Mutation: Login
(declare retrieve-user) (declare retrieve-user-by-email)
(s/def ::email ::us/email) (s/def ::email ::us/email)
(s/def ::scope ::us/string) (s/def ::scope ::us/string)
@ -70,7 +71,7 @@
:code ::wrong-credentials)) :code ::wrong-credentials))
{:id (:id user)})] {:id (:id user)})]
(-> (retrieve-user db/pool email) (-> (retrieve-user-by-email db/pool email)
(p/then' check-user)))) (p/then' check-user))))
(def sql:user-by-email (def sql:user-by-email
@ -79,7 +80,7 @@
where u.email=$1 where u.email=$1
and u.deleted_at is null") and u.deleted_at is null")
(defn- retrieve-user (defn- retrieve-user-by-email
[conn email] [conn email]
(db/query-one conn [sql:user-by-email email])) (db/query-one conn [sql:user-by-email email]))
@ -163,9 +164,14 @@
(sm/defmutation ::update-profile-photo (sm/defmutation ::update-profile-photo
[{:keys [user file] :as params}] [{:keys [user file] :as params}]
(db/with-atomic [conn db/pool] (db/with-atomic [conn db/pool]
;; TODO: send task for delete old photo (p/let [profile (profile/retrieve-profile conn user)
(-> (upload-photo conn params) photo (upload-photo conn params)]
(p/then (partial update-profile-photo conn user)))))
;; Schedule deletion of old photo
(tasks/schedule! conn {:name "remove-media"
:props {:path (:photo profile)}})
;; Save new photo
(update-profile-photo conn user photo))))
(defn- upload-photo (defn- upload-photo
[conn {:keys [file user]}] [conn {:keys [file user]}]
@ -281,7 +287,7 @@
:token (:token user) :token (:token user)
:name (:fullname user)}))] :name (:fullname user)}))]
(db/with-atomic [conn db/pool] (db/with-atomic [conn db/pool]
(-> (retrieve-user conn email) (-> (retrieve-user-by-email conn email)
(p/then' su/raise-not-found-if-nil) (p/then' su/raise-not-found-if-nil)
(p/then #(create-recovery-token conn %)) (p/then #(create-recovery-token conn %))
(p/then #(send-email-notification conn %)) (p/then #(send-email-notification conn %))

View file

@ -9,6 +9,7 @@
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as px] [promesa.exec :as px]
[uxbox.common.exceptions :as ex]
[uxbox.common.spec :as us] [uxbox.common.spec :as us]
[uxbox.db :as db] [uxbox.db :as db]
[uxbox.images :as images] [uxbox.images :as images]

View file

@ -18,8 +18,9 @@
[uxbox.config :as cfg] [uxbox.config :as cfg]
[uxbox.core :refer [system]] [uxbox.core :refer [system]]
[uxbox.db :as db] [uxbox.db :as db]
[uxbox.tasks.demo-gc]
[uxbox.tasks.sendmail] [uxbox.tasks.sendmail]
[uxbox.tasks.remove-media]
[uxbox.tasks.remove-demo-profile]
[uxbox.tasks.impl :as impl] [uxbox.tasks.impl :as impl]
[uxbox.util.time :as dt] [uxbox.util.time :as dt]
[vertx.core :as vc] [vertx.core :as vc]
@ -28,9 +29,10 @@
;; --- Public API ;; --- Public API
(defn schedule! (defn schedule!
([task] (schedule! db/pool task)) ([opts] (schedule! db/pool opts))
([conn task] ([conn opts]
(impl/schedule! conn task))) (s/assert ::impl/task-options opts)
(impl/schedule! conn opts)))
;; --- State initialization ;; --- State initialization
@ -40,7 +42,8 @@
;; need to perform a maintenance and delete some old tasks. ;; need to perform a maintenance and delete some old tasks.
(def ^:private tasks (def ^:private tasks
{"demo-gc" #'uxbox.tasks.demo-gc/handler {"remove-demo-profile" #'uxbox.tasks.remove-demo-profile/handler
"remove-media" #'uxbox.tasks.remove-media/handler
"sendmail" #'uxbox.tasks.sendmail/handler}) "sendmail" #'uxbox.tasks.sendmail/handler})
(defstate tasks-worker (defstate tasks-worker
@ -48,13 +51,13 @@
(vc/deploy! system $$ {:instances 1}) (vc/deploy! system $$ {:instances 1})
(deref $$))) (deref $$)))
(def ^:private schedule ;; (def ^:private schedule
[{:id "every 1 hour" ;; [{:id "every 1 hour"
:cron (dt/cron "1 1 */1 * * ? *") ;; :cron (dt/cron "1 1 */1 * * ? *")
:fn #'uxbox.tasks.demo-gc/handler ;; :fn #'uxbox.tasks.demo-gc/handler
:props {:foo 1}}]) ;; :props {:foo 1}}])
(defstate scheduler ;; (defstate scheduler
:start (as-> (impl/scheduler-verticle {:schedule schedule}) $$ ;; :start (as-> (impl/scheduler-verticle {:schedule schedule}) $$
(vc/deploy! system $$ {:instances 1 :worker true}) ;; (vc/deploy! system $$ {:instances 1 :worker true})
(deref $$))) ;; (deref $$)))

View file

@ -21,17 +21,16 @@
[uxbox.util.blob :as blob] [uxbox.util.blob :as blob]
[uxbox.util.time :as tm] [uxbox.util.time :as tm]
[vertx.core :as vc] [vertx.core :as vc]
[vertx.util :as vu]
[vertx.timers :as vt]) [vertx.timers :as vt])
(:import (:import
java.time.Duration java.time.Duration
java.time.Instant java.time.Instant
java.util.Date)) java.util.Date))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Implementation ;; Tasks
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; --- Task Execution
(defn- string-strack-trace (defn- string-strack-trace
[^Throwable err] [^Throwable err]
@ -102,7 +101,7 @@
(cond-> row (cond-> row
props (assoc :props (blob/decode props))))) props (assoc :props (blob/decode props)))))
(defn- log-error (defn- log-task-error
[item err] [item err]
(log/error "Unhandled exception on task '" (:name item) (log/error "Unhandled exception on task '" (:name item)
"' (retry:" (:retry-num item) ") \n" "' (retry:" (:retry-num item) ") \n"
@ -118,11 +117,12 @@
(p/then decode-task-row) (p/then decode-task-row)
(p/then (fn [item] (p/then (fn [item]
(when item (when item
(log/debug "Execute task " (:name item))
(-> (p/do! (handle-task tasks item)) (-> (p/do! (handle-task tasks item))
(p/handle (fn [v e] (p/handle (fn [v e]
(if e (if e
(do (do
(log-error item e) (log-task-error item e)
(if (>= (:retry-num item) max-retries) (if (>= (:retry-num item) max-retries)
(mark-as-failed conn item e) (mark-as-failed conn item e)
(reschedule conn item e))) (reschedule conn item e)))
@ -156,8 +156,9 @@
::vt/delay 3000 ::vt/delay 3000
::vt/repeat true))) ::vt/repeat true)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; --- Task Scheduling ;; Scheduled Tasks
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:privatr sql:upsert-scheduled-task (def ^:privatr sql:upsert-scheduled-task
"insert into scheduled_tasks (id, cron_expr) "insert into scheduled_tasks (id, cron_expr)
@ -180,24 +181,29 @@
(declare schedule-task) (declare schedule-task)
(defn thr-name (defn- log-scheduled-task-error
[] [item err]
(.getName (Thread/currentThread))) (log/error "Unhandled exception on scheduled task '" (:id item) "' \n"
(with-out-str
(.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
(defn- execute-scheduled-task (defn- execute-scheduled-task
[{:keys [id cron] :as stask}] [{:keys [id cron] :as stask}]
(db/with-atomic [conn db/pool] (db/with-atomic [conn db/pool]
;; First we try to lock the task in the database, if locking us
;; successful, then we execute the scheduled task; if locking is
;; not possible (because other instance is already locked id) we
;; just skip it and schedule to be executed in the next slot.
(-> (db/query-one conn [sql:lock-scheduled-task id]) (-> (db/query-one conn [sql:lock-scheduled-task id])
(p/then (fn [result] (p/then (fn [result]
(when result (when result
(-> (p/do! ((:fn stask) stask)) (-> (p/do! ((:fn stask) stask))
(p/catch (fn [e] (p/catch (fn [e]
(log/warn "Excepton happens on executing scheduled task" e) (log-scheduled-task-error stask e)
nil)))))) nil))))))
(p/finally (fn [v e] (p/finally (fn [v e]
(-> (vc/current-context) (-> (vu/current-context)
(schedule-task stask))))))) (schedule-task stask)))))))
(defn ms-until-valid (defn ms-until-valid
[cron] [cron]
(s/assert tm/cron? cron) (s/assert tm/cron? cron)
@ -221,9 +227,9 @@
(p/then' (fn [_] (p/then' (fn [_]
(run! #(schedule-task ctx %) schedule))))) (run! #(schedule-task ctx %) schedule)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Public API ;; Public API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; --- Worker Verticle ;; --- Worker Verticle
@ -270,14 +276,15 @@
(s/def ::delay ::us/integer) (s/def ::delay ::us/integer)
(s/def ::queue ::us/string) (s/def ::queue ::us/string)
(s/def ::task-options (s/def ::task-options
(s/keys :req-un [::name ::delay] (s/keys :req-un [::name]
:opt-un [::props ::queue])) :opt-un [::delay ::props ::queue]))
(defn schedule! (defn schedule!
[conn {:keys [name delay props queue key] :as options}] [conn {:keys [name delay props queue key]
:or {delay 0 props {} queue "default"}
:as options}]
(us/verify ::task-options options) (us/verify ::task-options options)
(let [queue (if (string? queue) queue "default") (let [duration (-> (tm/duration delay)
duration (-> (tm/duration delay)
(duration->pginterval)) (duration->pginterval))
props (blob/encode props)] props (blob/encode props)]
(-> (db/query-one conn [sql:insert-new-task name props queue duration]) (-> (db/query-one conn [sql:insert-new-task name props queue duration])

View file

@ -0,0 +1,93 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
(ns uxbox.tasks.remove-demo-profile
"Demo accounts garbage collector."
(:require
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[promesa.core :as p]
[uxbox.common.exceptions :as ex]
[uxbox.common.spec :as us]
[uxbox.db :as db]
[uxbox.media :as media]
[uxbox.util.storage :as ust]
[vertx.util :as vu]))
(declare remove-file-images)
(declare remove-images)
(declare remove-profile)
(s/def ::id ::us/uuid)
(s/def ::props
(s/keys :req-un [::id]))
(defn handler
[{:keys [props] :as task}]
(us/verify ::props props)
(prn "handler" props (.getName (Thread/currentThread)))
(db/with-atomic [conn db/pool]
(remove-file-images conn (:id props))
(remove-images conn (:id props))
(remove-profile conn (:id props))
(prn "finished" (.getName (Thread/currentThread)))))
(def ^:private sql:file-images-to-delete
"select pfi.id, pfi.path, pfi.thumb_path
from project_file_images as pfi
inner join project_files as pf on (pf.id = pfi.file_id)
inner join projects as p on (p.id = pf.project_id)
where p.user_id = $1
limit 2")
(defn remove-file-images
[conn id]
(p/loop []
(p/let [files (db/query conn [sql:file-images-to-delete id])]
(prn "remove-file-images" files)
(when-not (empty? files)
(-> (vu/blocking
(doseq [item files]
(ust/delete! media/media-storage (:path item))
(ust/delete! media/media-storage (:thumb-path item))))
(p/then' #(p/recur)))))))
(def ^:private sql:images
"select img.id, img.path, img.thumb_path
from images as img
where img.user_id = $1
limit 5")
(defn remove-files
[files]
(prn "remove-files" (.getName (Thread/currentThread)))
(doseq [item files]
(ust/delete! media/media-storage (:path item))
(ust/delete! media/media-storage (:thumb-path item)))
files)
(defn remove-images
[conn id]
(prn "remove-images" (.getName (Thread/currentThread)))
(vu/loop [i 0]
(prn "remove-images loop" i (.getName (Thread/currentThread)))
(-> (db/query conn [sql:images id])
(p/then (vu/wrap-blocking remove-files))
(p/then (fn [images]
(prn "ending" (.getName (Thread/currentThread)))
(when (and (not (empty? images))
(< i 1000))
(p/recur (inc i))))))))
(defn remove-profile
[conn id]
(let [sql "delete from users where id=$1"]
(db/query conn [sql id])))

View file

@ -7,17 +7,26 @@
;; ;;
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz> ;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
(ns uxbox.tasks.demo-gc (ns uxbox.tasks.remove-media
"Demo accounts garbage collector." "Demo accounts garbage collector."
(:require (:require
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[uxbox.common.exceptions :as ex])) [uxbox.common.exceptions :as ex]
[uxbox.common.spec :as us]
[uxbox.media :as media]
[uxbox.util.storage :as ust]
[vertx.util :as vu]))
(s/def ::path ::us/string)
(s/def ::props
(s/keys :req-un [::path]))
(defn handler (defn handler
{:uxbox.tasks/name "demo-gc"}
[{:keys [props] :as task}] [{:keys [props] :as task}]
(try (us/verify ::props props)
(Thread/sleep 100) (vu/blocking
(prn (.getName (Thread/currentThread)) "demo-gc" (:id task) (:props task)) (when (ust/exists? media/media-storage (:path props))
(catch Throwable e (ust/delete! media/media-storage (:path props))
nil))) (log/debug "Media " (:path props) " removed."))))

View file

@ -18,7 +18,7 @@
[clojure.repl :refer :all] [clojure.repl :refer :all]
[criterium.core :refer [quick-bench bench with-progress-reporting]] [criterium.core :refer [quick-bench bench with-progress-reporting]]
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as pe] [promesa.exec :as px]
[uxbox.migrations] [uxbox.migrations]
[uxbox.util.storage :as st] [uxbox.util.storage :as st]
[mount.core :as mount])) [mount.core :as mount]))