0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-13 22:50:38 -05:00
zot/pkg/api/controller.go
Vishwas R 5ae7a028d9
feat(cluster): Add support for request proxying for scale out (#2385)
* feat(cluster): initial commit for scale-out cluster

Signed-off-by: Ramkumar Chinchani <rchincha@cisco.com>

* feat(cluster): support shared storage scale out

This change introduces support for shared storage backed
zot cluster scale out.

New feature
Multiple stateless zot instances can run using the same shared
storage backend where each instance looks at a specific set
of repositories based on a siphash of the repository name to improve
scale as the load is distributed across multiple instances.
For a given config, there will only be one instance that can perform
dist-spec read/write on a given repository.

What's changed?
- introduced a transparent request proxy for dist-spec endpoints based on
siphash of repository name.
- new config for scale out cluster that specifies list of
cluster members.

Signed-off-by: Vishwas Rajashekar <vrajashe@cisco.com>

---------

Signed-off-by: Ramkumar Chinchani <rchincha@cisco.com>
Signed-off-by: Vishwas Rajashekar <vrajashe@cisco.com>
Co-authored-by: Ramkumar Chinchani <rchincha@cisco.com>
2024-05-20 09:05:21 -07:00

515 lines
14 KiB
Go

package api
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"github.com/gorilla/mux"
"github.com/zitadel/oidc/pkg/client/rp"
"zotregistry.dev/zot/errors"
"zotregistry.dev/zot/pkg/api/config"
"zotregistry.dev/zot/pkg/common"
ext "zotregistry.dev/zot/pkg/extensions"
extconf "zotregistry.dev/zot/pkg/extensions/config"
"zotregistry.dev/zot/pkg/extensions/monitoring"
"zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/meta"
mTypes "zotregistry.dev/zot/pkg/meta/types"
"zotregistry.dev/zot/pkg/scheduler"
"zotregistry.dev/zot/pkg/storage"
"zotregistry.dev/zot/pkg/storage/gc"
)
// Timeouts applied to the HTTP server created in Controller.Run.
const (
	// idleTimeout is used as the server's IdleTimeout.
	idleTimeout = 2 * time.Minute
	// readHeaderTimeout is used as the server's ReadHeaderTimeout.
	readHeaderTimeout = 5 * time.Second
)
// Controller bundles the configuration, loggers, HTTP router/server, storage,
// metadata DB, extensions state and background scheduler of one running
// registry instance. Most fields are filled in by Init, Run and
// StartBackgroundTasks rather than by NewController.
type Controller struct {
	// Config is the application configuration passed to NewController.
	Config *config.Config

	// Router is the HTTP router created in Run.
	Router *mux.Router

	// MetaDB is set by InitMetaDB, only when a feature that needs it is enabled.
	MetaDB mTypes.MetaDB

	// StoreController is created by InitImageStore.
	StoreController storage.StoreController

	// Log is the application logger created in NewController.
	Log log.Logger

	// Audit is only set when an audit log output is configured.
	Audit *log.Logger

	// Server is the HTTP server created in Run.
	Server *http.Server

	// Metrics is created in Init.
	Metrics monitoring.MetricServer

	// CveScanner is set by InitCVEInfo when an extensions config is present.
	CveScanner ext.CveScanner

	// SyncOnDemand is whatever ext.EnableSyncExtension returned in
	// StartBackgroundTasks.
	SyncOnDemand SyncOnDemand

	// RelyingParties is not populated in this file.
	// NOTE(review): presumably OIDC relying parties keyed by provider name — confirm.
	RelyingParties map[string]rp.RelyingParty

	// CookieStore is set by initCookieStore when basic authn is enabled.
	CookieStore *CookieStore

	// LDAPClient is not created in this file; LoadNewConfig refreshes its bind
	// credentials when it is non-nil.
	LDAPClient *LDAPClient

	// taskScheduler is created in StartBackgroundTasks.
	taskScheduler *scheduler.Scheduler

	// runtime params
	chosenPort int // kernel-chosen port
}
// NewController creates a Controller for appConfig and sets up its loggers.
//
// When appConfig has a cluster section, the local member's socket and its
// index in the member list are resolved, stored in appConfig.Cluster.Proxy and
// attached to every log line. The function panics if the local sockets cannot
// be listed or if no configured cluster member matches a local socket.
//
// An audit logger is created only when an audit log output is configured.
func NewController(appConfig *config.Config) *Controller {
	logger := log.NewLogger(appConfig.Log.Level, appConfig.Log.Output)

	if appConfig.Cluster != nil {
		// The set of local sockets (IP address:port) is what lets us tell
		// which configured cluster member this instance is.
		localSockets, err := common.GetLocalSockets(appConfig.HTTP.Port)
		if err != nil {
			logger.Error().Err(err).Msg("failed to get local sockets")
			panic("failed to get local sockets")
		}

		// Besides the socket itself, its index in the member list is kept
		// for quick lookups during proxying.
		localIdx, localMember, err := GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets)
		if err != nil {
			logger.Error().Err(err).Msg("failed to get member socket")
			panic("failed to get member socket")
		}

		if localMember == "" {
			// Not being able to identify the local member means the cluster
			// is misconfigured.
			configuredMembers := strings.Join(appConfig.Cluster.Members, ",")
			availableSockets := strings.Join(localSockets, ",")

			logger.Error().
				Str("members", configuredMembers).
				Str("localSockets", availableSockets).
				Msg("failed to determine the local cluster socket")
			panic("failed to determine the local cluster socket")
		}

		appConfig.Cluster.Proxy = &config.ClusterRequestProxyConfig{
			LocalMemberClusterSocket:      localMember,
			LocalMemberClusterSocketIndex: uint64(localIdx),
		}

		logCtx := logger.Logger.With().
			Str("clusterMember", localMember).
			Str("clusterMemberIndex", strconv.Itoa(localIdx))
		logger.Logger = logCtx.Logger()
	}

	controller := &Controller{
		Config: appConfig,
		Log:    logger,
	}

	if appConfig.Log.Audit != "" {
		controller.Audit = log.NewAuditLogger(appConfig.Log.Level, appConfig.Log.Audit)
	}

	return controller
}
// DumpRuntimeParams logs a single info event describing the runtime
// environment: the CPU count and, for each value that can be read, the
// open-files limit, the listen backlog and the inotify watch limit. Values
// that cannot be read are left out of the event.
func DumpRuntimeParams(log log.Logger) {
	evt := log.Info().Int("cpus", runtime.NumCPU()) //nolint: zerologlint

	var rLimit syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err == nil {
		evt = evt.Uint64("max. open files", uint64(rLimit.Cur)) //nolint: unconvert // required for *BSD
	}

	// Each entry maps a log field to the file its value is read from.
	procParams := []struct {
		field string
		path  string
	}{
		{field: "listen backlog", path: "/proc/sys/net/core/somaxconn"},
		{field: "max. inotify watches", path: "/proc/sys/user/max_inotify_watches"},
	}

	for _, param := range procParams {
		content, err := os.ReadFile(param.path)
		if err != nil {
			continue
		}

		evt = evt.Str(param.field, strings.TrimSuffix(string(content), "\n"))
	}

	evt.Msg("runtime params")
}
// GetPort returns the port recorded by Run: the kernel-chosen port when the
// configured port is "0" or empty, otherwise the configured port parsed as an
// integer. It is zero until Run has created its listener.
func (c *Controller) GetPort() int {
	return c.chosenPort
}
// Run sets up the HTTP API and serves it until the server stops.
//
// It initializes the cookie store, starts the background tasks, builds the
// router with the rate-limiting (if configured), session-logging, recovery and
// audit (if configured) middlewares, creates the route handler and then
// listens on the configured address. When the configured port is "0" or empty
// the port picked by the kernel is recorded and can be read with GetPort.
//
// With a TLS key and certificate configured the server is started with
// ServeTLS; if a CA certificate is also configured, client certificates are
// verified against it (and required when mTLS auth is enabled). Otherwise the
// server is started with Serve.
//
// The returned error is the first setup failure, or whatever Serve/ServeTLS
// returns. The listener is always closed before Run returns.
func (c *Controller) Run() error {
	if err := c.initCookieStore(); err != nil {
		return err
	}

	c.StartBackgroundTasks()

	// setup HTTP API router
	engine := mux.NewRouter()

	// rate-limit HTTP requests if enabled
	if c.Config.HTTP.Ratelimit != nil {
		if c.Config.HTTP.Ratelimit.Rate != nil {
			engine.Use(RateLimiter(c, *c.Config.HTTP.Ratelimit.Rate))
		}

		for _, mrlim := range c.Config.HTTP.Ratelimit.Methods {
			engine.Use(MethodRateLimiter(c, mrlim.Method, mrlim.Rate))
		}
	}

	engine.Use(
		SessionLogger(c),
		RecoveryHandler(c.Log),
	)

	if c.Audit != nil {
		engine.Use(SessionAuditLogger(c.Audit))
	}

	c.Router = engine
	c.Router.UseEncodedPath()

	monitoring.SetServerInfo(c.Metrics, c.Config.Commit, c.Config.BinaryType, c.Config.GoVersion,
		c.Config.DistSpecVersion)

	//nolint: contextcheck
	_ = NewRouteHandler(c)

	addr := fmt.Sprintf("%s:%s", c.Config.HTTP.Address, c.Config.HTTP.Port)
	server := &http.Server{
		Addr:              addr,
		Handler:           c.Router,
		IdleTimeout:       idleTimeout,
		ReadHeaderTimeout: readHeaderTimeout,
	}
	c.Server = server

	// Create the listener
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}

	// Release the socket on every exit path. Serve closes the listener itself,
	// but the early returns below and a key-pair loading failure inside
	// ServeTLS would otherwise leave it open. Closing an already closed
	// listener only yields an error, which is deliberately ignored.
	defer func() { _ = listener.Close() }()

	if c.Config.HTTP.Port == "0" || c.Config.HTTP.Port == "" {
		chosenAddr, ok := listener.Addr().(*net.TCPAddr)
		if !ok {
			c.Log.Error().Str("port", c.Config.HTTP.Port).Msg("invalid addr type")

			return errors.ErrBadType
		}

		c.chosenPort = chosenAddr.Port

		c.Log.Info().Int("port", chosenAddr.Port).IPAddr("address", chosenAddr.IP).Msg(
			"port is unspecified, listening on kernel chosen port",
		)
	} else {
		// net.Listen already accepted this port, so a parse failure here is
		// not treated as fatal; chosenPort then holds whatever ParseInt
		// returned alongside the error.
		chosenPort, _ := strconv.ParseInt(c.Config.HTTP.Port, 10, 64)

		c.chosenPort = int(chosenPort)
	}

	if c.Config.HTTP.TLS != nil && c.Config.HTTP.TLS.Key != "" && c.Config.HTTP.TLS.Cert != "" {
		server.TLSConfig = &tls.Config{
			CipherSuites: []uint16{
				tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
				tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
				tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
				tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
				tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
				tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
			},
			CurvePreferences: []tls.CurveID{
				tls.CurveP256,
				tls.X25519,
			},
			PreferServerCipherSuites: true,
			MinVersion:               tls.VersionTLS12,
		}

		if c.Config.HTTP.TLS.CACert != "" {
			// Client certificates are optional unless mTLS auth is enabled.
			clientAuth := tls.VerifyClientCertIfGiven
			if c.Config.IsMTLSAuthEnabled() {
				clientAuth = tls.RequireAndVerifyClientCert
			}

			caCert, err := os.ReadFile(c.Config.HTTP.TLS.CACert)
			if err != nil {
				c.Log.Error().Err(err).Str("caCert", c.Config.HTTP.TLS.CACert).Msg("failed to read file")

				return err
			}

			caCertPool := x509.NewCertPool()

			if !caCertPool.AppendCertsFromPEM(caCert) {
				c.Log.Error().Err(errors.ErrBadCACert).Msg("failed to append certs from pem")

				return errors.ErrBadCACert
			}

			server.TLSConfig.ClientAuth = clientAuth
			server.TLSConfig.ClientCAs = caCertPool
		}

		return server.ServeTLS(listener, c.Config.HTTP.TLS.Cert, c.Config.HTTP.TLS.Key)
	}

	return server.Serve(listener)
}
// Init prepares the controller before Run: it logs the sanitized configuration
// and the runtime parameters, creates the metrics server, then initializes the
// image store, the metadata DB and the CVE scanner, in that order. The first
// initialization error is returned.
func (c *Controller) Init() error {
	// Log the active configuration with secrets stripped.
	c.Log.Info().Interface("params", c.Config.Sanitize()).Msg("configuration settings")

	// Log the runtime environment.
	DumpRuntimeParams(c.Log)

	metricsEnabled := c.Config != nil &&
		c.Config.Extensions != nil &&
		c.Config.Extensions.Metrics != nil &&
		*c.Config.Extensions.Metrics.Enable

	c.Metrics = monitoring.NewMetricsServer(metricsEnabled, c.Log)

	err := c.InitImageStore() //nolint:contextcheck
	if err != nil {
		return err
	}

	err = c.InitMetaDB()
	if err != nil {
		return err
	}

	c.InitCVEInfo()

	return nil
}
// InitCVEInfo sets CveScanner from the current configuration, store controller
// and metadata DB. It does nothing when there is no configuration or no
// extensions section.
func (c *Controller) InitCVEInfo() {
	if c.Config == nil || c.Config.Extensions == nil {
		return
	}

	c.CveScanner = ext.GetCveScanner(c.Config, c.StoreController, c.MetaDB, c.Log)
}
// InitImageStore creates the store controller from the configuration, the
// linter extension and the metrics server, and stores it on the controller.
// On failure the error from storage.New is returned and StoreController is
// left unchanged.
func (c *Controller) InitImageStore() error {
	stores, err := storage.New(c.Config, ext.GetLinter(c.Config, c.Log), c.Metrics, c.Log)
	if err != nil {
		return err
	}

	c.StoreController = stores

	return nil
}
// initCookieStore creates the session cookie store, which preserves the logged
// in user across web sessions. It is a no-op unless basic authn is enabled.
func (c *Controller) initCookieStore() error {
	if !c.Config.IsBasicAuthnEnabled() {
		return nil
	}

	store, err := NewCookieStore(c.StoreController)
	if err != nil {
		return err
	}

	c.CookieStore = store

	return nil
}
// InitMetaDB creates the metadata DB when search, basic authn, image trust or
// retention is enabled (search results, user profiles, API keys and signatures
// are kept there). The new driver is handed to the extensions, patched,
// populated from storage and only then stored on the controller; any failure
// along the way is returned and MetaDB is left unset.
func (c *Controller) InitMetaDB() error {
	needsMetaDB := c.Config.IsSearchEnabled() || c.Config.IsBasicAuthnEnabled() ||
		c.Config.IsImageTrustEnabled() || c.Config.IsRetentionEnabled()
	if !needsMetaDB {
		return nil
	}

	metaDB, err := meta.New(c.Config.Storage.StorageConfig, c.Log) //nolint:contextcheck
	if err != nil {
		return err
	}

	if err := ext.SetupExtensions(c.Config, metaDB, c.Log); err != nil { //nolint:contextcheck
		return err
	}

	if err := metaDB.PatchDB(); err != nil {
		return err
	}

	if err := meta.ParseStorage(metaDB, c.StoreController, c.Log); err != nil { //nolint: contextcheck
		return err
	}

	c.MetaDB = metaDB

	return nil
}
// LoadNewConfig applies the reloadable parts of newConfig to the running
// configuration:
//
//   - access control and, when both the running and the new configuration have
//     an auth section, the LDAP settings (including the bind credentials of an
//     existing LDAP client);
//   - GC, dedupe and (when retention is enabled) retention settings of the
//     default store and of every subpath that already exists in the running
//     configuration;
//   - the sync and scrub extensions, and the search CVE settings when search
//     is already enabled.
//
// Afterwards the CVE scanner is re-initialized and the sanitized configuration
// is logged.
func (c *Controller) LoadNewConfig(newConfig *config.Config) {
	// reload access control config
	c.Config.HTTP.AccessControl = newConfig.HTTP.AccessControl

	// A reloaded config without an auth section must not be dereferenced; in
	// that case the running LDAP settings are kept as they are.
	if c.Config.HTTP.Auth != nil && newConfig.HTTP.Auth != nil {
		c.Config.HTTP.Auth.LDAP = newConfig.HTTP.Auth.LDAP

		if c.LDAPClient != nil {
			c.LDAPClient.lock.Lock()
			c.LDAPClient.BindDN = newConfig.HTTP.Auth.LDAP.BindDN()
			c.LDAPClient.BindPassword = newConfig.HTTP.Auth.LDAP.BindPassword()
			c.LDAPClient.lock.Unlock()
		}
	}

	// reload periodical gc config
	c.Config.Storage.GC = newConfig.Storage.GC
	c.Config.Storage.Dedupe = newConfig.Storage.Dedupe
	c.Config.Storage.GCDelay = newConfig.Storage.GCDelay
	c.Config.Storage.GCInterval = newConfig.Storage.GCInterval

	// only if we have a metaDB already in place
	if c.Config.IsRetentionEnabled() {
		c.Config.Storage.Retention = newConfig.Storage.Retention
	}

	// Only subpaths that already exist in the running config are updated;
	// new subpaths in newConfig are ignored.
	for subPath, storageConfig := range newConfig.Storage.SubPaths {
		subPathConfig, ok := c.Config.Storage.SubPaths[subPath]
		if ok {
			subPathConfig.GC = storageConfig.GC
			subPathConfig.Dedupe = storageConfig.Dedupe
			subPathConfig.GCDelay = storageConfig.GCDelay
			subPathConfig.GCInterval = storageConfig.GCInterval

			// only if we have a metaDB already in place
			if c.Config.IsRetentionEnabled() {
				subPathConfig.Retention = storageConfig.Retention
			}

			c.Config.Storage.SubPaths[subPath] = subPathConfig
		}
	}

	// reload background tasks
	if newConfig.Extensions != nil {
		if c.Config.Extensions == nil {
			c.Config.Extensions = &extconf.ExtensionConfig{}
		}

		// reload sync extension
		c.Config.Extensions.Sync = newConfig.Extensions.Sync

		// reload only if search is enabled and reloaded config has search extension (can't setup routes at this stage)
		if c.Config.Extensions.Search != nil && *c.Config.Extensions.Search.Enable {
			if newConfig.Extensions.Search != nil {
				c.Config.Extensions.Search.CVE = newConfig.Extensions.Search.CVE
			}
		}

		// reload scrub extension
		c.Config.Extensions.Scrub = newConfig.Extensions.Scrub
	} else {
		c.Config.Extensions = nil
	}

	c.InitCVEInfo()

	c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).
		Msg("loaded new configuration settings")
}
// Shutdown stops the background tasks and then, if an HTTP server has been
// created, shuts it down using a background context. The server's shutdown
// error is discarded.
func (c *Controller) Shutdown() {
	c.StopBackgroundTasks()

	if c.Server == nil {
		return
	}

	_ = c.Server.Shutdown(context.Background())
}
// StopBackgroundTasks shuts down the task scheduler, waiting for all tasks to
// finish their work. It is a no-op when no scheduler has been started.
func (c *Controller) StopBackgroundTasks() {
	if c.taskScheduler == nil {
		return
	}

	c.taskScheduler.Shutdown()
}
// StartBackgroundTasks creates and starts the task scheduler and hands it the
// periodic work of the registry: garbage collection, blob dedupe, storage
// metrics, the metrics/search/scrub/sync extensions, session cleanup and the
// remaining scheduled extension tasks, for the default store and for every
// configured subpath.
//
// A failure to start the sync extension is only logged; SyncOnDemand is set to
// whatever the extension returned.
func (c *Controller) StartBackgroundTasks() {
	c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log)
	c.taskScheduler.RunScheduler()

	// Periodic garbage collection for the default store.
	if c.Config.Storage.GC {
		defaultOpts := gc.Options{
			Delay:          c.Config.Storage.GCDelay,
			ImageRetention: c.Config.Storage.Retention,
		}

		collector := gc.NewGarbageCollect(c.StoreController.DefaultStore, c.MetaDB, defaultOpts, c.Audit, c.Log)
		collector.CleanImageStorePeriodically(c.Config.Storage.GCInterval, c.taskScheduler)
	}

	// Dedupe runs both ways (dedupe or restore deduped blobs).
	c.StoreController.DefaultStore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

	// Extensions for the default store, when an extensions config is provided.
	if c.Config.Extensions != nil {
		ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
		ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, c.taskScheduler, c.CveScanner, c.Log)
	}

	// Runs once if metrics are enabled and the image store is local.
	if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
		c.StoreController.DefaultStore.PopulateStorageMetrics(time.Duration(0), c.taskScheduler)
	}

	for route, subConfig := range c.Config.Storage.SubPaths {
		subStore := c.StoreController.SubStore[route]

		// Periodic garbage collection for this sub image store.
		if subConfig.GC {
			subOpts := gc.Options{
				Delay:          subConfig.GCDelay,
				ImageRetention: subConfig.Retention,
			}

			collector := gc.NewGarbageCollect(subStore, c.MetaDB, subOpts, c.Audit, c.Log)
			collector.CleanImageStorePeriodically(subConfig.GCInterval, c.taskScheduler)
		}

		// Extensions for this sub image store, when an extensions config is provided.
		if c.Config.Extensions != nil {
			ext.EnableMetricsExtension(c.Config, c.Log, subConfig.RootDirectory)
		}

		if subStore == nil {
			continue
		}

		// Dedupe both ways (dedupe or restore deduped blobs) for subpaths.
		subStore.RunDedupeBlobs(time.Duration(0), c.taskScheduler)

		if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
			subStore.PopulateStorageMetrics(time.Duration(0), c.taskScheduler)
		}
	}

	if c.Config.Extensions != nil {
		ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, c.taskScheduler)

		//nolint: contextcheck
		onDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log)
		if err != nil {
			c.Log.Error().Err(err).Msg("failed to start sync extension")
		}

		c.SyncOnDemand = onDemand
	}

	if c.CookieStore != nil {
		c.CookieStore.RunSessionCleaner(c.taskScheduler)
	}

	// Enabling the other scheduled tasks can later be moved inside the call below.
	ext.EnableScheduledTasks(c.Config, c.taskScheduler, c.MetaDB, c.Log) //nolint: contextcheck
}
// SyncOnDemand is the interface through which the controller triggers
// on-demand synchronization. Controller.SyncOnDemand holds the value returned
// by ext.EnableSyncExtension in StartBackgroundTasks.
type SyncOnDemand interface {
	// SyncImage takes a repository name and a reference.
	// NOTE(review): presumably syncs that image from a configured upstream — confirm against the sync extension.
	SyncImage(ctx context.Context, repo, reference string) error
	// SyncReference takes a repository name, a subject digest string and a reference type.
	// NOTE(review): presumably syncs referrers of the given type for that subject — confirm against the sync extension.
	SyncReference(ctx context.Context, repo string, subjectDigestStr string, referenceType string) error
}