0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-23 23:35:58 -05:00

Merge pull request #2596 from penpot/niwinz-backend-webhooks

 Improve scalability of the worker abstraction
This commit is contained in:
Andrey Antukh 2022-11-28 12:46:41 +01:00 committed by GitHub
commit 1c2a462124
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 951 additions and 755 deletions

View file

@ -7,6 +7,7 @@
app.common.data/export clojure.core/def
app.db/with-atomic clojure.core/with-open
app.common.data.macros/get-in clojure.core/get-in
app.common.data.macros/with-open clojure.core/with-open
app.common.data.macros/select-keys clojure.core/select-keys
app.common.logging/with-context clojure.core/do}

View file

@ -106,7 +106,8 @@
(s/def ::file-change-snapshot-timeout ::dt/duration)
(s/def ::default-executor-parallelism ::us/integer)
(s/def ::worker-executor-parallelism ::us/integer)
(s/def ::scheduled-executor-parallelism ::us/integer)
(s/def ::worker-parallelism ::us/integer)
(s/def ::authenticated-cookie-domain ::us/string)
(s/def ::authenticated-cookie-name ::us/string)
@ -218,7 +219,8 @@
::default-rpc-rlimit
::error-report-webhook
::default-executor-parallelism
::worker-executor-parallelism
::scheduled-executor-parallelism
::worker-parallelism
::file-change-snapshot-every
::file-change-snapshot-timeout
::user-feedback-destination

View file

@ -493,3 +493,18 @@
(let [n (xact-check-param n)
row (exec-one! conn ["select pg_try_advisory_xact_lock(?::bigint) as lock" n])]
(:lock row)))
(defn sql-exception?
[cause]
(instance? java.sql.SQLException cause))
(defn connection-error?
[cause]
(and (sql-exception? cause)
(contains? #{"08003" "08006" "08001" "08004"}
(.getSQLState ^java.sql.SQLException cause))))
(defn serialization-error?
[cause]
(and (sql-exception? cause)
(= "40001" (.getSQLState ^java.sql.SQLException cause))))

View file

@ -9,8 +9,13 @@
[app.auth.oidc]
[app.common.logging :as l]
[app.config :as cf]
[app.db :as-alias db]
[app.metrics :as-alias mtx]
[app.metrics.definition :as-alias mdef]
[app.redis :as-alias rds]
[app.storage :as-alias sto]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[cuerdas.core :as str]
[integrant.core :as ig])
(:gen-class))
@ -120,91 +125,83 @@
::mdef/type :gauge}})
(def system-config
{:app.db/pool
{::db/pool
{:uri (cf/get :database-uri)
:username (cf/get :database-username)
:password (cf/get :database-password)
:read-only (cf/get :database-readonly false)
:metrics (ig/ref :app.metrics/metrics)
:metrics (ig/ref ::mtx/metrics)
:migrations (ig/ref :app.migrations/all)
:name :main
:min-size (cf/get :database-min-pool-size 0)
:max-size (cf/get :database-max-pool-size 60)}
;; Default thread pool for IO operations
[::default :app.worker/executor]
{:parallelism (cf/get :default-executor-parallelism 70)}
::wrk/executor
{::wrk/parallelism (cf/get :default-executor-parallelism 100)}
;; Dedicated thread pool for background tasks execution.
[::worker :app.worker/executor]
{:parallelism (cf/get :worker-executor-parallelism 20)}
::wrk/scheduled-executor
{::wrk/parallelism (cf/get :scheduled-executor-parallelism 20)}
:app.worker/scheduler
{:parallelism 1
:prefix :scheduler}
:app.worker/executors
{:default (ig/ref [::default :app.worker/executor])
:worker (ig/ref [::worker :app.worker/executor])}
:app.worker/executor-monitor
{:metrics (ig/ref :app.metrics/metrics)
:executors (ig/ref :app.worker/executors)}
::wrk/monitor
{::mtx/metrics (ig/ref ::mtx/metrics)
::wrk/name "default"
::wrk/executor (ig/ref ::wrk/executor)}
:app.migrations/migrations
{}
:app.metrics/metrics
::mtx/metrics
{:default default-metrics}
:app.migrations/all
{:main (ig/ref :app.migrations/migrations)}
:app.redis/redis
{:uri (cf/get :redis-uri)
:metrics (ig/ref :app.metrics/metrics)}
::rds/redis
{::rds/uri (cf/get :redis-uri)
::mtx/metrics (ig/ref ::mtx/metrics)}
:app.msgbus/msgbus
{:backend (cf/get :msgbus-backend :redis)
:executor (ig/ref [::default :app.worker/executor])
:redis (ig/ref :app.redis/redis)}
:executor (ig/ref ::wrk/executor)
:redis (ig/ref ::rds/redis)}
:app.storage.tmp/cleaner
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)}
{::wrk/executor (ig/ref ::wrk/executor)
::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)}
:app.storage/gc-deleted-task
{:pool (ig/ref :app.db/pool)
:storage (ig/ref :app.storage/storage)
:executor (ig/ref [::worker :app.worker/executor])}
::sto/gc-deleted-task
{:pool (ig/ref ::db/pool)
:storage (ig/ref ::sto/storage)
:executor (ig/ref ::wrk/executor)}
:app.storage/gc-touched-task
{:pool (ig/ref :app.db/pool)}
::sto/gc-touched-task
{:pool (ig/ref ::db/pool)}
:app.http/client
{:executor (ig/ref [::default :app.worker/executor])}
{:executor (ig/ref ::wrk/executor)}
:app.http.session/manager
{:pool (ig/ref :app.db/pool)
{:pool (ig/ref ::db/pool)
:sprops (ig/ref :app.setup/props)
:executor (ig/ref [::default :app.worker/executor])}
:executor (ig/ref ::wrk/executor)}
:app.http.session/gc-task
{:pool (ig/ref :app.db/pool)
{:pool (ig/ref ::db/pool)
:max-age (cf/get :auth-token-cookie-max-age)}
:app.http.awsns/handler
{:sprops (ig/ref :app.setup/props)
:pool (ig/ref :app.db/pool)
:pool (ig/ref ::db/pool)
:http-client (ig/ref :app.http/client)
:executor (ig/ref [::worker :app.worker/executor])}
:executor (ig/ref ::wrk/executor)}
:app.http/server
{:port (cf/get :http-server-port)
:host (cf/get :http-server-host)
:router (ig/ref :app.http/router)
:metrics (ig/ref :app.metrics/metrics)
:executor (ig/ref [::default :app.worker/executor])
:metrics (ig/ref ::mtx/metrics)
:executor (ig/ref ::wrk/executor)
:io-threads (cf/get :http-server-io-threads)
:max-body-size (cf/get :http-server-max-body-size)
:max-multipart-body-size (cf/get :http-server-max-multipart-body-size)}
@ -264,10 +261,10 @@
:oidc (ig/ref :app.auth.oidc/generic-provider)}
:sprops (ig/ref :app.setup/props)
:http-client (ig/ref :app.http/client)
:pool (ig/ref :app.db/pool)
:pool (ig/ref ::db/pool)
:session (ig/ref :app.http.session/manager)
:public-uri (cf/get :public-uri)
:executor (ig/ref [::default :app.worker/executor])}
:executor (ig/ref ::wrk/executor)}
;; TODO: revisit the dependencies of this service, looks they are too much unused of them
:app.http/router
@ -278,61 +275,60 @@
:debug-routes (ig/ref :app.http.debug/routes)
:oidc-routes (ig/ref :app.auth.oidc/routes)
:ws (ig/ref :app.http.websocket/handler)
:metrics (ig/ref :app.metrics/metrics)
:metrics (ig/ref ::mtx/metrics)
:public-uri (cf/get :public-uri)
:storage (ig/ref :app.storage/storage)
:storage (ig/ref ::sto/storage)
:audit-handler (ig/ref :app.loggers.audit/http-handler)
:rpc-routes (ig/ref :app.rpc/routes)
:doc-routes (ig/ref :app.rpc.doc/routes)
:executor (ig/ref [::default :app.worker/executor])}
:executor (ig/ref ::wrk/executor)}
:app.http.debug/routes
{:pool (ig/ref :app.db/pool)
:executor (ig/ref [::worker :app.worker/executor])
:storage (ig/ref :app.storage/storage)
{:pool (ig/ref ::db/pool)
:executor (ig/ref ::wrk/executor)
:storage (ig/ref ::sto/storage)
:session (ig/ref :app.http.session/manager)}
:app.http.websocket/handler
{:pool (ig/ref :app.db/pool)
:metrics (ig/ref :app.metrics/metrics)
{:pool (ig/ref ::db/pool)
:metrics (ig/ref ::mtx/metrics)
:msgbus (ig/ref :app.msgbus/msgbus)}
:app.http.assets/handlers
{:metrics (ig/ref :app.metrics/metrics)
{:metrics (ig/ref ::mtx/metrics)
:assets-path (cf/get :assets-path)
:storage (ig/ref :app.storage/storage)
:executor (ig/ref [::default :app.worker/executor])
:storage (ig/ref ::sto/storage)
:executor (ig/ref ::wrk/executor)
:cache-max-age (dt/duration {:hours 24})
:signature-max-age (dt/duration {:hours 24 :minutes 5})}
:app.http.feedback/handler
{:pool (ig/ref :app.db/pool)
:executor (ig/ref [::default :app.worker/executor])}
{:pool (ig/ref ::db/pool)
:executor (ig/ref ::wrk/executor)}
:app.rpc/climit
{:metrics (ig/ref :app.metrics/metrics)
:executor (ig/ref [::default :app.worker/executor])}
{:metrics (ig/ref ::mtx/metrics)
:executor (ig/ref ::wrk/executor)}
:app.rpc/rlimit
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)}
{:executor (ig/ref ::wrk/executor)
:scheduled-executor (ig/ref ::wrk/scheduled-executor)}
:app.rpc/methods
{:pool (ig/ref :app.db/pool)
{:pool (ig/ref ::db/pool)
:session (ig/ref :app.http.session/manager)
:sprops (ig/ref :app.setup/props)
:metrics (ig/ref :app.metrics/metrics)
:storage (ig/ref :app.storage/storage)
:metrics (ig/ref ::mtx/metrics)
:storage (ig/ref ::sto/storage)
:msgbus (ig/ref :app.msgbus/msgbus)
:public-uri (cf/get :public-uri)
:redis (ig/ref :app.redis/redis)
:redis (ig/ref ::rds/redis)
:audit (ig/ref :app.loggers.audit/collector)
:ldap (ig/ref :app.auth.ldap/provider)
:http-client (ig/ref :app.http/client)
:climit (ig/ref :app.rpc/climit)
:rlimit (ig/ref :app.rpc/rlimit)
:executors (ig/ref :app.worker/executors)
:executor (ig/ref [::default :app.worker/executor])
:executor (ig/ref ::wrk/executor)
:templates (ig/ref :app.setup/builtin-templates)
}
@ -342,15 +338,15 @@
:app.rpc/routes
{:methods (ig/ref :app.rpc/methods)}
:app.worker/registry
{:metrics (ig/ref :app.metrics/metrics)
::wrk/registry
{:metrics (ig/ref ::mtx/metrics)
:tasks
{:sendmail (ig/ref :app.emails/handler)
:objects-gc (ig/ref :app.tasks.objects-gc/handler)
:file-gc (ig/ref :app.tasks.file-gc/handler)
:file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler)
:storage-gc-deleted (ig/ref :app.storage/gc-deleted-task)
:storage-gc-touched (ig/ref :app.storage/gc-touched-task)
:storage-gc-deleted (ig/ref ::sto/gc-deleted-task)
:storage-gc-touched (ig/ref ::sto/gc-touched-task)
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler)
:session-gc (ig/ref :app.http.session/gc-task)
@ -370,24 +366,24 @@
:app.emails/handler
{:sendmail (ig/ref :app.emails/sendmail)
:metrics (ig/ref :app.metrics/metrics)}
:metrics (ig/ref ::mtx/metrics)}
:app.tasks.tasks-gc/handler
{:pool (ig/ref :app.db/pool)
{:pool (ig/ref ::db/pool)
:max-age cf/deletion-delay}
:app.tasks.objects-gc/handler
{:pool (ig/ref :app.db/pool)
:storage (ig/ref :app.storage/storage)}
{:pool (ig/ref ::db/pool)
:storage (ig/ref ::sto/storage)}
:app.tasks.file-gc/handler
{:pool (ig/ref :app.db/pool)}
{:pool (ig/ref ::db/pool)}
:app.tasks.file-xlog-gc/handler
{:pool (ig/ref :app.db/pool)}
{:pool (ig/ref ::db/pool)}
:app.tasks.telemetry/handler
{:pool (ig/ref :app.db/pool)
{:pool (ig/ref ::db/pool)
:version (:full cf/version)
:uri (cf/get :telemetry-uri)
:sprops (ig/ref :app.setup/props)
@ -401,28 +397,28 @@
{:http-client (ig/ref :app.http/client)}
:app.setup/props
{:pool (ig/ref :app.db/pool)
{:pool (ig/ref ::db/pool)
:key (cf/get :secret-key)}
:app.loggers.zmq/receiver
{:endpoint (cf/get :loggers-zmq-uri)}
:app.loggers.audit/http-handler
{:pool (ig/ref :app.db/pool)
:executor (ig/ref [::default :app.worker/executor])}
{:pool (ig/ref ::db/pool)
:executor (ig/ref ::wrk/executor)}
:app.loggers.audit/collector
{:pool (ig/ref :app.db/pool)
:executor (ig/ref [::worker :app.worker/executor])}
{:pool (ig/ref ::db/pool)
:executor (ig/ref ::wrk/executor)}
:app.loggers.audit/archive-task
{:uri (cf/get :audit-log-archive-uri)
:sprops (ig/ref :app.setup/props)
:pool (ig/ref :app.db/pool)
:pool (ig/ref ::db/pool)
:http-client (ig/ref :app.http/client)}
:app.loggers.audit/gc-task
{:pool (ig/ref :app.db/pool)}
{:pool (ig/ref ::db/pool)}
:app.loggers.loki/reporter
{:uri (cf/get :loggers-loki-uri)
@ -436,12 +432,12 @@
:app.loggers.database/reporter
{:receiver (ig/ref :app.loggers.zmq/receiver)
:pool (ig/ref :app.db/pool)
:executor (ig/ref [::worker :app.worker/executor])}
:pool (ig/ref ::db/pool)
:executor (ig/ref ::wrk/executor)}
:app.storage/storage
{:pool (ig/ref :app.db/pool)
:executor (ig/ref [::default :app.worker/executor])
::sto/storage
{:pool (ig/ref ::db/pool)
:executor (ig/ref ::wrk/executor)
:backends
{:assets-s3 (ig/ref [::assets :app.storage.s3/backend])
@ -455,7 +451,7 @@
{:region (cf/get :storage-assets-s3-region)
:endpoint (cf/get :storage-assets-s3-endpoint)
:bucket (cf/get :storage-assets-s3-bucket)
:executor (ig/ref [::default :app.worker/executor])}
:executor (ig/ref ::wrk/executor)}
[::assets :app.storage.fs/backend]
{:directory (cf/get :storage-assets-fs-directory)}
@ -463,12 +459,11 @@
(def worker-config
{:app.worker/cron
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)
:tasks (ig/ref :app.worker/registry)
:pool (ig/ref :app.db/pool)
:entries
{::wrk/cron
{::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)
::wrk/registry (ig/ref ::wrk/registry)
::db/pool (ig/ref ::db/pool)
::wrk/entries
[{:cron #app/cron "0 0 * * * ?" ;; hourly
:task :file-xlog-gc}
@ -501,11 +496,18 @@
{:cron #app/cron "30 */5 * * * ?" ;; every 5m
:task :audit-log-gc})]}
:app.worker/worker
{:executor (ig/ref [::worker :app.worker/executor])
:tasks (ig/ref :app.worker/registry)
:metrics (ig/ref :app.metrics/metrics)
:pool (ig/ref :app.db/pool)}})
::wrk/scheduler
{::rds/redis (ig/ref ::rds/redis)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}
::wrk/worker
{::wrk/parallelism (cf/get ::worker-parallelism 3)
::wrk/queue "default"
::rds/redis (ig/ref ::rds/redis)
::wrk/registry (ig/ref ::wrk/registry)
::mtx/metrics (ig/ref ::mtx/metrics)
::db/pool (ig/ref ::db/pool)}})
(def system nil)

View file

@ -20,7 +20,8 @@
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]))
[promesa.core :as p]
[promesa.exec :as px]))
(set! *warn-on-reflection* true)
@ -52,8 +53,8 @@
(s/def ::rcv-ch ::aa/channel)
(s/def ::pub-ch ::aa/channel)
(s/def ::state ::us/agent)
(s/def ::pconn ::redis/connection)
(s/def ::sconn ::redis/connection)
(s/def ::pconn ::redis/connection-holder)
(s/def ::sconn ::redis/connection-holder)
(s/def ::msgbus
(s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor]))
@ -122,8 +123,8 @@
(defn- redis-disconnect
[{:keys [::pconn ::sconn] :as cfg}]
(redis/close! pconn)
(redis/close! sconn))
(d/close! pconn)
(d/close! sconn))
(defn- conj-subscription
"A low level function that is responsible to create on-demand
@ -205,9 +206,10 @@
(when-let [closed (a/<! (send-to-topic topic message))]
(send-via executor state unsubscribe-channels cfg closed nil))))
]
(a/go-loop []
(let [[val port] (a/alts! [pub-ch rcv-ch])]
(px/thread
{:name "penpot/msgbus-io-loop"}
(loop []
(let [[val port] (a/alts!! [pub-ch rcv-ch])]
(cond
(nil? val)
(do
@ -221,15 +223,16 @@
(= port rcv-ch)
(do
(a/<! (process-incoming val))
(a/<!! (process-incoming val))
(recur))
(= port pub-ch)
(let [result (a/<! (redis-pub cfg val))]
(let [result (a/<!! (redis-pub cfg val))]
(when (ex/exception? result)
(l/error :hint "unexpected error on publishing" :message val
(l/error :hint "unexpected error on publishing"
:message val
:cause result))
(recur)))))))
(recur))))))))
(defn- redis-pub
"Publish a message to the redis server. Asynchronous operation,

View file

@ -21,13 +21,19 @@
[promesa.core :as p])
(:import
clojure.lang.IDeref
clojure.lang.MapEntry
io.lettuce.core.KeyValue
io.lettuce.core.RedisClient
io.lettuce.core.RedisCommandInterruptedException
io.lettuce.core.RedisCommandTimeoutException
io.lettuce.core.RedisException
io.lettuce.core.RedisURI
io.lettuce.core.ScriptOutputType
io.lettuce.core.api.StatefulConnection
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.api.async.RedisScriptingAsyncCommands
io.lettuce.core.api.sync.RedisCommands
io.lettuce.core.codec.ByteArrayCodec
io.lettuce.core.codec.RedisCodec
io.lettuce.core.codec.StringCodec
@ -45,13 +51,12 @@
(declare initialize-resources)
(declare shutdown-resources)
(declare connect)
(declare close!)
(declare connect*)
(s/def ::timer
#(instance? Timer %))
(s/def ::connection
(s/def ::default-connection
#(or (instance? StatefulRedisConnection %)
(and (instance? IDeref %)
(instance? StatefulRedisConnection (deref %)))))
@ -61,6 +66,13 @@
(and (instance? IDeref %)
(instance? StatefulRedisPubSubConnection (deref %)))))
(s/def ::connection
(s/or :default ::default-connection
:pubsub ::pubsub-connection))
(s/def ::connection-holder
(s/keys :req [::connection]))
(s/def ::redis-uri
#(instance? RedisURI %))
@ -75,32 +87,37 @@
(s/def ::connect? ::us/boolean)
(s/def ::io-threads ::us/integer)
(s/def ::worker-threads ::us/integer)
(s/def ::cache #(instance? clojure.lang.Atom %))
(s/def ::redis
(s/keys :req [::resources ::redis-uri ::timer ::mtx/metrics]
:opt [::connection]))
(defmethod ig/pre-init-spec ::redis [_]
(s/keys :req-un [::uri ::mtx/metrics]
:opt-un [::timeout
::connect?
::io-threads
::worker-threads]))
(s/keys :req [::resources
::redis-uri
::timer
::mtx/metrics]
:opt [::connection
::cache]))
(defmethod ig/prep-key ::redis
[_ cfg]
(let [runtime (Runtime/getRuntime)
cpus (.availableProcessors ^Runtime runtime)]
(merge {:timeout (dt/duration 5000)
:io-threads (max 3 cpus)
:worker-threads (max 3 cpus)}
(merge {::timeout (dt/duration "10s")
::io-threads (max 3 cpus)
::worker-threads (max 3 cpus)}
(d/without-nils cfg))))
(defmethod ig/pre-init-spec ::redis [_]
(s/keys :req [::uri ::mtx/metrics]
:opt [::timeout
::connect?
::io-threads
::worker-threads]))
(defmethod ig/init-key ::redis
[_ {:keys [connect?] :as cfg}]
(let [cfg (initialize-resources cfg)]
(cond-> cfg
connect? (assoc ::connection (connect cfg)))))
[_ {:keys [::connect?] :as cfg}]
(let [state (initialize-resources cfg)]
(cond-> state
connect? (assoc ::connection (connect* cfg {})))))
(defmethod ig/halt-key! ::redis
[_ state]
@ -114,7 +131,7 @@
(defn- initialize-resources
"Initialize redis connection resources"
[{:keys [uri io-threads worker-threads connect? metrics] :as cfg}]
[{:keys [::uri ::io-threads ::worker-threads ::connect?] :as cfg}]
(l/info :hint "initialize redis resources"
:uri uri
:io-threads io-threads
@ -131,34 +148,32 @@
redis-uri (RedisURI/create ^String uri)]
(-> cfg
(assoc ::mtx/metrics metrics)
(assoc ::cache (atom {}))
(assoc ::resources resources)
(assoc ::timer timer)
(assoc ::redis-uri redis-uri)
(assoc ::resources resources))))
(assoc ::cache (atom {}))
(assoc ::redis-uri redis-uri))))
(defn- shutdown-resources
[{:keys [::resources ::cache ::timer]}]
(run! close! (vals @cache))
(run! d/close! (vals @cache))
(when resources
(.shutdown ^ClientResources resources))
(when timer
(.stop ^Timer timer)))
(defn connect
[{:keys [::resources ::redis-uri] :as cfg}
& {:keys [timeout codec type] :or {codec default-codec type :default}}]
(defn connect*
[{:keys [::resources ::redis-uri] :as state}
{:keys [timeout codec type]
:or {codec default-codec type :default}}]
(us/assert! ::resources resources)
(let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri)
timeout (or timeout (:timeout cfg))
timeout (or timeout (::timeout state))
conn (case type
:default (.connect ^RedisClient client ^RedisCodec codec)
:pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))]
(.setTimeout ^StatefulConnection conn ^Duration timeout)
(reify
IDeref
(deref [_] conn)
@ -168,53 +183,113 @@
(.close ^StatefulConnection conn)
(.shutdown ^RedisClient client)))))
(defn connect
[state & {:as opts}]
(let [connection (connect* state opts)]
(-> state
(assoc ::connection connection)
(dissoc ::cache)
(vary-meta assoc `d/close! (fn [_] (d/close! connection))))))
(defn get-or-connect
[{:keys [::cache] :as state} key options]
(assoc state ::connection
(-> state
(assoc ::connection
(or (get @cache key)
(-> (swap! cache (fn [cache]
(when-let [prev (get cache key)]
(close! prev))
(assoc cache key (connect state options))))
(get key)))))
(d/close! prev))
(assoc cache key (connect* state options))))
(get key))))
(dissoc ::cache)))
(defn add-listener!
[conn listener]
(us/assert! ::pubsub-connection @conn)
[{:keys [::connection] :as conn} listener]
(us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(us/assert! ::pubsub-listener listener)
(.addListener ^StatefulRedisPubSubConnection @conn
(.addListener ^StatefulRedisPubSubConnection @connection
^RedisPubSubListener listener)
conn)
(defn publish!
[conn topic message]
[{:keys [::connection] :as conn} topic message]
(us/assert! ::us/string topic)
(us/assert! ::us/bytes message)
(us/assert! ::connection @conn)
(us/assert! ::connection-holder conn)
(us/assert! ::default-connection connection)
(let [pcomm (.async ^StatefulRedisConnection @conn)]
(let [pcomm (.async ^StatefulRedisConnection @connection)]
(.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)))
(defn subscribe!
"Blocking operation, intended to be used on a worker/agent thread."
[conn & topics]
(us/assert! ::pubsub-connection @conn)
"Blocking operation, intended to be used on a thread/agent thread."
[{:keys [::connection] :as conn} & topics]
(us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(try
(let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @conn)]
(.subscribe ^RedisPubSubCommands cmd topics)))
cmd (.sync ^StatefulRedisPubSubConnection @connection)]
(.subscribe ^RedisPubSubCommands cmd topics))
(catch RedisCommandInterruptedException cause
(throw (InterruptedException. (ex-message cause))))))
(defn unsubscribe!
"Blocking operation, intended to be used on a worker/agent thread."
[conn & topics]
(us/assert! ::pubsub-connection @conn)
"Blocking operation, intended to be used on a thread/agent thread."
[{:keys [::connection] :as conn} & topics]
(us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(try
(let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @conn)]
(.unsubscribe ^RedisPubSubCommands cmd topics)))
cmd (.sync ^StatefulRedisPubSubConnection @connection)]
(.unsubscribe ^RedisPubSubCommands cmd topics))
(catch RedisCommandInterruptedException cause
(throw (InterruptedException. (ex-message cause))))))
(defn rpush!
[{:keys [::connection] :as conn} key payload]
(us/assert! ::connection-holder conn)
(us/assert! (or (and (vector? payload)
(every? bytes? payload))
(bytes? payload)))
(try
(let [cmd (.sync ^StatefulRedisConnection @connection)
data (if (vector? payload) payload [payload])
vals (make-array (. Class (forName "[B")) (count data))]
(loop [i 0 xs (seq data)]
(when xs
(aset ^"[[B" vals i ^bytes (first xs))
(recur (inc i) (next xs))))
(.rpush ^RedisCommands cmd
^String key
^"[[B" vals))
(catch RedisCommandInterruptedException cause
(throw (InterruptedException. (ex-message cause))))))
(defn blpop!
[{:keys [::connection] :as conn} timeout & keys]
(us/assert! ::connection-holder conn)
(try
(let [keys (into-array Object (map str keys))
cmd (.sync ^StatefulRedisConnection @connection)
timeout (/ (double (inst-ms timeout)) 1000.0)]
(when-let [res (.blpop ^RedisCommands cmd
^double timeout
^"[Ljava.lang.String;" keys)]
(MapEntry/create
(.getKey ^KeyValue res)
(.getValue ^KeyValue res))))
(catch RedisCommandInterruptedException cause
(throw (InterruptedException. (ex-message cause))))))
(defn open?
[conn]
(.isOpen ^StatefulConnection @conn))
[{:keys [::connection] :as conn}]
(us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(.isOpen ^StatefulConnection @connection))
(defn pubsub-listener
[& {:keys [on-message on-subscribe on-unsubscribe]}]
@ -243,10 +318,6 @@
(when on-unsubscribe
(on-unsubscribe nil topic count)))))
(defn close!
[o]
(.close ^AutoCloseable o))
(def ^:private scripts-cache (atom {}))
(def noop-fn (constantly nil))
@ -262,12 +333,12 @@
::rscript/vals]))
(defn eval!
[{:keys [::mtx/metrics] :as state} script]
(us/assert! ::rscript/script script)
[{:keys [::mtx/metrics ::connection] :as state} script]
(us/assert! ::redis state)
(us/assert! ::connection-holder state)
(us/assert! ::rscript/script script)
(let [rconn (-> state ::connection deref)
cmd (.async ^StatefulRedisConnection rconn)
(let [cmd (.async ^StatefulRedisConnection @connection)
keys (into-array String (map str (::rscript/keys script)))
vals (into-array String (map str (::rscript/vals script)))
sname (::rscript/name script)]
@ -276,20 +347,20 @@
(if (instance? io.lettuce.core.RedisNoScriptException cause)
(do
(l/error :hint "no script found" :name sname :cause cause)
(-> (load-script)
(p/then eval-script)))
(->> (load-script)
(p/mapcat eval-script)))
(if-let [on-error (::rscript/on-error script)]
(on-error cause)
(p/rejected cause))))
(eval-script [sha]
(let [tpoint (dt/tpoint)]
(-> (.evalsha ^RedisScriptingAsyncCommands cmd
(->> (.evalsha ^RedisScriptingAsyncCommands cmd
^String sha
^ScriptOutputType ScriptOutputType/MULTI
^"[Ljava.lang.String;" keys
^"[Ljava.lang.String;" vals)
(p/then (fn [result]
(p/map (fn [result]
(let [elapsed (tpoint)]
(mtx/run! metrics {:id :redis-eval-timing
:labels [(name sname)]
@ -300,20 +371,28 @@
:params (str/join "," (::rscript/vals script))
:elapsed (dt/format-duration elapsed))
result)))
(p/catch on-error))))
(p/error on-error))))
(read-script []
(-> script ::rscript/path io/resource slurp))
(load-script []
(l/trace :hint "load script" :name sname)
(-> (.scriptLoad ^RedisScriptingAsyncCommands cmd
(->> (.scriptLoad ^RedisScriptingAsyncCommands cmd
^String (read-script))
(p/then (fn [sha]
(p/map (fn [sha]
(swap! scripts-cache assoc sname sha)
sha))))]
(if-let [sha (get @scripts-cache sname)]
(eval-script sha)
(-> (load-script)
(p/then eval-script))))))
(->> (load-script)
(p/mapcat eval-script))))))
(defn timeout-exception?
[cause]
(instance? RedisCommandTimeoutException cause))
(defn exception?
[cause]
(instance? RedisException cause))

View file

@ -23,6 +23,7 @@
[app.storage :as-alias sto]
[app.util.services :as sv]
[app.util.time :as ts]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]
@ -270,6 +271,7 @@
::http-client
::rlimit
::climit
::wrk/executor
::mtx/metrics
::db/pool
::ldap]))

View file

@ -63,16 +63,16 @@
(l/trace :hint "enqueued"
:key (name bkey)
:skey (str skey)
:queue-size (get instance :current-queue-size)
:concurrency (get instance :current-concurrency)
:queue-size (get instance ::pxb/current-queue-size)
:concurrency (get instance ::pxb/current-concurrency))
(mtx/run! metrics
:id :rpc-climit-queue-size
:val (get instance :current-queue-size)
:val (get instance ::pxb/current-queue-size)
:labels labels)
(mtx/run! metrics
:id :rpc-climit-concurrency
:val (get instance :current-concurrency)
:labels labels)))
:val (get instance ::pxb/current-concurrency)
:labels labels))
on-run (fn [instance task]
(let [elapsed (- (inst-ms (dt/now))
@ -87,11 +87,11 @@
:labels labels)
(mtx/run! metrics
:id :rpc-climit-queue-size
:val (get instance :current-queue-size)
:val (get instance ::pxb/current-queue-size)
:labels labels)
(mtx/run! metrics
:id :rpc-climit-concurrency
:val (get instance :current-concurrency)
:val (get instance ::pxb/current-concurrency)
:labels labels)))
options {:executor executor

View file

@ -332,7 +332,7 @@
::limits limits}))))
(defn- refresh-config
[{:keys [state path executor scheduler] :as params}]
[{:keys [state path executor scheduled-executor] :as params}]
(letfn [(update-config [{:keys [::updated-at] :as state}]
(let [updated-at' (fs/last-modified-time path)]
(merge state
@ -347,7 +347,7 @@
state)))))
(schedule-next [state]
(px/schedule! scheduler
(px/schedule! scheduled-executor
(inst-ms (::refresh state))
(partial refresh-config params))
state)]
@ -371,7 +371,7 @@
(and (fs/exists? path) (fs/regular-file? path) path)))
(defmethod ig/pre-init-spec :app.rpc/rlimit [_]
(s/keys :req-un [::wrk/executor ::wrk/scheduler]))
(s/keys :req-un [::wrk/executor ::wrk/scheduled-executor]))
(defmethod ig/init-key ::rpc/rlimit
[_ {:keys [executor] :as params}]

View file

@ -20,6 +20,7 @@
[app.util.objects-map :as omap]
[app.util.pointer-map :as pmap]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.pprint :refer [pprint]]
[cuerdas.core :as str]))
@ -37,6 +38,16 @@
(task-fn params)
(println (format "no task '%s' found" name))))))
(defn schedule-task!
([system name]
(schedule-task! system name {}))
([system name props]
(let [pool (:app.db/pool system)]
(wrk/submit!
::wrk/conn pool
::wrk/task name
::wrk/props props))))
(defn send-test-email!
[system destination]
(us/verify!

View file

@ -12,6 +12,7 @@
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.storage :as-alias sto]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.core.async :as a]
@ -23,43 +24,43 @@
(declare remove-temp-file)
(defonce queue (a/chan 128))
(s/def ::min-age ::dt/duration)
(defmethod ig/pre-init-spec ::cleaner [_]
(s/keys :req-un [::min-age ::wrk/scheduler ::wrk/executor]))
(s/keys :req [::sto/min-age ::wrk/scheduled-executor]))
(defmethod ig/prep-key ::cleaner
[_ cfg]
(merge {:min-age (dt/duration {:minutes 30})}
(merge {::sto/min-age (dt/duration "30m")}
(d/without-nils cfg)))
(defmethod ig/init-key ::cleaner
[_ {:keys [scheduler executor min-age] :as cfg}]
(l/info :hint "starting tempfile cleaner service")
(let [cch (a/chan)]
(a/go-loop []
(let [[path port] (a/alts! [queue cch])]
(when (not= port cch)
[_ {:keys [::sto/min-age ::wrk/scheduled-executor] :as cfg}]
(px/thread
{:name "penpot/storage-tmp-cleaner"}
(try
(l/info :hint "started tmp file cleaner")
(loop []
(when-let [path (a/<!! queue)]
(l/trace :hint "schedule tempfile deletion" :path path
:expires-at (dt/plus (dt/now) min-age))
(px/schedule! scheduler
(px/schedule! scheduled-executor
(inst-ms min-age)
(partial remove-temp-file executor path))
(recur))))
cch))
(partial remove-temp-file path))
(recur)))
(catch InterruptedException _
(l/debug :hint "interrupted"))
(finally
(l/info :hint "terminated tmp file cleaner")))))
(defmethod ig/halt-key! ::cleaner
[_ close-ch]
(l/info :hint "stopping tempfile cleaner service")
(some-> close-ch a/close!))
[_ thread]
(px/interrupt! thread))
(defn- remove-temp-file
"Permanently delete tempfile"
[executor path]
(px/with-dispatch executor
[path]
(l/trace :hint "permanently delete tempfile" :path path)
(when (fs/exists? path)
(fs/delete path))))
(fs/delete path)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; API

View file

@ -1,31 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.util.closeable
"A closeable abstraction. A drop in replacement for
clojure builtin `with-open` syntax abstraction."
(:refer-clojure :exclude [with-open]))
(defprotocol ICloseable
(-close [_] "Close the resource."))
(defmacro with-open
[bindings & body]
{:pre [(vector? bindings)
(even? (count bindings))
(pos? (count bindings))]}
(reduce (fn [acc bindings]
`(let ~(vec bindings)
(try
~acc
(finally
(-close ~(first bindings))))))
`(do ~@body)
(reverse (partition 2 bindings))))
(extend-protocol ICloseable
java.lang.AutoCloseable
(-close [this] (.close this)))

File diff suppressed because it is too large Load diff

View file

@ -68,7 +68,7 @@
:thumbnail-uri "test"
:path (-> "backend_tests/test_files/template.penpot" io/resource fs/path)}]
system (-> (merge main/system-config main/worker-config)
(assoc-in [:app.redis/redis :uri] (:redis-uri config))
(assoc-in [:app.redis/redis :app.redis/uri] (:redis-uri config))
(assoc-in [:app.db/pool :uri] (:database-uri config))
(assoc-in [:app.db/pool :username] (:database-username config))
(assoc-in [:app.db/pool :password] (:database-password config))
@ -324,7 +324,7 @@
(run-task! name {}))
([name params]
(let [tasks (:app.worker/registry *system*)]
(let [task-fn (get tasks name)]
(let [task-fn (get tasks (d/name name))]
(task-fn params)))))
;; --- UTILS

View file

@ -20,12 +20,10 @@
(t/deftest test-base-report-data-structure
(with-mocks [mock {:target 'app.tasks.telemetry/send!
:return nil}]
(let [task-fn (-> th/*system* :app.worker/registry :telemetry)
prof (th/create-profile* 1 {:is-active true
(let [prof (th/create-profile* 1 {:is-active true
:props {:newsletter-news true}})]
;; run the task
(task-fn {:send? true :enabled? true})
(th/run-task! :telemetry {:send? true :enabled? true})
(t/is (:called? @mock))
(let [[_ data] (-> @mock :call-args)]

View file

@ -23,7 +23,7 @@
com.cognitect/transit-cljs {:mvn/version "0.8.280"}
java-http-clj/java-http-clj {:mvn/version "0.4.3"}
funcool/promesa {:mvn/version "9.0.507"}
funcool/promesa {:mvn/version "9.2.542"}
funcool/cuerdas {:mvn/version "2022.06.16-403"}
lambdaisland/uri {:mvn/version "1.13.95"

View file

@ -5,7 +5,8 @@
;; Copyright (c) KALEIDOS INC
(ns app.common.data
"Data manipulation and query helper functions."
"A collection if helpers for working with data structures and other
data resources."
(:refer-clojure :exclude [read-string hash-map merge name update-vals
parse-double group-by iteration concat mapcat])
#?(:cljs
@ -22,7 +23,9 @@
[linked.set :as lks])
#?(:clj
(:import linked.set.LinkedSet)))
(:import
linked.set.LinkedSet
java.lang.AutoCloseable)))
(def boolean-or-nil?
(some-fn nil? boolean?))
@ -697,3 +700,16 @@
(map (fn [key]
[key (delay (generator-fn key))]))
keys))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Util protocols
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defprotocol ICloseable
:extend-via-metadata true
(close! [_] "Close the resource."))
#?(:clj
(extend-protocol ICloseable
AutoCloseable
(close! [this] (.close this))))

View file

@ -7,7 +7,7 @@
#_:clj-kondo/ignore
(ns app.common.data.macros
"Data retrieval & manipulation specific macros."
(:refer-clojure :exclude [get-in select-keys str])
(:refer-clojure :exclude [get-in select-keys str with-open])
#?(:cljs (:require-macros [app.common.data.macros]))
(:require
#?(:clj [clojure.core :as c]
@ -94,5 +94,16 @@
[s & params]
`(str/ffmt ~s ~@params))
(defmacro with-open
[bindings & body]
{:pre [(vector? bindings)
(even? (count bindings))
(pos? (count bindings))]}
(reduce (fn [acc bindings]
`(let ~(vec bindings)
(try
~acc
(finally
(d/close! ~(first bindings))))))
`(do ~@body)
(reverse (partition 2 bindings))))

View file

@ -50,6 +50,10 @@
[& exprs]
`(try* (^:once fn* [] ~@exprs) identity))
(defmacro try!
[& exprs]
`(try* (^:once fn* [] ~@exprs) identity))
(defn with-always
"A helper that evaluates an exptession independently if the body
raises exception or not."