1
Fork 0
mirror of https://github.com/caddyserver/caddy.git synced 2024-12-16 21:56:40 -05:00

log: implement WAL for net writer (WIP)

This commit is contained in:
Mohammed Al Sahaf 2024-03-22 21:15:16 +03:00
parent 29f57faa86
commit 45d5d86773
3 changed files with 75 additions and 6 deletions

3
go.mod
View file

@ -58,11 +58,13 @@ require (
github.com/google/go-tspi v0.3.0 // indirect github.com/google/go-tspi v0.3.0 // indirect
github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/onsi/ginkgo/v2 v2.13.2 // indirect github.com/onsi/ginkgo/v2 v2.13.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qpack v0.4.0 // indirect
github.com/smallstep/go-attestation v0.4.4-0.20230627102604-cf579e53cbd2 // indirect github.com/smallstep/go-attestation v0.4.4-0.20230627102604-cf579e53cbd2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect github.com/x448/float16 v0.8.4 // indirect
github.com/zeebo/blake3 v0.2.3 // indirect github.com/zeebo/blake3 v0.2.3 // indirect
go.opentelemetry.io/contrib/propagators/aws v1.17.0 // indirect go.opentelemetry.io/contrib/propagators/aws v1.17.0 // indirect
@ -125,6 +127,7 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect
github.com/rosedblabs/wal v1.3.6
github.com/rs/xid v1.5.0 // indirect github.com/rs/xid v1.5.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shopspring/decimal v1.2.0 // indirect github.com/shopspring/decimal v1.2.0 // indirect

6
go.sum
View file

@ -259,6 +259,8 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU=
github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
@ -494,6 +496,8 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rosedblabs/wal v1.3.6 h1:oxZYTPX/u4JuGDW98wQ1YamWqerlrlSUFKhgP6Gd/Ao=
github.com/rosedblabs/wal v1.3.6/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
@ -582,6 +586,8 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk=
github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=

View file

@ -15,8 +15,10 @@
package logging package logging
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"log"
"net" "net"
"os" "os"
"sync" "sync"
@ -24,6 +26,8 @@ import (
"github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2"
"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
"github.com/rosedblabs/wal"
) )
func init() { func init() {
@ -46,6 +50,8 @@ type NetWriter struct {
SoftStart bool `json:"soft_start,omitempty"` SoftStart bool `json:"soft_start,omitempty"`
addr caddy.NetworkAddress addr caddy.NetworkAddress
w *wal.WAL
wr *wal.Reader
} }
// CaddyModule returns the Caddy module information. // CaddyModule returns the Caddy module information.
@ -89,8 +95,25 @@ func (nw NetWriter) WriterKey() string {
return nw.addr.String() return nw.addr.String()
} }
func getPool() []byte {
return make([]byte, 1024)
}
// OpenWriter opens a new network connection. // OpenWriter opens a new network connection.
func (nw NetWriter) OpenWriter() (io.WriteCloser, error) { func (nw *NetWriter) OpenWriter() (io.WriteCloser, error) {
if err := os.MkdirAll(caddy.AppDataDir()+"/wal", 0o755); err != nil {
return nil, err
}
opts := wal.DefaultOptions
opts.DirPath = caddy.AppDataDir() + "/wal"
opts.Sync = true
opts.SegmentSize = wal.KB
opts.BlockCache = wal.KB / 2
w, err := wal.Open(opts)
if err != nil {
return nil, err
}
nw.w, nw.wr = w, w.NewReader()
reconn := &redialerConn{ reconn := &redialerConn{
nw: nw, nw: nw,
timeout: time.Duration(nw.DialTimeout), timeout: time.Duration(nw.DialTimeout),
@ -107,9 +130,33 @@ func (nw NetWriter) OpenWriter() (io.WriteCloser, error) {
reconn.connMu.Lock() reconn.connMu.Lock()
reconn.Conn = conn reconn.Conn = conn
reconn.connMu.Unlock() reconn.connMu.Unlock()
go reconn.readWal()
return reconn, nil return reconn, nil
} }
func (rc *redialerConn) readWal() {
for {
data, _, err := rc.nw.wr.Next()
if err == io.EOF {
continue
}
if err == wal.ErrClosed {
log.Printf("wal closed")
return
}
if err != nil {
log.Printf("error reading from wal: %v", err)
continue
}
log.Printf("trying to write")
log.Printf("data is: %s", string(data))
for _, err := rc.write(data); err != nil; _, err = rc.write(data) {
log.Printf("failed to write: %s", err)
time.Sleep(time.Second)
}
}
}
// UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax:
// //
// net <address> { // net <address> {
@ -156,14 +203,20 @@ func (nw *NetWriter) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
type redialerConn struct { type redialerConn struct {
net.Conn net.Conn
connMu sync.RWMutex connMu sync.RWMutex
nw NetWriter nw *NetWriter
timeout time.Duration timeout time.Duration
lastRedial time.Time lastRedial time.Time
} }
func (reconn *redialerConn) Write(b []byte) (n int, err error) {
_, err = reconn.nw.w.Write(b)
reconn.nw.w.Sync()
return len(b), err
}
// Write wraps the underlying Conn.Write method, but if that fails, // Write wraps the underlying Conn.Write method, but if that fails,
// it will re-dial the connection anew and try writing again. // it will re-dial the connection anew and try writing again.
func (reconn *redialerConn) Write(b []byte) (n int, err error) { func (reconn *redialerConn) write(b []byte) (n int, err error) {
reconn.connMu.RLock() reconn.connMu.RLock()
conn := reconn.Conn conn := reconn.Conn
reconn.connMu.RUnlock() reconn.connMu.RUnlock()
@ -195,6 +248,7 @@ func (reconn *redialerConn) Write(b []byte) (n int, err error) {
if err2 != nil { if err2 != nil {
// logger socket still offline; instead of discarding the log, dump it to stderr // logger socket still offline; instead of discarding the log, dump it to stderr
os.Stderr.Write(b) os.Stderr.Write(b)
err = err2
return return
} }
if n, err = conn2.Write(b); err == nil { if n, err = conn2.Write(b); err == nil {
@ -203,14 +257,20 @@ func (reconn *redialerConn) Write(b []byte) (n int, err error) {
} }
reconn.Conn = conn2 reconn.Conn = conn2
} }
} else {
// last redial attempt was too recent; just dump to stderr for now
os.Stderr.Write(b)
} }
return return
} }
func (reconn *redialerConn) Close() error {
return errors.Join(
reconn.nw.w.Sync(),
reconn.nw.w.Close(),
reconn.nw.w.Delete(),
reconn.Conn.Close(),
)
}
func (reconn *redialerConn) dial() (net.Conn, error) { func (reconn *redialerConn) dial() (net.Conn, error) {
return net.DialTimeout(reconn.nw.addr.Network, reconn.nw.addr.JoinHostPort(0), reconn.timeout) return net.DialTimeout(reconn.nw.addr.Network, reconn.nw.addr.JoinHostPort(0), reconn.timeout)
} }