diff --git a/backend/deps.edn b/backend/deps.edn index 06b4689b8..c93092fd4 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -39,7 +39,7 @@ buddy/buddy-hashers {:mvn/version "1.8.158"} buddy/buddy-sign {:mvn/version "3.4.333"} - com.github.ben-manes.caffeine/caffeine {:mvn/version "3.1.2"} + com.github.ben-manes.caffeine/caffeine {:mvn/version "3.1.5"} org.jsoup/jsoup {:mvn/version "1.15.3"} org.im4java/im4java diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj index c0436fb1d..b730ab106 100644 --- a/backend/src/app/redis.clj +++ b/backend/src/app/redis.clj @@ -13,6 +13,7 @@ [app.common.spec :as us] [app.metrics :as mtx] [app.redis.script :as-alias rscript] + [app.util.cache :as cache] [app.util.time :as dt] [app.worker :as-alias wrk] [clojure.core :as c] @@ -47,10 +48,6 @@ io.lettuce.core.resource.DefaultClientResources io.netty.util.HashedWheelTimer io.netty.util.Timer - java.util.function.Function - com.github.benmanes.caffeine.cache.Cache - com.github.benmanes.caffeine.cache.Caffeine - com.github.benmanes.caffeine.cache.RemovalListener java.lang.AutoCloseable java.time.Duration)) @@ -138,17 +135,12 @@ (defn- create-cache [{:keys [::wrk/executor] :as cfg}] - (let [listener (reify RemovalListener - (onRemoval [_ key cache cause] - (l/trace :hint "cache: remove" :key key :reason (str cause) :repr (pr-str cache)) - (some-> cache d/close!))) - ] - - (.. (Caffeine/newBuilder) - (weakValues) - (executor executor) - (removalListener listener) - (build)))) + (letfn [(on-remove [key val cause] + (l/trace :hint "evict connection (cache)" :key key :reason cause) + (some-> val d/close!))] + (cache/create :executor executor + :on-remove on-remove + :keepalive "5m"))) (defn- initialize-resources "Initialize redis connection resources" @@ -176,10 +168,11 @@ (defn- shutdown-resources [{:keys [::resources ::cache ::timer]}] - (.invalidateAll ^Cache cache) + (cache/invalidate-all! cache) (when resources (.shutdown ^ClientResources resources)) + (when timer (.stop ^Timer timer))) @@ -218,13 +211,7 @@ (defn get-or-connect [{:keys [::cache] :as state} key options] (us/assert! ::redis state) - ;; FIXME: the cache causes vthread pinning - (let [connection (.get ^Cache cache - ^Object key - ^Function (reify - Function - (apply [_ _key] - (connect* state options))))] + (let [connection (cache/get cache key (fn [_] (connect* state options)))] (-> state (dissoc ::cache) (assoc ::connection connection)))) diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index 8314469ec..bbd5179d4 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -15,6 +15,7 @@ [app.metrics :as mtx] [app.rpc :as-alias rpc] [app.rpc.climit.config :as-alias config] + [app.util.cache :as cache] [app.util.services :as-alias sv] [app.util.time :as dt] [app.worker :as-alias wrk] @@ -26,33 +27,28 @@ [promesa.exec :as px] [promesa.exec.bulkhead :as pbh]) (:import - clojure.lang.ExceptionInfo - com.github.benmanes.caffeine.cache.LoadingCache - com.github.benmanes.caffeine.cache.CacheLoader - com.github.benmanes.caffeine.cache.Caffeine - com.github.benmanes.caffeine.cache.RemovalListener)) + clojure.lang.ExceptionInfo)) (set! *warn-on-reflection* true) -(defn- create-cache - [{:keys [::wrk/executor] :as params} config] - (let [listener (reify RemovalListener - (onRemoval [_ key _val cause] - (l/trace :hint "cache: remove" :key key :reason (str cause)))) +(defn- create-bulkhead-cache + [{:keys [::wrk/executor]} config] + (letfn [(load-fn [key] + (let [config (get config (nth key 0))] + (l/trace :hint "insert into cache" :key key) + (pbh/create :permits (or (:permits config) (:concurrency config)) + :queue (or (:queue config) (:queue-size config)) + :timeout (:timeout config) + :executor executor + :type (:type config :semaphore)))) - loader (reify CacheLoader - (load [_ key] - (let [config (get config (nth key 0))] - (pbh/create :permits (or (:permits config) (:concurrency config)) - :queue (or (:queue config) (:queue-size config)) - :timeout (:timeout config) - :executor executor - :type (:type config :semaphore)))))] - (.. (Caffeine/newBuilder) - (weakValues) - (executor executor) - (removalListener listener) - (build loader)))) + (on-remove [_ _ cause] + (l/trace :hint "evict from cache" :key key :reason (str cause)))] + + (cache/create :executor :same-thread + :on-remove on-remove + :keepalive "5m" + :load-fn load-fn))) (s/def ::config/permits ::us/integer) (s/def ::config/queue ::us/integer) @@ -77,12 +73,12 @@ (when-let [params (some->> path slurp edn/read-string)] (l/info :hint "initializing concurrency limit" :config (str path)) (us/verify! ::config params) - {::cache (create-cache cfg params) + {::cache (create-bulkhead-cache cfg params) ::config params ::wrk/executor executor ::mtx/metrics metrics}))) -(s/def ::cache #(instance? LoadingCache %)) +(s/def ::cache cache/cache?) (s/def ::instance (s/keys :req [::cache ::config ::wrk/executor])) @@ -95,7 +91,7 @@ (defn invoke! [cache metrics id key f] - (let [limiter (.get ^LoadingCache cache [id key]) + (let [limiter (cache/get cache [id key]) tpoint (dt/tpoint) labels (into-array String [(name id)]) diff --git a/backend/src/app/util/cache.clj b/backend/src/app/util/cache.clj new file mode 100644 index 000000000..c5aa733e6 --- /dev/null +++ b/backend/src/app/util/cache.clj @@ -0,0 +1,69 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.util.cache + "In-memory cache backed by Caffeine" + (:refer-clojure :exclude [get]) + (:require + [app.util.time :as dt] + [promesa.core :as p] + [promesa.exec :as px]) + (:import + com.github.benmanes.caffeine.cache.AsyncCache + com.github.benmanes.caffeine.cache.AsyncLoadingCache + com.github.benmanes.caffeine.cache.CacheLoader + com.github.benmanes.caffeine.cache.Caffeine + com.github.benmanes.caffeine.cache.RemovalListener + java.time.Duration + java.util.concurrent.Executor + java.util.function.Function)) + +(set! *warn-on-reflection* true) + +(defn create-listener + [f] + (reify RemovalListener + (onRemoval [_ key val cause] + (when val + (f key val cause))))) + +(defn create-loader + [f] + (reify CacheLoader + (load [_ key] + (f key)))) + +(defn create + [& {:keys [executor on-remove load-fn keepalive]}] + (as-> (Caffeine/newBuilder) builder + (if on-remove (.removalListener builder (create-listener on-remove)) builder) + (if executor (.executor builder ^Executor (px/resolve-executor executor)) builder) + (if keepalive (.expireAfterAccess builder ^Duration (dt/duration keepalive)) builder) + (if load-fn + (.buildAsync builder ^CacheLoader (create-loader load-fn)) + (.buildAsync builder)))) + +(defn invalidate-all! + [^AsyncCache cache] + (.invalidateAll (.synchronous cache))) + +(defn get + ([cache key] + (assert (instance? AsyncLoadingCache cache) "should be AsyncLoadingCache instance") + (p/await! (.get ^AsyncLoadingCache cache ^Object key))) + ([cache key not-found-fn] + (assert (instance? AsyncCache cache) "should be AsyncCache instance") + (p/await! (.get ^AsyncCache cache + ^Object key + ^Function (reify + Function + (apply [_ key] + (not-found-fn key))))))) + +(defn cache? + [o] + (or (instance? AsyncCache o) + (instance? AsyncLoadingCache o)))