0
Fork 0
mirror of https://codeberg.org/forgejo/forgejo.git synced 2024-12-22 15:23:14 -05:00

Implement systemd-notify protocol (#21151)

This PR adds support for the systemd notify protocol. Several status
messagess are provided. We should likely add a common notify/status
message for graceful.

Replaces #21140

Signed-off-by: Andrew Thornton <art27@cantab.net>

---------

Signed-off-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: ltdk <usr@ltdk.xyz>
Co-authored-by: Giteabot <teabot@gitea.io>
This commit is contained in:
zeripath 2023-05-15 23:20:30 +01:00 committed by GitHub
parent a5be7f300b
commit 7565e5c3de
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 161 additions and 13 deletions

View file

@ -52,7 +52,7 @@ After=network.target
# Uncomment the next line if you have repos with lots of files and get a HTTP 500 error because of that # Uncomment the next line if you have repos with lots of files and get a HTTP 500 error because of that
# LimitNOFILE=524288:524288 # LimitNOFILE=524288:524288
RestartSec=2s RestartSec=2s
Type=simple Type=notify
User=git User=git
Group=git Group=git
WorkingDirectory=/var/lib/gitea/ WorkingDirectory=/var/lib/gitea/
@ -62,6 +62,7 @@ WorkingDirectory=/var/lib/gitea/
ExecStart=/usr/local/bin/gitea web --config /etc/gitea/app.ini ExecStart=/usr/local/bin/gitea web --config /etc/gitea/app.ini
Restart=always Restart=always
Environment=USER=git HOME=/home/git GITEA_WORK_DIR=/var/lib/gitea Environment=USER=git HOME=/home/git GITEA_WORK_DIR=/var/lib/gitea
WatchdogSec=30s
# If you install Git to directory prefix other than default PATH (which happens # If you install Git to directory prefix other than default PATH (which happens
# for example if you install other versions of Git side-to-side with # for example if you install other versions of Git side-to-side with
# distribution version), uncomment below line and add that prefix to PATH # distribution version), uncomment below line and add that prefix to PATH

View file

@ -11,6 +11,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"runtime/pprof" "runtime/pprof"
"strconv"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@ -45,7 +46,7 @@ type Manager struct {
func newGracefulManager(ctx context.Context) *Manager { func newGracefulManager(ctx context.Context) *Manager {
manager := &Manager{ manager := &Manager{
isChild: len(os.Getenv(listenFDs)) > 0 && os.Getppid() > 1, isChild: len(os.Getenv(listenFDsEnv)) > 0 && os.Getppid() > 1,
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
} }
manager.createServerWaitGroup.Add(numberOfServersToCreate) manager.createServerWaitGroup.Add(numberOfServersToCreate)
@ -53,6 +54,41 @@ func newGracefulManager(ctx context.Context) *Manager {
return manager return manager
} }
type systemdNotifyMsg string
const (
readyMsg systemdNotifyMsg = "READY=1"
stoppingMsg systemdNotifyMsg = "STOPPING=1"
reloadingMsg systemdNotifyMsg = "RELOADING=1"
watchdogMsg systemdNotifyMsg = "WATCHDOG=1"
)
func statusMsg(msg string) systemdNotifyMsg {
return systemdNotifyMsg("STATUS=" + msg)
}
func pidMsg() systemdNotifyMsg {
return systemdNotifyMsg("MAINPID=" + strconv.Itoa(os.Getpid()))
}
// Notify systemd of status via the notify protocol
func (g *Manager) notify(msg systemdNotifyMsg) {
conn, err := getNotifySocket()
if err != nil {
// the err is logged in getNotifySocket
return
}
if conn == nil {
return
}
defer conn.Close()
if _, err = conn.Write([]byte(msg)); err != nil {
log.Warn("Failed to notify NOTIFY_SOCKET: %v", err)
return
}
}
func (g *Manager) start(ctx context.Context) { func (g *Manager) start(ctx context.Context) {
// Make contexts // Make contexts
g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx) g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx)
@ -72,6 +108,8 @@ func (g *Manager) start(ctx context.Context) {
// Set the running state & handle signals // Set the running state & handle signals
g.setState(stateRunning) g.setState(stateRunning)
g.notify(statusMsg("Starting Gitea"))
g.notify(pidMsg())
go g.handleSignals(g.managerCtx) go g.handleSignals(g.managerCtx)
// Handle clean up of unused provided listeners and delayed start-up // Handle clean up of unused provided listeners and delayed start-up
@ -84,6 +122,7 @@ func (g *Manager) start(ctx context.Context) {
// Ignore the error here there's not much we can do with it // Ignore the error here there's not much we can do with it
// They're logged in the CloseProvidedListeners function // They're logged in the CloseProvidedListeners function
_ = CloseProvidedListeners() _ = CloseProvidedListeners()
g.notify(readyMsg)
}() }()
if setting.StartupTimeout > 0 { if setting.StartupTimeout > 0 {
go func() { go func() {
@ -104,6 +143,8 @@ func (g *Manager) start(ctx context.Context) {
return return
case <-time.After(setting.StartupTimeout): case <-time.After(setting.StartupTimeout):
log.Error("Startup took too long! Shutting down") log.Error("Startup took too long! Shutting down")
g.notify(statusMsg("Startup took too long! Shutting down"))
g.notify(stoppingMsg)
g.doShutdown() g.doShutdown()
} }
}() }()
@ -126,6 +167,13 @@ func (g *Manager) handleSignals(ctx context.Context) {
syscall.SIGTSTP, syscall.SIGTSTP,
) )
watchdogTimeout := getWatchdogTimeout()
t := &time.Ticker{}
if watchdogTimeout != 0 {
g.notify(watchdogMsg)
t = time.NewTicker(watchdogTimeout / 2)
}
pid := syscall.Getpid() pid := syscall.Getpid()
for { for {
select { select {
@ -136,6 +184,7 @@ func (g *Manager) handleSignals(ctx context.Context) {
g.DoGracefulRestart() g.DoGracefulRestart()
case syscall.SIGUSR1: case syscall.SIGUSR1:
log.Warn("PID %d. Received SIGUSR1. Releasing and reopening logs", pid) log.Warn("PID %d. Received SIGUSR1. Releasing and reopening logs", pid)
g.notify(statusMsg("Releasing and reopening logs"))
if err := log.ReleaseReopen(); err != nil { if err := log.ReleaseReopen(); err != nil {
log.Error("Error whilst releasing and reopening logs: %v", err) log.Error("Error whilst releasing and reopening logs: %v", err)
} }
@ -153,6 +202,8 @@ func (g *Manager) handleSignals(ctx context.Context) {
default: default:
log.Info("PID %d. Received %v.", pid, sig) log.Info("PID %d. Received %v.", pid, sig)
} }
case <-t.C:
g.notify(watchdogMsg)
case <-ctx.Done(): case <-ctx.Done():
log.Warn("PID: %d. Background context for manager closed - %v - Shutting down...", pid, ctx.Err()) log.Warn("PID: %d. Background context for manager closed - %v - Shutting down...", pid, ctx.Err())
g.DoGracefulShutdown() g.DoGracefulShutdown()
@ -169,6 +220,9 @@ func (g *Manager) doFork() error {
} }
g.forked = true g.forked = true
g.lock.Unlock() g.lock.Unlock()
g.notify(reloadingMsg)
// We need to move the file logs to append pids // We need to move the file logs to append pids
setting.RestartLogsWithPIDSuffix() setting.RestartLogsWithPIDSuffix()
@ -191,18 +245,27 @@ func (g *Manager) DoGracefulRestart() {
} }
} else { } else {
log.Info("PID: %d. Not set restartable. Shutting down...", os.Getpid()) log.Info("PID: %d. Not set restartable. Shutting down...", os.Getpid())
g.notify(stoppingMsg)
g.doShutdown() g.doShutdown()
} }
} }
// DoImmediateHammer causes an immediate hammer // DoImmediateHammer causes an immediate hammer
func (g *Manager) DoImmediateHammer() { func (g *Manager) DoImmediateHammer() {
g.notify(statusMsg("Sending immediate hammer"))
g.doHammerTime(0 * time.Second) g.doHammerTime(0 * time.Second)
} }
// DoGracefulShutdown causes a graceful shutdown // DoGracefulShutdown causes a graceful shutdown
func (g *Manager) DoGracefulShutdown() { func (g *Manager) DoGracefulShutdown() {
g.lock.Lock()
if !g.forked {
g.lock.Unlock()
g.notify(stoppingMsg)
} else {
g.lock.Unlock()
g.notify(statusMsg("Shutting down after fork"))
}
g.doShutdown() g.doShutdown()
} }

View file

@ -14,6 +14,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
@ -21,9 +22,12 @@ import (
) )
const ( const (
listenFDs = "LISTEN_FDS" listenFDsEnv = "LISTEN_FDS"
startFD = 3 startFD = 3
unlinkFDs = "GITEA_UNLINK_FDS" unlinkFDsEnv = "GITEA_UNLINK_FDS"
notifySocketEnv = "NOTIFY_SOCKET"
watchdogTimeoutEnv = "WATCHDOG_USEC"
) )
// In order to keep the working directory the same as when we started we record // In order to keep the working directory the same as when we started we record
@ -38,6 +42,9 @@ var (
activeListenersToUnlink = []bool{} activeListenersToUnlink = []bool{}
providedListeners = []net.Listener{} providedListeners = []net.Listener{}
activeListeners = []net.Listener{} activeListeners = []net.Listener{}
notifySocketAddr string
watchdogTimeout time.Duration
) )
func getProvidedFDs() (savedErr error) { func getProvidedFDs() (savedErr error) {
@ -45,18 +52,52 @@ func getProvidedFDs() (savedErr error) {
once.Do(func() { once.Do(func() {
mutex.Lock() mutex.Lock()
defer mutex.Unlock() defer mutex.Unlock()
// now handle some additional systemd provided things
notifySocketAddr = os.Getenv(notifySocketEnv)
if notifySocketAddr != "" {
log.Debug("Systemd Notify Socket provided: %s", notifySocketAddr)
savedErr = os.Unsetenv(notifySocketEnv)
if savedErr != nil {
log.Warn("Unable to Unset the NOTIFY_SOCKET environment variable: %v", savedErr)
return
}
// FIXME: We don't handle WATCHDOG_PID
timeoutStr := os.Getenv(watchdogTimeoutEnv)
if timeoutStr != "" {
savedErr = os.Unsetenv(watchdogTimeoutEnv)
if savedErr != nil {
log.Warn("Unable to Unset the WATCHDOG_USEC environment variable: %v", savedErr)
return
}
numFDs := os.Getenv(listenFDs) s, err := strconv.ParseInt(timeoutStr, 10, 64)
if err != nil {
log.Error("Unable to parse the provided WATCHDOG_USEC: %v", err)
savedErr = fmt.Errorf("unable to parse the provided WATCHDOG_USEC: %w", err)
return
}
if s <= 0 {
log.Error("Unable to parse the provided WATCHDOG_USEC: %s should be a positive number", timeoutStr)
savedErr = fmt.Errorf("unable to parse the provided WATCHDOG_USEC: %s should be a positive number", timeoutStr)
return
}
watchdogTimeout = time.Duration(s) * time.Microsecond
}
} else {
log.Trace("No Systemd Notify Socket provided")
}
numFDs := os.Getenv(listenFDsEnv)
if numFDs == "" { if numFDs == "" {
return return
} }
n, err := strconv.Atoi(numFDs) n, err := strconv.Atoi(numFDs)
if err != nil { if err != nil {
savedErr = fmt.Errorf("%s is not a number: %s. Err: %w", listenFDs, numFDs, err) savedErr = fmt.Errorf("%s is not a number: %s. Err: %w", listenFDsEnv, numFDs, err)
return return
} }
fdsToUnlinkStr := strings.Split(os.Getenv(unlinkFDs), ",") fdsToUnlinkStr := strings.Split(os.Getenv(unlinkFDsEnv), ",")
providedListenersToUnlink = make([]bool, n) providedListenersToUnlink = make([]bool, n)
for _, fdStr := range fdsToUnlinkStr { for _, fdStr := range fdsToUnlinkStr {
i, err := strconv.Atoi(fdStr) i, err := strconv.Atoi(fdStr)
@ -73,7 +114,7 @@ func getProvidedFDs() (savedErr error) {
if err == nil { if err == nil {
// Close the inherited file if it's a listener // Close the inherited file if it's a listener
if err = file.Close(); err != nil { if err = file.Close(); err != nil {
savedErr = fmt.Errorf("error closing provided socket fd %d: %s", i, err) savedErr = fmt.Errorf("error closing provided socket fd %d: %w", i, err)
return return
} }
providedListeners = append(providedListeners, l) providedListeners = append(providedListeners, l)
@ -255,3 +296,36 @@ func getActiveListenersToUnlink() []bool {
copy(listenersToUnlink, activeListenersToUnlink) copy(listenersToUnlink, activeListenersToUnlink)
return listenersToUnlink return listenersToUnlink
} }
func getNotifySocket() (*net.UnixConn, error) {
if err := getProvidedFDs(); err != nil {
// This error will be logged elsewhere
return nil, nil
}
if notifySocketAddr == "" {
return nil, nil
}
socketAddr := &net.UnixAddr{
Name: notifySocketAddr,
Net: "unixgram",
}
notifySocket, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
if err != nil {
log.Warn("failed to dial NOTIFY_SOCKET %s: %v", socketAddr, err)
return nil, err
}
return notifySocket, nil
}
func getWatchdogTimeout() time.Duration {
if err := getProvidedFDs(); err != nil {
// This error will be logged elsewhere
return 0
}
return watchdogTimeout
}

View file

@ -16,6 +16,7 @@ import (
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"time"
) )
var killParent sync.Once var killParent sync.Once
@ -70,11 +71,20 @@ func RestartProcess() (int, error) {
// Pass on the environment and replace the old count key with the new one. // Pass on the environment and replace the old count key with the new one.
var env []string var env []string
for _, v := range os.Environ() { for _, v := range os.Environ() {
if !strings.HasPrefix(v, listenFDs+"=") { if !strings.HasPrefix(v, listenFDsEnv+"=") {
env = append(env, v) env = append(env, v)
} }
} }
env = append(env, fmt.Sprintf("%s=%d", listenFDs, len(listeners))) env = append(env, fmt.Sprintf("%s=%d", listenFDsEnv, len(listeners)))
if notifySocketAddr != "" {
env = append(env, fmt.Sprintf("%s=%s", notifySocketEnv, notifySocketAddr))
}
if watchdogTimeout != 0 {
watchdogStr := strconv.FormatInt(int64(watchdogTimeout/time.Millisecond), 10)
env = append(env, fmt.Sprintf("%s=%s", watchdogTimeoutEnv, watchdogStr))
}
sb := &strings.Builder{} sb := &strings.Builder{}
for i, unlink := range getActiveListenersToUnlink() { for i, unlink := range getActiveListenersToUnlink() {
@ -87,7 +97,7 @@ func RestartProcess() (int, error) {
unlinkStr := sb.String() unlinkStr := sb.String()
if len(unlinkStr) > 0 { if len(unlinkStr) > 0 {
unlinkStr = unlinkStr[:len(unlinkStr)-1] unlinkStr = unlinkStr[:len(unlinkStr)-1]
env = append(env, fmt.Sprintf("%s=%s", unlinkFDs, unlinkStr)) env = append(env, fmt.Sprintf("%s=%s", unlinkFDsEnv, unlinkStr))
} }
allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...) allFiles := append([]*os.File{os.Stdin, os.Stdout, os.Stderr}, files...)