diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml
index 7abb7a188..ca1ab6739 100644
--- a/backend/resources/log4j2-devenv.xml
+++ b/backend/resources/log4j2-devenv.xml
@@ -30,7 +30,7 @@
diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj
index c80210a06..c2f20015c 100644
--- a/backend/src/app/main.clj
+++ b/backend/src/app/main.clj
@@ -301,7 +301,8 @@
::sto/storage (ig/ref ::sto/storage)}
- {::mtx/metrics (ig/ref ::mtx/metrics)}
+ {::mtx/metrics (ig/ref ::mtx/metrics)
+ ::wrk/executor (ig/ref ::wrk/executor)}
{::wrk/executor (ig/ref ::wrk/executor)}
diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj
index b730ab106..58023fe00 100644
--- a/backend/src/app/redis.clj
+++ b/backend/src/app/redis.clj
@@ -91,7 +91,7 @@
(s/def ::connect? ::us/boolean)
(s/def ::io-threads ::us/integer)
(s/def ::worker-threads ::us/integer)
-(s/def ::cache some?)
+(s/def ::cache cache/cache?)
(s/def ::redis
(s/keys :req [::resources
@@ -168,7 +168,7 @@
(defn- shutdown-resources
[{:keys [::resources ::cache ::timer]}]
- (cache/invalidate-all! cache)
+ (cache/invalidate! cache)
(when resources
(.shutdown ^ClientResources resources))
@@ -211,7 +211,8 @@
(defn get-or-connect
[{:keys [::cache] :as state} key options]
(us/assert! ::redis state)
- (let [connection (cache/get cache key (fn [_] (connect* state options)))]
+ (let [create (fn [_] (connect* state options))
+ connection (cache/get cache key create)]
(-> state
(dissoc ::cache)
(assoc ::connection connection))))
diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj
index d6e4ccb51..71c64b596 100644
--- a/backend/src/app/rpc/climit.clj
+++ b/backend/src/app/rpc/climit.clj
@@ -36,24 +36,14 @@
(-> (str id)
(subs 1)))
-(defn- create-bulkhead-cache
- [config]
- (letfn [(load-fn [[id skey]]
- (when-let [config (get config id)]
- (l/trc :hint "insert into cache" :id (id->str id) :key skey)
- (pbh/create :permits (or (:permits config) (:concurrency config))
- :queue (or (:queue config) (:queue-size config))
- :timeout (:timeout config)
- :type :semaphore)))
- (on-remove [key _ cause]
+(defn- create-cache
+ [{:keys [::wrk/executor]}]
+ (letfn [(on-remove [key _ cause]
(let [[id skey] key]
- (l/trc :hint "evict from cache" :id (id->str id) :key skey :reason (str cause))))]
- (cache/create :executor :same-thread
+ (l/dbg :hint "destroy limiter" :id (id->str id) :key skey :reason (str cause))))]
+ (cache/create :executor executor
:on-remove on-remove
- :keepalive "5m"
- :load-fn load-fn)))
+ :keepalive "5m")))
(s/def ::config/permits ::us/integer)
(s/def ::config/queue ::us/integer)
@@ -70,7 +60,7 @@
(s/def ::path ::fs/path)
(defmethod ig/pre-init-spec ::rpc/climit [_]
- (s/keys :req [::mtx/metrics ::path]))
+ (s/keys :req [::mtx/metrics ::wrk/executor ::path]))
(defmethod ig/init-key ::rpc/climit
[_ {:keys [::path ::mtx/metrics] :as cfg}]
@@ -78,7 +68,7 @@
(when-let [params (some->> path slurp edn/read-string)]
(l/inf :hint "initializing concurrency limit" :config (str path))
(us/verify! ::config params)
- {::cache (create-bulkhead-cache params)
+ {::cache (create-cache cfg)
::config params
::mtx/metrics metrics})))
@@ -89,13 +79,17 @@
(s/def ::rpc/climit
(s/nilable ::instance))
+(defn- create-limiter
+ [config [id skey]]
+ (l/dbg :hint "create limiter" :id (id->str id) :key skey)
+ (pbh/create :permits (or (:permits config) (:concurrency config))
+ :queue (or (:queue config) (:queue-size config))
+ :timeout (:timeout config)
+ :type :semaphore))
-(defn invoke!
- [cache metrics id key f]
- (if-let [limiter (cache/get cache [id key])]
+(defn- invoke!
+ [config cache metrics id key f]
+ (if-let [limiter (cache/get cache [id key] (partial create-limiter config))]
(let [tpoint (dt/tpoint)
labels (into-array String [(id->str id)])
wrapped (fn []
@@ -147,7 +141,7 @@
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats))
- (pbh/invoke! limiter wrapped))
+ (px/invoke! limiter wrapped))
(catch ExceptionInfo cause
(let [{:keys [type code]} (ex-data cause)]
(if (= :bulkhead-error type)
@@ -160,9 +154,43 @@
(measure! (pbh/get-stats limiter)))))
- (l/wrn :hint "unable to load limiter" :id (id->str id))
+ (l/wrn :hint "no limiter found" :id (id->str id))
+(def noop-fn (constantly nil))
+(defn wrap
+ [{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}]
+ (if (and (some? climit) (some? id))
+ (let [cache (::cache climit)
+ config (::config climit)]
+ (if-let [config (get config id)]
+ (do
+ (l/dbg :hint "instrumenting method"
+ :limit (id->str id)
+ :service-name (::sv/name mdata)
+ :timeout (:timeout config)
+ :permits (:permits config)
+ :queue (:queue config)
+ :keyed? (not= key-fn noop-fn))
+ (fn [cfg params]
+ (invoke! config cache metrics id (key-fn params) (partial f cfg params))))
+ (do
+ (l/wrn :hint "no config found for specified queue" :id (id->str id))
+ f)))
+ f))
(defn configure
[{:keys [::rpc/climit]} id]
(us/assert! ::rpc/climit climit)
@@ -171,37 +199,14 @@
(defn run!
"Run a function in context of climit.
Intended to be used in virtual threads."
- ([{:keys [::id ::cache ::mtx/metrics]} f]
- (if (and cache id)
- (invoke! cache metrics id nil f)
+ ([{:keys [::id ::cache ::config ::mtx/metrics]} f]
+ (if-let [config (get config id)]
+ (invoke! config cache metrics id nil f)
- ([{:keys [::id ::cache ::mtx/metrics]} f executor]
+ ([{:keys [::id ::cache ::config ::mtx/metrics]} f executor]
(let [f #(p/await! (px/submit! executor f))]
- (if (and cache id)
- (invoke! cache metrics id nil f)
+ (if-let [config (get config id)]
+ (invoke! config cache metrics id nil f)
-(def noop-fn (constantly nil))
-(defn wrap
- [{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}]
- (if (and (some? climit) (some? id))
- (if-let [config (get-in climit [::config id])]
- (let [cache (::cache climit)]
- (l/dbg :hint "instrumenting method"
- :limit (id->str id)
- :service-name (::sv/name mdata)
- :timeout (:timeout config)
- :permits (:permits config)
- :queue (:queue config)
- :keyed? (not= key-fn noop-fn))
- (fn [cfg params]
- (invoke! cache metrics id (key-fn params) (partial f cfg params))))
- (do
- (l/wrn :hint "no config found for specified queue" :id (id->str id))
- f))
- f))
diff --git a/backend/src/app/util/cache.clj b/backend/src/app/util/cache.clj
index c5aa733e6..65861e179 100644
--- a/backend/src/app/util/cache.clj
+++ b/backend/src/app/util/cache.clj
@@ -9,61 +9,71 @@
(:refer-clojure :exclude [get])
[app.util.time :as dt]
- [promesa.core :as p]
[promesa.exec :as px])
- com.github.benmanes.caffeine.cache.AsyncLoadingCache
- com.github.benmanes.caffeine.cache.CacheLoader
+ com.github.benmanes.caffeine.cache.Cache
+ com.github.benmanes.caffeine.cache.stats.CacheStats
(set! *warn-on-reflection* true)
-(defn create-listener
+(defprotocol ICache
+ (get [_ k] [_ k load-fn] "get cache entry")
+ (invalidate! [_] [_ k] "invalidate cache"))
+(defprotocol ICacheStats
+ (stats [_] "get stats"))
+(defn- create-listener
(reify RemovalListener
(onRemoval [_ key val cause]
(when val
(f key val cause)))))
-(defn create-loader
- [f]
- (reify CacheLoader
- (load [_ key]
- (f key))))
+(defn- get-stats
+ [^Cache cache]
+ (let [^CacheStats stats (.stats cache)]
+ {:hit-rate (.hitRate stats)
+ :hit-count (.hitCount stats)
+ :req-count (.requestCount stats)
+ :miss-count (.missCount stats)
+ :miss-rate (.missRate stats)}))
(defn create
- [& {:keys [executor on-remove load-fn keepalive]}]
- (as-> (Caffeine/newBuilder) builder
- (if on-remove (.removalListener builder (create-listener on-remove)) builder)
- (if executor (.executor builder ^Executor (px/resolve-executor executor)) builder)
- (if keepalive (.expireAfterAccess builder ^Duration (dt/duration keepalive)) builder)
- (if load-fn
- (.buildAsync builder ^CacheLoader (create-loader load-fn))
- (.buildAsync builder))))
+ [& {:keys [executor on-remove max-size keepalive]}]
+ (let [cache (as-> (Caffeine/newBuilder) builder
+ (if (fn? on-remove) (.removalListener builder (create-listener on-remove)) builder)
+ (if executor (.executor builder ^Executor (px/resolve-executor executor)) builder)
+ (if keepalive (.expireAfterAccess builder ^Duration (dt/duration keepalive)) builder)
+ (if (int? max-size) (.maximumSize builder (long max-size)) builder)
+ (.recordStats builder)
+ (.buildAsync builder))
+ cache (.synchronous ^AsyncCache cache)]
+ (reify
+ ICache
+ (get [_ k]
+ (.getIfPresent ^Cache cache ^Object k))
+ (get [_ k load-fn]
+ (.get ^Cache cache
+ ^Object k
+ ^Function (reify Function
+ (apply [_ k]
+ (load-fn k)))))
+ (invalidate! [_]
+ (.invalidateAll ^Cache cache))
+ (invalidate! [_ k]
+ (.invalidateAll ^Cache cache ^Object k))
-(defn invalidate-all!
- [^AsyncCache cache]
- (.invalidateAll (.synchronous cache)))
-(defn get
- ([cache key]
- (assert (instance? AsyncLoadingCache cache) "should be AsyncLoadingCache instance")
- (p/await! (.get ^AsyncLoadingCache cache ^Object key)))
- ([cache key not-found-fn]
- (assert (instance? AsyncCache cache) "should be AsyncCache instance")
- (p/await! (.get ^AsyncCache cache
- ^Object key
- ^Function (reify
- Function
- (apply [_ key]
- (not-found-fn key)))))))
+ ICacheStats
+ (stats [_]
+ (get-stats cache)))))
(defn cache?
- (or (instance? AsyncCache o)
- (instance? AsyncLoadingCache o)))
+ (satisfies? ICache o))