diff --git a/go.mod b/go.mod index 41248b79..160c4f0b 100644 --- a/go.mod +++ b/go.mod @@ -58,11 +58,13 @@ require ( github.com/google/go-tspi v0.3.0 // indirect github.com/google/pprof v0.0.0-20231212022811-ec68065c825e // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/onsi/ginkgo/v2 v2.13.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/quic-go/qpack v0.4.0 // indirect github.com/smallstep/go-attestation v0.4.4-0.20230627102604-cf579e53cbd2 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/blake3 v0.2.3 // indirect go.opentelemetry.io/contrib/propagators/aws v1.17.0 // indirect @@ -125,6 +127,7 @@ require ( github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/rosedblabs/wal v1.3.6 github.com/rs/xid v1.5.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shopspring/decimal v1.2.0 // indirect diff --git a/go.sum b/go.sum index 3a7cd85d..0368fb5a 100644 --- a/go.sum +++ b/go.sum @@ -259,6 +259,8 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09 github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU= +github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= @@ -494,6 +496,8 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rosedblabs/wal v1.3.6 h1:oxZYTPX/u4JuGDW98wQ1YamWqerlrlSUFKhgP6Gd/Ao= +github.com/rosedblabs/wal v1.3.6/go.mod h1:wdq54KJUyVTOv1uddMc6Cdh2d/YCIo8yjcwJAb1RCEM= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -582,6 +586,8 @@ github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijb github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= diff --git a/modules/logging/netwriter.go b/modules/logging/netwriter.go index dc2b0922..a363e4b3 100644 --- a/modules/logging/netwriter.go +++ b/modules/logging/netwriter.go @@ -15,8 +15,10 @@ package logging import ( + "errors" "fmt" "io" + "log" "net" "os" "sync" @@ -24,6 +26,8 @@ import ( "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" + + "github.com/rosedblabs/wal" ) func init() { @@ -46,6 +50,8 @@ type NetWriter struct { SoftStart bool `json:"soft_start,omitempty"` addr caddy.NetworkAddress + w *wal.WAL + wr *wal.Reader } // CaddyModule returns the Caddy module information. @@ -89,8 +95,25 @@ func (nw NetWriter) WriterKey() string { return nw.addr.String() } +func getPool() []byte { + return make([]byte, 1024) +} + // OpenWriter opens a new network connection. -func (nw NetWriter) OpenWriter() (io.WriteCloser, error) { +func (nw *NetWriter) OpenWriter() (io.WriteCloser, error) { + if err := os.MkdirAll(caddy.AppDataDir()+"/wal", 0o755); err != nil { + return nil, err + } + opts := wal.DefaultOptions + opts.DirPath = caddy.AppDataDir() + "/wal" + opts.Sync = true + opts.SegmentSize = wal.KB + opts.BlockCache = wal.KB / 2 + w, err := wal.Open(opts) + if err != nil { + return nil, err + } + nw.w, nw.wr = w, w.NewReader() reconn := &redialerConn{ nw: nw, timeout: time.Duration(nw.DialTimeout), @@ -107,9 +130,33 @@ func (nw NetWriter) OpenWriter() (io.WriteCloser, error) { reconn.connMu.Lock() reconn.Conn = conn reconn.connMu.Unlock() + go reconn.readWal() return reconn, nil } +func (rc *redialerConn) readWal() { + for { + data, _, err := rc.nw.wr.Next() + if err == io.EOF { + continue + } + if err == wal.ErrClosed { + log.Printf("wal closed") + return + } + if err != nil { + log.Printf("error reading from wal: %v", err) + continue + } + log.Printf("trying to write") + log.Printf("data is: %s", string(data)) + for _, err := rc.write(data); err != nil; _, err = rc.write(data) { + log.Printf("failed to write: %s", err) + time.Sleep(time.Second) + } + } +} + // UnmarshalCaddyfile sets up the handler from Caddyfile tokens. Syntax: // // net
{ @@ -156,14 +203,20 @@ func (nw *NetWriter) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { type redialerConn struct { net.Conn connMu sync.RWMutex - nw NetWriter + nw *NetWriter timeout time.Duration lastRedial time.Time } +func (reconn *redialerConn) Write(b []byte) (n int, err error) { + _, err = reconn.nw.w.Write(b) + reconn.nw.w.Sync() + return len(b), err +} + // Write wraps the underlying Conn.Write method, but if that fails, // it will re-dial the connection anew and try writing again. -func (reconn *redialerConn) Write(b []byte) (n int, err error) { +func (reconn *redialerConn) write(b []byte) (n int, err error) { reconn.connMu.RLock() conn := reconn.Conn reconn.connMu.RUnlock() @@ -195,6 +248,7 @@ func (reconn *redialerConn) Write(b []byte) (n int, err error) { if err2 != nil { // logger socket still offline; instead of discarding the log, dump it to stderr os.Stderr.Write(b) + err = err2 return } if n, err = conn2.Write(b); err == nil { @@ -203,14 +257,20 @@ func (reconn *redialerConn) Write(b []byte) (n int, err error) { } reconn.Conn = conn2 } - } else { - // last redial attempt was too recent; just dump to stderr for now - os.Stderr.Write(b) } return } +func (reconn *redialerConn) Close() error { + return errors.Join( + reconn.nw.w.Sync(), + reconn.nw.w.Close(), + reconn.nw.w.Delete(), + reconn.Conn.Close(), + ) +} + func (reconn *redialerConn) dial() (net.Conn, error) { return net.DialTimeout(reconn.nw.addr.Network, reconn.nw.addr.JoinHostPort(0), reconn.timeout) }