0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-18 10:41:29 -05:00

Improve internal cache api

This commit is contained in:
Andrey Antukh 2024-01-14 20:45:11 +01:00
parent db5c16fb1d
commit 92643b29c1
5 changed files with 112 additions and 95 deletions

View file

@ -30,7 +30,7 @@
<Logger name="app.util.websocket" level="info" />
<Logger name="app.redis" level="info" />
<Logger name="app.rpc.rlimit" level="info" />
<Logger name="app.rpc.climit" level="info" />
<Logger name="app.rpc.climit" level="debug" />
<Logger name="app.common.files.migrations" level="info" />
<Logger name="app.loggers" level="debug" additivity="false">

View file

@ -301,7 +301,8 @@
::sto/storage (ig/ref ::sto/storage)}
:app.rpc/climit
{::mtx/metrics (ig/ref ::mtx/metrics)}
{::mtx/metrics (ig/ref ::mtx/metrics)
::wrk/executor (ig/ref ::wrk/executor)}
:app.rpc/rlimit
{::wrk/executor (ig/ref ::wrk/executor)}

View file

@ -91,7 +91,7 @@
(s/def ::connect? ::us/boolean)
(s/def ::io-threads ::us/integer)
(s/def ::worker-threads ::us/integer)
(s/def ::cache some?)
(s/def ::cache cache/cache?)
(s/def ::redis
(s/keys :req [::resources
@ -168,7 +168,7 @@
(defn- shutdown-resources
[{:keys [::resources ::cache ::timer]}]
(cache/invalidate-all! cache)
(cache/invalidate! cache)
(when resources
(.shutdown ^ClientResources resources))
@ -211,7 +211,8 @@
(defn get-or-connect
[{:keys [::cache] :as state} key options]
(us/assert! ::redis state)
(let [connection (cache/get cache key (fn [_] (connect* state options)))]
(let [create (fn [_] (connect* state options))
connection (cache/get cache key create)]
(-> state
(dissoc ::cache)
(assoc ::connection connection))))

View file

@ -36,24 +36,14 @@
(-> (str id)
(subs 1)))
(defn- create-bulkhead-cache
[config]
(letfn [(load-fn [[id skey]]
(when-let [config (get config id)]
(l/trc :hint "insert into cache" :id (id->str id) :key skey)
(pbh/create :permits (or (:permits config) (:concurrency config))
:queue (or (:queue config) (:queue-size config))
:timeout (:timeout config)
:type :semaphore)))
(on-remove [key _ cause]
(defn- create-cache
[{:keys [::wrk/executor]}]
(letfn [(on-remove [key _ cause]
(let [[id skey] key]
(l/trc :hint "evict from cache" :id (id->str id) :key skey :reason (str cause))))]
(cache/create :executor :same-thread
(l/dbg :hint "destroy limiter" :id (id->str id) :key skey :reason (str cause))))]
(cache/create :executor executor
:on-remove on-remove
:keepalive "5m"
:load-fn load-fn)))
:keepalive "5m")))
(s/def ::config/permits ::us/integer)
(s/def ::config/queue ::us/integer)
@ -70,7 +60,7 @@
(s/def ::path ::fs/path)
(defmethod ig/pre-init-spec ::rpc/climit [_]
(s/keys :req [::mtx/metrics ::path]))
(s/keys :req [::mtx/metrics ::wrk/executor ::path]))
(defmethod ig/init-key ::rpc/climit
[_ {:keys [::path ::mtx/metrics] :as cfg}]
@ -78,7 +68,7 @@
(when-let [params (some->> path slurp edn/read-string)]
(l/inf :hint "initializing concurrency limit" :config (str path))
(us/verify! ::config params)
{::cache (create-bulkhead-cache params)
{::cache (create-cache cfg)
::config params
::mtx/metrics metrics})))
@ -89,13 +79,17 @@
(s/def ::rpc/climit
(s/nilable ::instance))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- create-limiter
[config [id skey]]
(l/dbg :hint "create limiter" :id (id->str id) :key skey)
(pbh/create :permits (or (:permits config) (:concurrency config))
:queue (or (:queue config) (:queue-size config))
:timeout (:timeout config)
:type :semaphore))
(defn invoke!
[cache metrics id key f]
(if-let [limiter (cache/get cache [id key])]
(defn- invoke!
[config cache metrics id key f]
(if-let [limiter (cache/get cache [id key] (partial create-limiter config))]
(let [tpoint (dt/tpoint)
labels (into-array String [(id->str id)])
wrapped (fn []
@ -147,7 +141,7 @@
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats))
(pbh/invoke! limiter wrapped))
(px/invoke! limiter wrapped))
(catch ExceptionInfo cause
(let [{:keys [type code]} (ex-data cause)]
(if (= :bulkhead-error type)
@ -160,9 +154,43 @@
(measure! (pbh/get-stats limiter)))))
(do
(l/wrn :hint "unable to load limiter" :id (id->str id))
(l/wrn :hint "no limiter found" :id (id->str id))
(f))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; MIDDLEWARE
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def noop-fn (constantly nil))
(defn wrap
[{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}]
(if (and (some? climit) (some? id))
(let [cache (::cache climit)
config (::config climit)]
(if-let [config (get config id)]
(do
(l/dbg :hint "instrumenting method"
:limit (id->str id)
:service-name (::sv/name mdata)
:timeout (:timeout config)
:permits (:permits config)
:queue (:queue config)
:keyed? (not= key-fn noop-fn))
(fn [cfg params]
(invoke! config cache metrics id (key-fn params) (partial f cfg params))))
(do
(l/wrn :hint "no config found for specified queue" :id (id->str id))
f)))
f))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn configure
[{:keys [::rpc/climit]} id]
(us/assert! ::rpc/climit climit)
@ -171,37 +199,14 @@
(defn run!
"Run a function in context of climit.
Intended to be used in virtual threads."
([{:keys [::id ::cache ::mtx/metrics]} f]
(if (and cache id)
(invoke! cache metrics id nil f)
([{:keys [::id ::cache ::config ::mtx/metrics]} f]
(if-let [config (get config id)]
(invoke! config cache metrics id nil f)
(f)))
([{:keys [::id ::cache ::mtx/metrics]} f executor]
([{:keys [::id ::cache ::config ::mtx/metrics]} f executor]
(let [f #(p/await! (px/submit! executor f))]
(if (and cache id)
(invoke! cache metrics id nil f)
(if-let [config (get config id)]
(invoke! config cache metrics id nil f)
(f)))))
(def noop-fn (constantly nil))
(defn wrap
[{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}]
(if (and (some? climit) (some? id))
(if-let [config (get-in climit [::config id])]
(let [cache (::cache climit)]
(l/dbg :hint "instrumenting method"
:limit (id->str id)
:service-name (::sv/name mdata)
:timeout (:timeout config)
:permits (:permits config)
:queue (:queue config)
:keyed? (not= key-fn noop-fn))
(fn [cfg params]
(invoke! cache metrics id (key-fn params) (partial f cfg params))))
(do
(l/wrn :hint "no config found for specified queue" :id (id->str id))
f))
f))

View file

@ -9,61 +9,71 @@
(:refer-clojure :exclude [get])
(:require
[app.util.time :as dt]
[promesa.core :as p]
[promesa.exec :as px])
(:import
com.github.benmanes.caffeine.cache.AsyncCache
com.github.benmanes.caffeine.cache.AsyncLoadingCache
com.github.benmanes.caffeine.cache.CacheLoader
com.github.benmanes.caffeine.cache.Cache
com.github.benmanes.caffeine.cache.Caffeine
com.github.benmanes.caffeine.cache.RemovalListener
com.github.benmanes.caffeine.cache.stats.CacheStats
java.time.Duration
java.util.concurrent.Executor
java.util.function.Function))
(set! *warn-on-reflection* true)
(defn create-listener
(defprotocol ICache
(get [_ k] [_ k load-fn] "get cache entry")
(invalidate! [_] [_ k] "invalidate cache"))
(defprotocol ICacheStats
(stats [_] "get stats"))
(defn- create-listener
[f]
(reify RemovalListener
(onRemoval [_ key val cause]
(when val
(f key val cause)))))
(defn create-loader
[f]
(reify CacheLoader
(load [_ key]
(f key))))
(defn- get-stats
[^Cache cache]
(let [^CacheStats stats (.stats cache)]
{:hit-rate (.hitRate stats)
:hit-count (.hitCount stats)
:req-count (.requestCount stats)
:miss-count (.missCount stats)
:miss-rate (.missRate stats)}))
(defn create
[& {:keys [executor on-remove load-fn keepalive]}]
(as-> (Caffeine/newBuilder) builder
(if on-remove (.removalListener builder (create-listener on-remove)) builder)
(if executor (.executor builder ^Executor (px/resolve-executor executor)) builder)
(if keepalive (.expireAfterAccess builder ^Duration (dt/duration keepalive)) builder)
(if load-fn
(.buildAsync builder ^CacheLoader (create-loader load-fn))
(.buildAsync builder))))
[& {:keys [executor on-remove max-size keepalive]}]
(let [cache (as-> (Caffeine/newBuilder) builder
(if (fn? on-remove) (.removalListener builder (create-listener on-remove)) builder)
(if executor (.executor builder ^Executor (px/resolve-executor executor)) builder)
(if keepalive (.expireAfterAccess builder ^Duration (dt/duration keepalive)) builder)
(if (int? max-size) (.maximumSize builder (long max-size)) builder)
(.recordStats builder)
(.buildAsync builder))
cache (.synchronous ^AsyncCache cache)]
(reify
ICache
(get [_ k]
(.getIfPresent ^Cache cache ^Object k))
(get [_ k load-fn]
(.get ^Cache cache
^Object k
^Function (reify Function
(apply [_ k]
(load-fn k)))))
(invalidate! [_]
(.invalidateAll ^Cache cache))
(invalidate! [_ k]
(.invalidateAll ^Cache cache ^Object k))
(defn invalidate-all!
[^AsyncCache cache]
(.invalidateAll (.synchronous cache)))
(defn get
([cache key]
(assert (instance? AsyncLoadingCache cache) "should be AsyncLoadingCache instance")
(p/await! (.get ^AsyncLoadingCache cache ^Object key)))
([cache key not-found-fn]
(assert (instance? AsyncCache cache) "should be AsyncCache instance")
(p/await! (.get ^AsyncCache cache
^Object key
^Function (reify
Function
(apply [_ key]
(not-found-fn key)))))))
ICacheStats
(stats [_]
(get-stats cache)))))
(defn cache?
[o]
(or (instance? AsyncCache o)
(instance? AsyncLoadingCache o)))
(satisfies? ICache o))