0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-04-16 17:01:33 -05:00

♻️ Rewrite all the async jobs subsystem and async email sending.

This commit is contained in:
Andrey Antukh 2019-11-22 18:08:27 +01:00
parent 3d4808e024
commit 8809c5238f
9 changed files with 232 additions and 20 deletions

View file

@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS email_queue (
modified_at timestamptz NOT NULL DEFAULT clock_timestamp(),
deleted_at timestamptz DEFAULT NULL,
data jsonb NOT NULL,
data bytea NOT NULL,
priority smallint NOT NULL DEFAULT 10
CHECK (priority BETWEEN 0 and 10),

View file

@ -47,13 +47,14 @@
:email-reply-to (lookup-env env :uxbox-email-reply-to "no-reply@uxbox.io")
:email-from (lookup-env env :uxbox-email-from "no-reply@uxbox.io")
:smtp-host (lookup-env env :uxbox-smtp-host "localhost")
:smtp-host (lookup-env env :uxbox-smtp-host "smtp")
:smtp-port (lookup-env env :uxbox-smtp-port 25)
:smtp-user (lookup-env env :uxbox-smtp-user nil)
:smtp-password (lookup-env env :uxbox-smtp-password nil)
:smtp-tls (lookup-env env :uxbox-smtp-tls false)
:smtp-ssl (lookup-env env :uxbox-smtp-ssl false)
:smtp-enabled (lookup-env env :uxbox-smtp-enabled false)
:smtp-enabled (lookup-env env :uxbox-smtp-enabled true)
:registration-enabled (lookup-env env :uxbox-registration-enabled true)
:secret (lookup-env env :uxbox-secret "5qjiAndGY3")})

View file

@ -25,12 +25,12 @@
"A new profile registration welcome email."
(emails/build :register default-context))
;; (defn render
;; [email context]
;; (let [defaults {:from (:email-from cfg/config)
;; :reply-to (:email-reply-to cfg/config)}]
;; (->> (email context)
;; (merge defaults))))
(defn render
[email context]
(let [defaults {:from (:email-from cfg/config)
:reply-to (:email-reply-to cfg/config)}]
(->> (email context)
(merge defaults))))
(defn send!
"Schedule the email for sending."
@ -42,7 +42,7 @@
data (->> (email context)
(merge defaults)
(blob/encode))
priority (case (::priority context) :low 1 :high 10)
priority (case (:priority context :high) :low 1 :high 10)
sql "insert into email_queue (data, priority)
values ($1, $2) returning *"]
(-> (db/query-one db/pool [sql data priority])

View file

@ -0,0 +1,37 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2016-2019 Andrey Antukh <niwi@niwi.nz>
(ns uxbox.jobs.gc
(:require
[promesa.core :as p]
[uxbox.core :refer [system]]
[uxbox.db :as db]
[uxbox.util.jobs :as uj]
[mount.core :as mount :refer [defstate]]))
;; TODO: add images-gc
;; TODO: add icons-gc
;; TODO: add pages-gc
;; --- Delete Projects
(def ^:private clean-deleted-projects-sql
"DELETE FROM projects
WHERE deleted_at is not null
AND (now()-deleted_at)::interval > '10 day'::interval
RETURNING id;")
(defn clean-deleted-projects
"Clean deleted projects."
[opts]
(db/with-atomic [conn db/pool]
(-> (db/query-one conn clean-deleted-projects-sql)
(p/then (constantly nil)))))
(defstate projects-cleaner-task
:start (uj/schedule! system #'clean-deleted-projects {::uj/interval 3600000})) ;; 1h

View file

@ -0,0 +1,135 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2016-2019 Andrey Antukh <niwi@niwi.nz>
(ns uxbox.jobs.sendmail
"Email sending jobs."
(:require
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[promesa.core :as p]
[uxbox.core :refer [system]]
[postal.core :as postal]
[uxbox.config :as cfg]
[uxbox.db :as db]
[uxbox.util.jobs :as uj]
[uxbox.util.blob :as blob]
[uxbox.util.exceptions :as ex]
[mount.core :as mount :refer [defstate]]))
;; TODO: implement low priority sending emails.
(defn- decode-email-row
[{:keys [data] :as row}]
(when row
(cond-> row
data (assoc :data (blob/decode data)))))
(defn- fetch-emails
[conn]
(let [sql "select eq.* from email_queue as eq
where eq.status = 'pending'
and eq.priority = 10
and eq.deleted_at is null
order by eq.priority desc,
eq.created_at desc;"]
(-> (db/query conn sql)
(p/then (partial mapv decode-email-row)))))
(defn- fetch-failed-emails
[conn]
(let [sql "select eq.* from email_queue as eq
where eq.status = 'failed'
and eq.deleted_at is null
and eq.retries < $1
order by eq.priority desc,
eq.created_at desc;"]
(-> (db/query conn sql)
(p/then (partial mapv decode-email-row)))))
(defn- mark-email-as-sent
[conn id]
(let [sql "update email_queue
set status = 'ok'
where id = $1
and deleted_at is null;"]
(-> (db/query-one conn [sql id])
(p/then (constantly nil)))))
(defn- mark-email-as-failed
[conn id]
(let [sql "update email_queue
set status = 'failed',
retries = retries + 1
where id = $1
and deleted_at is null;"]
(-> (db/query-one conn [sql id])
(p/then (constantly nil)))))
(defn- get-smtp-config
[config]
{:host (:smtp-host config)
:port (:smtp-port config)
:user (:smtp-user config)
:pass (:smtp-password config)
:ssl (:smtp-ssl config)
:tls (:smtp-tls config)
:noop (not (:smtp-enabled config))})
(defn- send-email-to-console
[email]
(let [out (with-out-str
(println "email console dump:")
(println "******** start email" (:id email) "**********")
(println " from: " (:from email))
(println " to: " (:to email "---"))
(println " reply-to: " (:reply-to email))
(println " subject: " (:subject email))
(println " content:")
(doseq [item (rest (:body email))]
(when (str/starts-with? (:type item) "text/plain")
(println (:content item))))
(println "******** end email "(:id email) "**********"))]
(log/info out)
{:error :SUCCESS}))
(defn impl-sendmail
[email]
(p/future
(let [config (get-smtp-config cfg/config)
result (if (:noop config)
(send-email-to-console email)
(postal/send-message config email))]
(when (not= (:error result) :SUCCESS)
(ex/raise :type :sendmail-error
:code :email-not-sent
:context result))
nil)))
(defn send-email
[conn {:keys [id data] :as entry}]
(-> (impl-sendmail data)
(p/then (fn [_]
(mark-email-as-sent conn id)))
(p/catch (fn [e]
(log/error e "Error on sending email" id)
(mark-email-as-failed conn id)))))
;; --- Main Task Functions
(defn send-emails
[opts]
(db/with-atomic [conn db/pool]
(p/let [items (fetch-emails conn)]
(p/run! (partial send-email conn) items))))
(defn send-failed-emails
[opts]
(db/with-atomic [conn db/pool]
(p/let [items (fetch-failed-emails conn)]
(p/run! (partial send-email conn) items))))
(defstate sendmail-task
:start (uj/schedule! system #'send-emails {::uj/interval (* 10 1000)})) ;; 20s

View file

@ -59,11 +59,11 @@
:hint "Seems like the email template has invalid data."
:contex data))
{:subject (:subject data)
:body [:alternatives
:body [:alternative
{:type "text/plain; charset=utf-8"
:contex (:body-text data)}
:content (:body-text data)}
{:type "text/html; charset=utf-8"
:contex (:body-html data)}]})
:content (:body-html data)}]})
(defn- impl-build-email
[id context]
@ -98,6 +98,7 @@
:code :email-template-does-not-exists
:hint "seems like the template is wrong or does not exists."
::id id))
(cond-> (assoc email :id id)
(cond-> (assoc email :id (name id))
(:to context) (assoc :to (:to context))
(:from context) (assoc :from (:from context))
(:reply-to context) (assoc :reply-to (:reply-to context)))))))

View file

@ -0,0 +1,38 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2016-2019 Andrey Antukh <niwi@niwi.nz>
(ns uxbox.util.jobs
"Scheduled jobs facilities."
(:require
[clojure.tools.logging :as log]
[clojure.spec.alpha :as s]
[promesa.core :as p]
[vertx.timers :as vt]
[vertx.util :as vu]))
(defn schedule!
[vsm f {:keys [::interval] :as options}]
(s/assert var? f)
(let [system (vu/resolve-system vsm)
state (atom nil)
taskfn (fn wrapped-task []
(-> (p/do! (@f options))
(p/catch (fn [err]
(log/error err "Error on executing the task")
nil))
(p/then (fn [_]
(let [tid (vt/schedule-once! vsm interval wrapped-task)]
(reset! state tid)
nil)))))
tid (vt/schedule-once! vsm interval taskfn)]
(reset! state tid)
(reify
java.lang.AutoCloseable
(close [this]
(locking this
(when-let [timer-id (deref state)]
(.cancelTimer system timer-id)
(reset! state nil)))))))

View file

@ -13,6 +13,8 @@
[uxbox.util.exceptions :as ex])
(:import java.time.Instant))
(s/check-asserts true)
;; --- Constants
(def email-rx

View file

@ -10,7 +10,7 @@
[cuerdas.core :as str]
[beicon.core :as rx]
[potok.core :as ptk]
[uxbox.main.repo :as rp]
[uxbox.main.repo.core :as rp]
[uxbox.main.data.pages :as udp]
[uxbox.util.uuid :as uuid]
[uxbox.util.spec :as us]
@ -91,8 +91,7 @@
(defrecord FetchProjects []
ptk/WatchEvent
(watch [_ state stream]
(->> (rp/req :fetch/projects)
(rx/map :payload)
(->> (rp/query :projects)
(rx/map projects-fetched))))
(defn fetch-projects
@ -117,8 +116,7 @@
ptk/WatchEvent
(watch [_ state stream]
(let [project (get-in state [:projects id])]
(->> (rp/req :update/project project)
(rx/map :payload)
(->> (rp/mutation :update-project project)
(rx/map project-persisted)))))
(defn persist-project
@ -149,7 +147,7 @@
(watch [_ state s]
(letfn [(on-success [_]
#(dissoc-project % id))]
(->> (rp/req :delete/project id)
(->> (rp/mutation :delete-project {:id id})
(rx/map on-success)))))
(defn delete-project