mirror of
https://github.com/caddyserver/caddy.git
synced 2025-01-06 22:40:31 -05:00
332 lines
10 KiB
Go
332 lines
10 KiB
Go
// Copyright 2015 Light Code Labs, LLC
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// Package diagnostics implements the client for server-side diagnostics
|
|
// of the network. Functions in this package are synchronous and blocking
|
|
// unless otherwise specified. For convenience, most functions here do
|
|
// not return errors, but errors are logged to the standard logger.
|
|
//
|
|
// To use this package, first call Init(). You can then call any of the
|
|
// collection/aggregation functions. Call StartEmitting() when you are
|
|
// ready to begin sending diagnostic updates.
|
|
//
|
|
// When collecting metrics (functions like Set, AppendUnique, or Increment),
|
|
// it may be desirable and even recommended to invoke them in a new
|
|
// goroutine (use the go keyword) in case there is lock contention;
|
|
// they are thread-safe (unless noted), and you may not want them to
|
|
// block the main thread of execution. However, sometimes blocking
|
|
// may be necessary too; for example, adding startup metrics to the
|
|
// buffer before the call to StartEmitting().
|
|
//
|
|
// This package is designed to be as fast and space-efficient as reasonably
|
|
// possible, so that it does not disrupt the flow of execution.
|
|
package diagnostics
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// logEmit calls emit and then logs the error, if any.
|
|
// See docs for emit.
|
|
func logEmit(final bool) {
|
|
err := emit(final)
|
|
if err != nil {
|
|
log.Printf("[ERROR] Sending diagnostics: %v", err)
|
|
}
|
|
}
|
|
|
|
// emit sends an update to the diagnostics server.
|
|
// Set final to true if this is the last call to emit.
|
|
// If final is true, no future updates will be scheduled.
|
|
// Otherwise, the next update will be scheduled.
|
|
func emit(final bool) error {
|
|
if !enabled {
|
|
return fmt.Errorf("diagnostics not enabled")
|
|
}
|
|
|
|
// ensure only one update happens at a time;
|
|
// skip update if previous one still in progress
|
|
updateMu.Lock()
|
|
if updating {
|
|
updateMu.Unlock()
|
|
log.Println("[NOTICE] Skipping this diagnostics update because previous one is still working")
|
|
return nil
|
|
}
|
|
updating = true
|
|
updateMu.Unlock()
|
|
defer func() {
|
|
updateMu.Lock()
|
|
updating = false
|
|
updateMu.Unlock()
|
|
}()
|
|
|
|
// terminate any pending update if this is the last one
|
|
if final {
|
|
updateTimerMu.Lock()
|
|
updateTimer.Stop()
|
|
updateTimer = nil
|
|
updateTimerMu.Unlock()
|
|
}
|
|
|
|
payloadBytes, err := makePayloadAndResetBuffer()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// this will hold the server's reply
|
|
var reply Response
|
|
|
|
// transmit the payload - use a loop to retry in case of failure
|
|
for i := 0; i < 4; i++ {
|
|
if i > 0 && err != nil {
|
|
// don't hammer the server; first failure might have been
|
|
// a fluke, but back off more after that
|
|
log.Printf("[WARNING] Sending diagnostics (attempt %d): %v - backing off and retrying", i, err)
|
|
time.Sleep(time.Duration((i+1)*(i+1)*(i+1)) * time.Second)
|
|
}
|
|
|
|
// send it
|
|
var resp *http.Response
|
|
resp, err = httpClient.Post(endpoint+instanceUUID.String(), "application/json", bytes.NewReader(payloadBytes))
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// ensure we can read the response
|
|
if ct := resp.Header.Get("Content-Type"); (resp.StatusCode < 300 || resp.StatusCode >= 400) &&
|
|
!strings.Contains(ct, "json") {
|
|
err = fmt.Errorf("diagnostics server replied with unknown content-type: '%s' and HTTP %s", ct, resp.Status)
|
|
resp.Body.Close()
|
|
continue
|
|
}
|
|
|
|
// read the response body
|
|
err = json.NewDecoder(resp.Body).Decode(&reply)
|
|
resp.Body.Close() // close response body as soon as we're done with it
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// make sure we didn't send the update too soon; if so,
|
|
// just wait and try again -- this is a special case of
|
|
// error that we handle differently, as you can see
|
|
if resp.StatusCode == http.StatusTooManyRequests {
|
|
if reply.NextUpdate <= 0 {
|
|
raStr := resp.Header.Get("Retry-After")
|
|
if ra, err := strconv.Atoi(raStr); err == nil {
|
|
reply.NextUpdate = time.Duration(ra) * time.Second
|
|
}
|
|
}
|
|
if !final {
|
|
log.Printf("[NOTICE] Sending diagnostics: we were too early; waiting %s before trying again", reply.NextUpdate)
|
|
time.Sleep(reply.NextUpdate)
|
|
continue
|
|
}
|
|
} else if resp.StatusCode >= 400 {
|
|
err = fmt.Errorf("diagnostics server returned status code %d", resp.StatusCode)
|
|
continue
|
|
}
|
|
|
|
break
|
|
}
|
|
if err == nil && !final {
|
|
// (remember, if there was an error, we return it
|
|
// below, so it WILL get logged if it's supposed to)
|
|
log.Println("[INFO] Sending diagnostics: success")
|
|
}
|
|
|
|
// even if there was an error after all retries, we should
|
|
// schedule the next update using our default update
|
|
// interval because the server might be healthy later
|
|
|
|
// ensure we won't slam the diagnostics server
|
|
if reply.NextUpdate < 1*time.Second {
|
|
reply.NextUpdate = defaultUpdateInterval
|
|
}
|
|
|
|
// schedule the next update (if this wasn't the last one and
|
|
// if the remote server didn't tell us to stop sending)
|
|
if !final && !reply.Stop {
|
|
updateTimerMu.Lock()
|
|
updateTimer = time.AfterFunc(reply.NextUpdate, func() {
|
|
logEmit(false)
|
|
})
|
|
updateTimerMu.Unlock()
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// makePayloadAndResetBuffer prepares a payload
|
|
// by emptying the collection buffer. It returns
|
|
// the bytes of the payload to send to the server.
|
|
// Since the buffer is reset by this, if the
|
|
// resulting byte slice is lost, the payload is
|
|
// gone with it.
|
|
func makePayloadAndResetBuffer() ([]byte, error) {
|
|
bufCopy := resetBuffer()
|
|
|
|
// encode payload in preparation for transmission
|
|
payload := Payload{
|
|
InstanceID: instanceUUID.String(),
|
|
Timestamp: time.Now().UTC(),
|
|
Data: bufCopy,
|
|
}
|
|
return json.Marshal(payload)
|
|
}
|
|
|
|
// resetBuffer makes a local pointer to the buffer,
|
|
// then resets the buffer by assigning to be a newly-
|
|
// made value to clear it out, then sets the buffer
|
|
// item count to 0. It returns the copied pointer to
|
|
// the original map so the old buffer value can be
|
|
// used locally.
|
|
func resetBuffer() map[string]interface{} {
|
|
bufferMu.Lock()
|
|
bufCopy := buffer
|
|
buffer = make(map[string]interface{})
|
|
bufferItemCount = 0
|
|
bufferMu.Unlock()
|
|
return bufCopy
|
|
}
|
|
|
|
// Response contains the body of a response from the
|
|
// diagnostics server.
|
|
type Response struct {
|
|
// NextUpdate is how long to wait before the next update.
|
|
NextUpdate time.Duration `json:"next_update"`
|
|
|
|
// Stop instructs the diagnostics server to stop sending
|
|
// diagnostics. This would only be done under extenuating
|
|
// circumstances, but we are prepared for it nonetheless.
|
|
Stop bool `json:"stop,omitempty"`
|
|
|
|
// Error will be populated with an error message, if any.
|
|
// This field should be empty if the status code is < 400.
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// Payload is the data that gets sent to the diagnostics server.
|
|
type Payload struct {
|
|
// The universally unique ID of the instance
|
|
InstanceID string `json:"instance_id"`
|
|
|
|
// The UTC timestamp of the transmission
|
|
Timestamp time.Time `json:"timestamp"`
|
|
|
|
// The timestamp before which the next update is expected
|
|
// (NOT populated by client - the server fills this in
|
|
// before it stores the data)
|
|
ExpectNext time.Time `json:"expect_next,omitempty"`
|
|
|
|
// The metrics
|
|
Data map[string]interface{} `json:"data,omitempty"`
|
|
}
|
|
|
|
// Int returns the value of the data keyed by key
|
|
// if it is an integer; otherwise it returns 0.
|
|
func (p Payload) Int(key string) int {
|
|
val, _ := p.Data[key]
|
|
switch p.Data[key].(type) {
|
|
case int:
|
|
return val.(int)
|
|
case float64: // after JSON-decoding, int becomes float64...
|
|
return int(val.(float64))
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// countingSet implements a set that counts how many
|
|
// times a key is inserted. It marshals to JSON in a
|
|
// way such that keys are converted to values next
|
|
// to their associated counts.
|
|
type countingSet map[interface{}]int
|
|
|
|
// MarshalJSON implements the json.Marshaler interface.
|
|
// It converts the set to an array so that the values
|
|
// are JSON object values instead of keys, since keys
|
|
// are difficult to query in databases.
|
|
func (s countingSet) MarshalJSON() ([]byte, error) {
|
|
type Item struct {
|
|
Value interface{} `json:"value"`
|
|
Count int `json:"count"`
|
|
}
|
|
var list []Item
|
|
|
|
for k, v := range s {
|
|
list = append(list, Item{Value: k, Count: v})
|
|
}
|
|
|
|
return json.Marshal(list)
|
|
}
|
|
|
|
var (
|
|
// httpClient should be used for HTTP requests. It
|
|
// is configured with a timeout for reliability.
|
|
httpClient = http.Client{Timeout: 1 * time.Minute}
|
|
|
|
// buffer holds the data that we are building up to send.
|
|
buffer = make(map[string]interface{})
|
|
bufferItemCount = 0
|
|
bufferMu sync.RWMutex // protects both the buffer and its count
|
|
|
|
// updating is used to ensure only one
|
|
// update happens at a time.
|
|
updating bool
|
|
updateMu sync.Mutex
|
|
|
|
// updateTimer fires off the next update.
|
|
// If no update is scheduled, this is nil.
|
|
updateTimer *time.Timer
|
|
updateTimerMu sync.Mutex
|
|
|
|
// instanceUUID is the ID of the current instance.
|
|
// This MUST be set to emit diagnostics.
|
|
// This MUST NOT be openly exposed to clients, for privacy.
|
|
instanceUUID uuid.UUID
|
|
|
|
// enabled indicates whether the package has
|
|
// been initialized and can be actively used.
|
|
enabled bool
|
|
|
|
// maxBufferItems is the maximum number of items we'll allow
|
|
// in the buffer before we start dropping new ones, in a
|
|
// rough (simple) attempt to keep memory use under control.
|
|
maxBufferItems = 100000
|
|
)
|
|
|
|
const (
|
|
// endpoint is the base URL to remote diagnostics server;
|
|
// the instance ID will be appended to it.
|
|
endpoint = "https://diagnostics-staging.caddyserver.com/update/" // TODO: make configurable, "http://localhost:8085/update/"
|
|
|
|
// defaultUpdateInterval is how long to wait before emitting
|
|
// more diagnostic data if all retires fail. This value is
|
|
// only used if the client receives a nonsensical value, or
|
|
// doesn't send one at all, or if a connection can't be made,
|
|
// likely indicating a problem with the server. Thus, this
|
|
// value should be a long duration to help alleviate extra
|
|
// load on the server.
|
|
defaultUpdateInterval = 1 * time.Hour
|
|
)
|