From 92643b29c146a50be657ef83ab0d1d010e69ab86 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Sun, 14 Jan 2024 20:45:11 +0100 Subject: [PATCH] :sparkles: Improve internal cache api --- backend/resources/log4j2-devenv.xml | 2 +- backend/src/app/main.clj | 3 +- backend/src/app/redis.clj | 7 +- backend/src/app/rpc/climit.clj | 115 +++++++++++++++------------- backend/src/app/util/cache.clj | 80 ++++++++++--------- 5 files changed, 112 insertions(+), 95 deletions(-) diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml index 7abb7a188..ca1ab6739 100644 --- a/backend/resources/log4j2-devenv.xml +++ b/backend/resources/log4j2-devenv.xml @@ -30,7 +30,7 @@ - + diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index c80210a06..c2f20015c 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -301,7 +301,8 @@ ::sto/storage (ig/ref ::sto/storage)} :app.rpc/climit - {::mtx/metrics (ig/ref ::mtx/metrics)} + {::mtx/metrics (ig/ref ::mtx/metrics) + ::wrk/executor (ig/ref ::wrk/executor)} :app.rpc/rlimit {::wrk/executor (ig/ref ::wrk/executor)} diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj index b730ab106..58023fe00 100644 --- a/backend/src/app/redis.clj +++ b/backend/src/app/redis.clj @@ -91,7 +91,7 @@ (s/def ::connect? ::us/boolean) (s/def ::io-threads ::us/integer) (s/def ::worker-threads ::us/integer) -(s/def ::cache some?) +(s/def ::cache cache/cache?) (s/def ::redis (s/keys :req [::resources @@ -168,7 +168,7 @@ (defn- shutdown-resources [{:keys [::resources ::cache ::timer]}] - (cache/invalidate-all! cache) + (cache/invalidate! cache) (when resources (.shutdown ^ClientResources resources)) @@ -211,7 +211,8 @@ (defn get-or-connect [{:keys [::cache] :as state} key options] (us/assert! ::redis state) - (let [connection (cache/get cache key (fn [_] (connect* state options)))] + (let [create (fn [_] (connect* state options)) + connection (cache/get cache key create)] (-> state (dissoc ::cache) (assoc ::connection connection)))) diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index d6e4ccb51..71c64b596 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -36,24 +36,14 @@ (-> (str id) (subs 1))) -(defn- create-bulkhead-cache - [config] - (letfn [(load-fn [[id skey]] - (when-let [config (get config id)] - (l/trc :hint "insert into cache" :id (id->str id) :key skey) - (pbh/create :permits (or (:permits config) (:concurrency config)) - :queue (or (:queue config) (:queue-size config)) - :timeout (:timeout config) - :type :semaphore))) - - (on-remove [key _ cause] +(defn- create-cache + [{:keys [::wrk/executor]}] + (letfn [(on-remove [key _ cause] (let [[id skey] key] - (l/trc :hint "evict from cache" :id (id->str id) :key skey :reason (str cause))))] - - (cache/create :executor :same-thread + (l/dbg :hint "destroy limiter" :id (id->str id) :key skey :reason (str cause))))] + (cache/create :executor executor :on-remove on-remove - :keepalive "5m" - :load-fn load-fn))) + :keepalive "5m"))) (s/def ::config/permits ::us/integer) (s/def ::config/queue ::us/integer) @@ -70,7 +60,7 @@ (s/def ::path ::fs/path) (defmethod ig/pre-init-spec ::rpc/climit [_] - (s/keys :req [::mtx/metrics ::path])) + (s/keys :req [::mtx/metrics ::wrk/executor ::path])) (defmethod ig/init-key ::rpc/climit [_ {:keys [::path ::mtx/metrics] :as cfg}] @@ -78,7 +68,7 @@ (when-let [params (some->> path slurp edn/read-string)] (l/inf :hint "initializing concurrency limit" :config (str path)) (us/verify! ::config params) - {::cache (create-bulkhead-cache params) + {::cache (create-cache cfg) ::config params ::mtx/metrics metrics}))) @@ -89,13 +79,17 @@ (s/def ::rpc/climit (s/nilable ::instance)) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; PUBLIC API -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(defn- create-limiter + [config [id skey]] + (l/dbg :hint "create limiter" :id (id->str id) :key skey) + (pbh/create :permits (or (:permits config) (:concurrency config)) + :queue (or (:queue config) (:queue-size config)) + :timeout (:timeout config) + :type :semaphore)) -(defn invoke! - [cache metrics id key f] - (if-let [limiter (cache/get cache [id key])] +(defn- invoke! + [config cache metrics id key f] + (if-let [limiter (cache/get cache [id key] (partial create-limiter config))] (let [tpoint (dt/tpoint) labels (into-array String [(id->str id)]) wrapped (fn [] @@ -147,7 +141,7 @@ :queue (:queue stats) :max-permits (:max-permits stats) :max-queue (:max-queue stats)) - (pbh/invoke! limiter wrapped)) + (px/invoke! limiter wrapped)) (catch ExceptionInfo cause (let [{:keys [type code]} (ex-data cause)] (if (= :bulkhead-error type) @@ -160,9 +154,43 @@ (measure! (pbh/get-stats limiter))))) (do - (l/wrn :hint "unable to load limiter" :id (id->str id)) + (l/wrn :hint "no limiter found" :id (id->str id)) (f)))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; MIDDLEWARE +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(def noop-fn (constantly nil)) + +(defn wrap + [{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}] + (if (and (some? climit) (some? id)) + (let [cache (::cache climit) + config (::config climit)] + (if-let [config (get config id)] + (do + (l/dbg :hint "instrumenting method" + :limit (id->str id) + :service-name (::sv/name mdata) + :timeout (:timeout config) + :permits (:permits config) + :queue (:queue config) + :keyed? (not= key-fn noop-fn)) + + (fn [cfg params] + (invoke! config cache metrics id (key-fn params) (partial f cfg params)))) + + (do + (l/wrn :hint "no config found for specified queue" :id (id->str id)) + f))) + + f)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; PUBLIC API +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + (defn configure [{:keys [::rpc/climit]} id] (us/assert! ::rpc/climit climit) @@ -171,37 +199,14 @@ (defn run! "Run a function in context of climit. Intended to be used in virtual threads." - ([{:keys [::id ::cache ::mtx/metrics]} f] - (if (and cache id) - (invoke! cache metrics id nil f) + ([{:keys [::id ::cache ::config ::mtx/metrics]} f] + (if-let [config (get config id)] + (invoke! config cache metrics id nil f) (f))) - ([{:keys [::id ::cache ::mtx/metrics]} f executor] + ([{:keys [::id ::cache ::config ::mtx/metrics]} f executor] (let [f #(p/await! (px/submit! executor f))] - (if (and cache id) - (invoke! cache metrics id nil f) + (if-let [config (get config id)] + (invoke! config cache metrics id nil f) (f))))) -(def noop-fn (constantly nil)) - -(defn wrap - [{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}] - (if (and (some? climit) (some? id)) - (if-let [config (get-in climit [::config id])] - (let [cache (::cache climit)] - (l/dbg :hint "instrumenting method" - :limit (id->str id) - :service-name (::sv/name mdata) - :timeout (:timeout config) - :permits (:permits config) - :queue (:queue config) - :keyed? (not= key-fn noop-fn)) - - (fn [cfg params] - (invoke! cache metrics id (key-fn params) (partial f cfg params)))) - - (do - (l/wrn :hint "no config found for specified queue" :id (id->str id)) - f)) - - f)) diff --git a/backend/src/app/util/cache.clj b/backend/src/app/util/cache.clj index c5aa733e6..65861e179 100644 --- a/backend/src/app/util/cache.clj +++ b/backend/src/app/util/cache.clj @@ -9,61 +9,71 @@ (:refer-clojure :exclude [get]) (:require [app.util.time :as dt] - [promesa.core :as p] [promesa.exec :as px]) (:import com.github.benmanes.caffeine.cache.AsyncCache - com.github.benmanes.caffeine.cache.AsyncLoadingCache - com.github.benmanes.caffeine.cache.CacheLoader + com.github.benmanes.caffeine.cache.Cache com.github.benmanes.caffeine.cache.Caffeine com.github.benmanes.caffeine.cache.RemovalListener + com.github.benmanes.caffeine.cache.stats.CacheStats java.time.Duration java.util.concurrent.Executor java.util.function.Function)) (set! *warn-on-reflection* true) -(defn create-listener +(defprotocol ICache + (get [_ k] [_ k load-fn] "get cache entry") + (invalidate! [_] [_ k] "invalidate cache")) + +(defprotocol ICacheStats + (stats [_] "get stats")) + +(defn- create-listener [f] (reify RemovalListener (onRemoval [_ key val cause] (when val (f key val cause))))) -(defn create-loader - [f] - (reify CacheLoader - (load [_ key] - (f key)))) +(defn- get-stats + [^Cache cache] + (let [^CacheStats stats (.stats cache)] + {:hit-rate (.hitRate stats) + :hit-count (.hitCount stats) + :req-count (.requestCount stats) + :miss-count (.missCount stats) + :miss-rate (.missRate stats)})) (defn create - [& {:keys [executor on-remove load-fn keepalive]}] - (as-> (Caffeine/newBuilder) builder - (if on-remove (.removalListener builder (create-listener on-remove)) builder) - (if executor (.executor builder ^Executor (px/resolve-executor executor)) builder) - (if keepalive (.expireAfterAccess builder ^Duration (dt/duration keepalive)) builder) - (if load-fn - (.buildAsync builder ^CacheLoader (create-loader load-fn)) - (.buildAsync builder)))) + [& {:keys [executor on-remove max-size keepalive]}] + (let [cache (as-> (Caffeine/newBuilder) builder + (if (fn? on-remove) (.removalListener builder (create-listener on-remove)) builder) + (if executor (.executor builder ^Executor (px/resolve-executor executor)) builder) + (if keepalive (.expireAfterAccess builder ^Duration (dt/duration keepalive)) builder) + (if (int? max-size) (.maximumSize builder (long max-size)) builder) + (.recordStats builder) + (.buildAsync builder)) + cache (.synchronous ^AsyncCache cache)] + (reify + ICache + (get [_ k] + (.getIfPresent ^Cache cache ^Object k)) + (get [_ k load-fn] + (.get ^Cache cache + ^Object k + ^Function (reify Function + (apply [_ k] + (load-fn k))))) + (invalidate! [_] + (.invalidateAll ^Cache cache)) + (invalidate! [_ k] + (.invalidateAll ^Cache cache ^Object k)) -(defn invalidate-all! - [^AsyncCache cache] - (.invalidateAll (.synchronous cache))) - -(defn get - ([cache key] - (assert (instance? AsyncLoadingCache cache) "should be AsyncLoadingCache instance") - (p/await! (.get ^AsyncLoadingCache cache ^Object key))) - ([cache key not-found-fn] - (assert (instance? AsyncCache cache) "should be AsyncCache instance") - (p/await! (.get ^AsyncCache cache - ^Object key - ^Function (reify - Function - (apply [_ key] - (not-found-fn key))))))) + ICacheStats + (stats [_] + (get-stats cache))))) (defn cache? [o] - (or (instance? AsyncCache o) - (instance? AsyncLoadingCache o))) + (satisfies? ICache o))