0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-11 23:31:21 -05:00

♻️ Normalize redis api and its usage in msgbus module

This commit is contained in:
Andrey Antukh 2022-11-22 13:04:33 +01:00
parent 427e43585c
commit 10bf6c5e56
3 changed files with 94 additions and 76 deletions

View file

@ -20,7 +20,8 @@
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p])) [promesa.core :as p]
[promesa.exec :as px]))
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
@ -52,8 +53,8 @@
(s/def ::rcv-ch ::aa/channel) (s/def ::rcv-ch ::aa/channel)
(s/def ::pub-ch ::aa/channel) (s/def ::pub-ch ::aa/channel)
(s/def ::state ::us/agent) (s/def ::state ::us/agent)
(s/def ::pconn ::redis/connection) (s/def ::pconn ::redis/connection-holder)
(s/def ::sconn ::redis/connection) (s/def ::sconn ::redis/connection-holder)
(s/def ::msgbus (s/def ::msgbus
(s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor])) (s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor]))
@ -205,9 +206,10 @@
(when-let [closed (a/<! (send-to-topic topic message))] (when-let [closed (a/<! (send-to-topic topic message))]
(send-via executor state unsubscribe-channels cfg closed nil)))) (send-via executor state unsubscribe-channels cfg closed nil))))
] ]
(px/thread
(a/go-loop [] {:name "penpot/msgbus-io-loop"}
(let [[val port] (a/alts! [pub-ch rcv-ch])] (loop []
(let [[val port] (a/alts!! [pub-ch rcv-ch])]
(cond (cond
(nil? val) (nil? val)
(do (do
@ -221,15 +223,16 @@
(= port rcv-ch) (= port rcv-ch)
(do (do
(a/<! (process-incoming val)) (a/<!! (process-incoming val))
(recur)) (recur))
(= port pub-ch) (= port pub-ch)
(let [result (a/<! (redis-pub cfg val))] (let [result (a/<!! (redis-pub cfg val))]
(when (ex/exception? result) (when (ex/exception? result)
(l/error :hint "unexpected error on publishing" :message val (l/error :hint "unexpected error on publishing"
:message val
:cause result)) :cause result))
(recur))))))) (recur))))))))
(defn- redis-pub (defn- redis-pub
"Publish a message to the redis server. Asynchronous operation, "Publish a message to the redis server. Asynchronous operation,

View file

@ -51,7 +51,7 @@
(s/def ::timer (s/def ::timer
#(instance? Timer %)) #(instance? Timer %))
(s/def ::connection (s/def ::default-connection
#(or (instance? StatefulRedisConnection %) #(or (instance? StatefulRedisConnection %)
(and (instance? IDeref %) (and (instance? IDeref %)
(instance? StatefulRedisConnection (deref %))))) (instance? StatefulRedisConnection (deref %)))))
@ -61,6 +61,13 @@
(and (instance? IDeref %) (and (instance? IDeref %)
(instance? StatefulRedisPubSubConnection (deref %))))) (instance? StatefulRedisPubSubConnection (deref %)))))
(s/def ::connection
(s/or :default ::default-connection
:pubsub ::pubsub-connection))
(s/def ::connection-holder
(s/keys :req [::connection]))
(s/def ::redis-uri (s/def ::redis-uri
#(instance? RedisURI %)) #(instance? RedisURI %))
@ -146,19 +153,20 @@
(.stop ^Timer timer))) (.stop ^Timer timer)))
(defn connect (defn connect
[{:keys [::resources ::redis-uri] :as cfg} [{:keys [::resources ::redis-uri] :as state}
& {:keys [timeout codec type] :or {codec default-codec type :default}}] & {:keys [timeout codec type]
:or {codec default-codec type :default}}]
(us/assert! ::resources resources) (us/assert! ::resources resources)
(let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri) (let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri)
timeout (or timeout (:timeout cfg)) timeout (or timeout (:timeout state))
conn (case type conn (case type
:default (.connect ^RedisClient client ^RedisCodec codec) :default (.connect ^RedisClient client ^RedisCodec codec)
:pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))] :pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))]
(.setTimeout ^StatefulConnection conn ^Duration timeout) (.setTimeout ^StatefulConnection conn ^Duration timeout)
(assoc state ::connection
(reify (reify
IDeref IDeref
(deref [_] conn) (deref [_] conn)
@ -166,7 +174,7 @@
AutoCloseable AutoCloseable
(close [_] (close [_]
(.close ^StatefulConnection conn) (.close ^StatefulConnection conn)
(.shutdown ^RedisClient client))))) (.shutdown ^RedisClient client))))))
(defn get-or-connect (defn get-or-connect
[{:keys [::cache] :as state} key options] [{:keys [::cache] :as state} key options]
@ -179,42 +187,47 @@
(get key))))) (get key)))))
(defn add-listener! (defn add-listener!
[conn listener] [{:keys [::connection] :as conn} listener]
(us/assert! ::pubsub-connection @conn) (us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(us/assert! ::pubsub-listener listener) (us/assert! ::pubsub-listener listener)
(.addListener ^StatefulRedisPubSubConnection @connection
(.addListener ^StatefulRedisPubSubConnection @conn
^RedisPubSubListener listener) ^RedisPubSubListener listener)
conn) conn)
(defn publish! (defn publish!
[conn topic message] [{:keys [::connection] :as conn} topic message]
(us/assert! ::us/string topic) (us/assert! ::us/string topic)
(us/assert! ::us/bytes message) (us/assert! ::us/bytes message)
(us/assert! ::connection @conn) (us/assert! ::connection-holder conn)
(us/assert! ::default-connection connection)
(let [pcomm (.async ^StatefulRedisConnection @conn)] (let [pcomm (.async ^StatefulRedisConnection @connection)]
(.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message))) (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)))
(defn subscribe! (defn subscribe!
"Blocking operation, intended to be used on a worker/agent thread." "Blocking operation, intended to be used on a thread/agent thread."
[conn & topics] [{:keys [::connection] :as conn} & topics]
(us/assert! ::pubsub-connection @conn) (us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(let [topics (into-array String (map str topics)) (let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @conn)] cmd (.sync ^StatefulRedisPubSubConnection @connection)]
(.subscribe ^RedisPubSubCommands cmd topics))) (.subscribe ^RedisPubSubCommands cmd topics)))
(defn unsubscribe! (defn unsubscribe!
"Blocking operation, intended to be used on a worker/agent thread." "Blocking operation, intended to be used on a thread/agent thread."
[conn & topics] [{:keys [::connection] :as conn} & topics]
(us/assert! ::pubsub-connection @conn) (us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(let [topics (into-array String (map str topics)) (let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @conn)] cmd (.sync ^StatefulRedisPubSubConnection @connection)]
(.unsubscribe ^RedisPubSubCommands cmd topics))) (.unsubscribe ^RedisPubSubCommands cmd topics)))
(defn open? (defn open?
[conn] [{:keys [::connection] :as conn}]
(.isOpen ^StatefulConnection @conn)) (us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(.isOpen ^StatefulConnection @connection))
(defn pubsub-listener (defn pubsub-listener
[& {:keys [on-message on-subscribe on-unsubscribe]}] [& {:keys [on-message on-subscribe on-unsubscribe]}]
@ -244,8 +257,10 @@
(on-unsubscribe nil topic count))))) (on-unsubscribe nil topic count)))))
(defn close! (defn close!
[o] [{:keys [::connection] :as conn}]
(.close ^AutoCloseable o)) (us/assert! ::connection-holder conn)
(us/assert! ::connection connection)
(.close ^AutoCloseable connection))
(def ^:private scripts-cache (atom {})) (def ^:private scripts-cache (atom {}))
(def noop-fn (constantly nil)) (def noop-fn (constantly nil))
@ -262,12 +277,12 @@
::rscript/vals])) ::rscript/vals]))
(defn eval! (defn eval!
[{:keys [::mtx/metrics] :as state} script] [{:keys [::mtx/metrics ::connection] :as state} script]
(us/assert! ::rscript/script script)
(us/assert! ::redis state) (us/assert! ::redis state)
(us/assert! ::connection-holder state)
(us/assert! ::rscript/script script)
(let [rconn (-> state ::connection deref) (let [cmd (.async ^StatefulRedisConnection @connection)
cmd (.async ^StatefulRedisConnection rconn)
keys (into-array String (map str (::rscript/keys script))) keys (into-array String (map str (::rscript/keys script)))
vals (into-array String (map str (::rscript/vals script))) vals (into-array String (map str (::rscript/vals script)))
sname (::rscript/name script)] sname (::rscript/name script)]
@ -276,20 +291,20 @@
(if (instance? io.lettuce.core.RedisNoScriptException cause) (if (instance? io.lettuce.core.RedisNoScriptException cause)
(do (do
(l/error :hint "no script found" :name sname :cause cause) (l/error :hint "no script found" :name sname :cause cause)
(-> (load-script) (->> (load-script)
(p/then eval-script))) (p/mapcat eval-script)))
(if-let [on-error (::rscript/on-error script)] (if-let [on-error (::rscript/on-error script)]
(on-error cause) (on-error cause)
(p/rejected cause)))) (p/rejected cause))))
(eval-script [sha] (eval-script [sha]
(let [tpoint (dt/tpoint)] (let [tpoint (dt/tpoint)]
(-> (.evalsha ^RedisScriptingAsyncCommands cmd (->> (.evalsha ^RedisScriptingAsyncCommands cmd
^String sha ^String sha
^ScriptOutputType ScriptOutputType/MULTI ^ScriptOutputType ScriptOutputType/MULTI
^"[Ljava.lang.String;" keys ^"[Ljava.lang.String;" keys
^"[Ljava.lang.String;" vals) ^"[Ljava.lang.String;" vals)
(p/then (fn [result] (p/map (fn [result]
(let [elapsed (tpoint)] (let [elapsed (tpoint)]
(mtx/run! metrics {:id :redis-eval-timing (mtx/run! metrics {:id :redis-eval-timing
:labels [(name sname)] :labels [(name sname)]
@ -300,20 +315,20 @@
:params (str/join "," (::rscript/vals script)) :params (str/join "," (::rscript/vals script))
:elapsed (dt/format-duration elapsed)) :elapsed (dt/format-duration elapsed))
result))) result)))
(p/catch on-error)))) (p/error on-error))))
(read-script [] (read-script []
(-> script ::rscript/path io/resource slurp)) (-> script ::rscript/path io/resource slurp))
(load-script [] (load-script []
(l/trace :hint "load script" :name sname) (l/trace :hint "load script" :name sname)
(-> (.scriptLoad ^RedisScriptingAsyncCommands cmd (->> (.scriptLoad ^RedisScriptingAsyncCommands cmd
^String (read-script)) ^String (read-script))
(p/then (fn [sha] (p/map (fn [sha]
(swap! scripts-cache assoc sname sha) (swap! scripts-cache assoc sname sha)
sha))))] sha))))]
(if-let [sha (get @scripts-cache sname)] (if-let [sha (get @scripts-cache sname)]
(eval-script sha) (eval-script sha)
(-> (load-script) (->> (load-script)
(p/then eval-script)))))) (p/mapcat eval-script))))))

View file

@ -23,7 +23,7 @@
com.cognitect/transit-cljs {:mvn/version "0.8.280"} com.cognitect/transit-cljs {:mvn/version "0.8.280"}
java-http-clj/java-http-clj {:mvn/version "0.4.3"} java-http-clj/java-http-clj {:mvn/version "0.4.3"}
funcool/promesa {:mvn/version "9.0.507"} funcool/promesa {:mvn/version "9.1.539"}
funcool/cuerdas {:mvn/version "2022.06.16-403"} funcool/cuerdas {:mvn/version "2022.06.16-403"}
lambdaisland/uri {:mvn/version "1.13.95" lambdaisland/uri {:mvn/version "1.13.95"