0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-08 16:18:11 -05:00

♻️ Refactor local in-memory cache api

This commit is contained in:
Andrey Antukh 2023-03-09 23:00:28 +01:00
parent 76b931108e
commit c9ec5234d3
4 changed files with 102 additions and 50 deletions

View file

@ -39,7 +39,7 @@
buddy/buddy-hashers {:mvn/version "1.8.158"} buddy/buddy-hashers {:mvn/version "1.8.158"}
buddy/buddy-sign {:mvn/version "3.4.333"} buddy/buddy-sign {:mvn/version "3.4.333"}
com.github.ben-manes.caffeine/caffeine {:mvn/version "3.1.2"} com.github.ben-manes.caffeine/caffeine {:mvn/version "3.1.5"}
org.jsoup/jsoup {:mvn/version "1.15.3"} org.jsoup/jsoup {:mvn/version "1.15.3"}
org.im4java/im4java org.im4java/im4java

View file

@ -13,6 +13,7 @@
[app.common.spec :as us] [app.common.spec :as us]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.redis.script :as-alias rscript] [app.redis.script :as-alias rscript]
[app.util.cache :as cache]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[clojure.core :as c] [clojure.core :as c]
@ -47,10 +48,6 @@
io.lettuce.core.resource.DefaultClientResources io.lettuce.core.resource.DefaultClientResources
io.netty.util.HashedWheelTimer io.netty.util.HashedWheelTimer
io.netty.util.Timer io.netty.util.Timer
java.util.function.Function
com.github.benmanes.caffeine.cache.Cache
com.github.benmanes.caffeine.cache.Caffeine
com.github.benmanes.caffeine.cache.RemovalListener
java.lang.AutoCloseable java.lang.AutoCloseable
java.time.Duration)) java.time.Duration))
@ -138,17 +135,12 @@
(defn- create-cache (defn- create-cache
[{:keys [::wrk/executor] :as cfg}] [{:keys [::wrk/executor] :as cfg}]
(let [listener (reify RemovalListener (letfn [(on-remove [key val cause]
(onRemoval [_ key cache cause] (l/trace :hint "evict connection (cache)" :key key :reason cause)
(l/trace :hint "cache: remove" :key key :reason (str cause) :repr (pr-str cache)) (some-> val d/close!))]
(some-> cache d/close!))) (cache/create :executor executor
] :on-remove on-remove
:keepalive "5m")))
(.. (Caffeine/newBuilder)
(weakValues)
(executor executor)
(removalListener listener)
(build))))
(defn- initialize-resources (defn- initialize-resources
"Initialize redis connection resources" "Initialize redis connection resources"
@ -176,10 +168,11 @@
(defn- shutdown-resources (defn- shutdown-resources
[{:keys [::resources ::cache ::timer]}] [{:keys [::resources ::cache ::timer]}]
(.invalidateAll ^Cache cache) (cache/invalidate-all! cache)
(when resources (when resources
(.shutdown ^ClientResources resources)) (.shutdown ^ClientResources resources))
(when timer (when timer
(.stop ^Timer timer))) (.stop ^Timer timer)))
@ -218,13 +211,7 @@
(defn get-or-connect (defn get-or-connect
[{:keys [::cache] :as state} key options] [{:keys [::cache] :as state} key options]
(us/assert! ::redis state) (us/assert! ::redis state)
;; FIXME: the cache causes vthread pinning (let [connection (cache/get cache key (fn [_] (connect* state options)))]
(let [connection (.get ^Cache cache
^Object key
^Function (reify
Function
(apply [_ _key]
(connect* state options))))]
(-> state (-> state
(dissoc ::cache) (dissoc ::cache)
(assoc ::connection connection)))) (assoc ::connection connection))))

View file

@ -15,6 +15,7 @@
[app.metrics :as mtx] [app.metrics :as mtx]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[app.rpc.climit.config :as-alias config] [app.rpc.climit.config :as-alias config]
[app.util.cache :as cache]
[app.util.services :as-alias sv] [app.util.services :as-alias sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
@ -26,33 +27,28 @@
[promesa.exec :as px] [promesa.exec :as px]
[promesa.exec.bulkhead :as pbh]) [promesa.exec.bulkhead :as pbh])
(:import (:import
clojure.lang.ExceptionInfo clojure.lang.ExceptionInfo))
com.github.benmanes.caffeine.cache.LoadingCache
com.github.benmanes.caffeine.cache.CacheLoader
com.github.benmanes.caffeine.cache.Caffeine
com.github.benmanes.caffeine.cache.RemovalListener))
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(defn- create-cache (defn- create-bulkhead-cache
[{:keys [::wrk/executor] :as params} config] [{:keys [::wrk/executor]} config]
(let [listener (reify RemovalListener (letfn [(load-fn [key]
(onRemoval [_ key _val cause]
(l/trace :hint "cache: remove" :key key :reason (str cause))))
loader (reify CacheLoader
(load [_ key]
(let [config (get config (nth key 0))] (let [config (get config (nth key 0))]
(l/trace :hint "insert into cache" :key key)
(pbh/create :permits (or (:permits config) (:concurrency config)) (pbh/create :permits (or (:permits config) (:concurrency config))
:queue (or (:queue config) (:queue-size config)) :queue (or (:queue config) (:queue-size config))
:timeout (:timeout config) :timeout (:timeout config)
:executor executor :executor executor
:type (:type config :semaphore)))))] :type (:type config :semaphore))))
(.. (Caffeine/newBuilder)
(weakValues) (on-remove [_ _ cause]
(executor executor) (l/trace :hint "evict from cache" :key key :reason (str cause)))]
(removalListener listener)
(build loader)))) (cache/create :executor :same-thread
:on-remove on-remove
:keepalive "5m"
:load-fn load-fn)))
(s/def ::config/permits ::us/integer) (s/def ::config/permits ::us/integer)
(s/def ::config/queue ::us/integer) (s/def ::config/queue ::us/integer)
@ -77,12 +73,12 @@
(when-let [params (some->> path slurp edn/read-string)] (when-let [params (some->> path slurp edn/read-string)]
(l/info :hint "initializing concurrency limit" :config (str path)) (l/info :hint "initializing concurrency limit" :config (str path))
(us/verify! ::config params) (us/verify! ::config params)
{::cache (create-cache cfg params) {::cache (create-bulkhead-cache cfg params)
::config params ::config params
::wrk/executor executor ::wrk/executor executor
::mtx/metrics metrics}))) ::mtx/metrics metrics})))
(s/def ::cache #(instance? LoadingCache %)) (s/def ::cache cache/cache?)
(s/def ::instance (s/def ::instance
(s/keys :req [::cache ::config ::wrk/executor])) (s/keys :req [::cache ::config ::wrk/executor]))
@ -95,7 +91,7 @@
(defn invoke! (defn invoke!
[cache metrics id key f] [cache metrics id key f]
(let [limiter (.get ^LoadingCache cache [id key]) (let [limiter (cache/get cache [id key])
tpoint (dt/tpoint) tpoint (dt/tpoint)
labels (into-array String [(name id)]) labels (into-array String [(name id)])

View file

@ -0,0 +1,69 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.util.cache
"In-memory cache backed by Caffeine"
(:refer-clojure :exclude [get])
(:require
[app.util.time :as dt]
[promesa.core :as p]
[promesa.exec :as px])
(:import
com.github.benmanes.caffeine.cache.AsyncCache
com.github.benmanes.caffeine.cache.AsyncLoadingCache
com.github.benmanes.caffeine.cache.CacheLoader
com.github.benmanes.caffeine.cache.Caffeine
com.github.benmanes.caffeine.cache.RemovalListener
java.time.Duration
java.util.concurrent.Executor
java.util.function.Function))
(set! *warn-on-reflection* true)
(defn create-listener
[f]
(reify RemovalListener
(onRemoval [_ key val cause]
(when val
(f key val cause)))))
(defn create-loader
[f]
(reify CacheLoader
(load [_ key]
(f key))))
(defn create
[& {:keys [executor on-remove load-fn keepalive]}]
(as-> (Caffeine/newBuilder) builder
(if on-remove (.removalListener builder (create-listener on-remove)) builder)
(if executor (.executor builder ^Executor (px/resolve-executor executor)) builder)
(if keepalive (.expireAfterAccess builder ^Duration (dt/duration keepalive)) builder)
(if load-fn
(.buildAsync builder ^CacheLoader (create-loader load-fn))
(.buildAsync builder))))
(defn invalidate-all!
[^AsyncCache cache]
(.invalidateAll (.synchronous cache)))
(defn get
([cache key]
(assert (instance? AsyncLoadingCache cache) "should be AsyncLoadingCache instance")
(p/await! (.get ^AsyncLoadingCache cache ^Object key)))
([cache key not-found-fn]
(assert (instance? AsyncCache cache) "should be AsyncCache instance")
(p/await! (.get ^AsyncCache cache
^Object key
^Function (reify
Function
(apply [_ key]
(not-found-fn key)))))))
(defn cache?
[o]
(or (instance? AsyncCache o)
(instance? AsyncLoadingCache o)))