mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-12-22 23:33:15 -05:00
a627b885c7
Support compression for Actions logs to save storage space and bandwidth. Inspired by https://github.com/go-gitea/gitea/issues/24256#issuecomment-1521153015 The biggest challenge is that the compression format should support [seekable](https://github.com/facebook/zstd/blob/dev/contrib/seekable_format/zstd_seekable_compression_format.md). So when users are viewing a part of the log lines, Gitea doesn't need to download the whole compressed file and decompress it. That means gzip cannot help here. And I did research, there aren't too many choices, like bgzip and xz, but I think zstd is the most popular one. It has an implementation in Golang with [zstd](https://github.com/klauspost/compress/tree/master/zstd) and [zstd-seekable-format-go](https://github.com/SaveTheRbtz/zstd-seekable-format-go), and what is better is that it has good compatibility: a seekable format zstd file can be read by a regular zstd reader. This PR introduces a new package `zstd` to combine and wrap the two packages, to provide a unified and easy-to-use API. And a new setting `LOG_COMPRESSION` is added to the config, although I don't see any reason why not to use compression, I think's it's a good idea to keep the default with `none` to be consistent with old versions. `LOG_COMPRESSION` takes effect for only new log files, it adds `.zst` as an extension to the file name, so Gitea can determine if it needs decompression according to the file name when reading. Old files will keep the format since it's not worth converting them, as they will be cleared after #31735. <img width="541" alt="image" src="https://github.com/user-attachments/assets/e9598764-a4e0-4b68-8c2b-f769265183c9"> (cherry picked from commit 33cc5837a655ad544b936d4d040ca36d74092588) Conflicts: assets/go-licenses.json go.mod go.sum resolved with make tidy
224 lines
6.3 KiB
Go
224 lines
6.3 KiB
Go
// Copyright 2022 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/models/dbfs"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/storage"
|
|
"code.gitea.io/gitea/modules/zstd"
|
|
|
|
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
const (
|
|
MaxLineSize = 64 * 1024
|
|
DBFSPrefix = "actions_log/"
|
|
|
|
timeFormat = "2006-01-02T15:04:05.0000000Z07:00"
|
|
defaultBufSize = MaxLineSize
|
|
)
|
|
|
|
// WriteLogs appends logs to DBFS file for temporary storage.
|
|
// It doesn't respect the file format in the filename like ".zst", since it's difficult to reopen a closed compressed file and append new content.
|
|
// Why doesn't it store logs in object storage directly? Because it's not efficient to append content to object storage.
|
|
func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) {
|
|
flag := os.O_WRONLY
|
|
if offset == 0 {
|
|
// Create file only if offset is 0, or it could result in content holes if the file doesn't exist.
|
|
flag |= os.O_CREATE
|
|
}
|
|
name := DBFSPrefix + filename
|
|
f, err := dbfs.OpenFile(ctx, name, flag)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
stat, err := f.Stat()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dbfs Stat %q: %w", name, err)
|
|
}
|
|
if stat.Size() < offset {
|
|
// If the size is less than offset, refuse to write, or it could result in content holes.
|
|
// However, if the size is greater than offset, we can still write to overwrite the content.
|
|
return nil, fmt.Errorf("size of %q is less than offset", name)
|
|
}
|
|
|
|
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
|
return nil, fmt.Errorf("dbfs Seek %q: %w", name, err)
|
|
}
|
|
|
|
writer := bufio.NewWriterSize(f, defaultBufSize)
|
|
|
|
ns := make([]int, 0, len(rows))
|
|
for _, row := range rows {
|
|
n, err := writer.WriteString(FormatLog(row.Time.AsTime(), row.Content) + "\n")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ns = append(ns, n)
|
|
}
|
|
|
|
if err := writer.Flush(); err != nil {
|
|
return nil, err
|
|
}
|
|
return ns, nil
|
|
}
|
|
|
|
func ReadLogs(ctx context.Context, inStorage bool, filename string, offset, limit int64) ([]*runnerv1.LogRow, error) {
|
|
f, err := OpenLogs(ctx, inStorage, filename)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer f.Close()
|
|
|
|
if _, err := f.Seek(offset, io.SeekStart); err != nil {
|
|
return nil, fmt.Errorf("file seek: %w", err)
|
|
}
|
|
|
|
scanner := bufio.NewScanner(f)
|
|
maxLineSize := len(timeFormat) + MaxLineSize + 1
|
|
scanner.Buffer(make([]byte, maxLineSize), maxLineSize)
|
|
|
|
var rows []*runnerv1.LogRow
|
|
for scanner.Scan() && (int64(len(rows)) < limit || limit < 0) {
|
|
t, c, err := ParseLog(scanner.Text())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse log %q: %w", scanner.Text(), err)
|
|
}
|
|
rows = append(rows, &runnerv1.LogRow{
|
|
Time: timestamppb.New(t),
|
|
Content: c,
|
|
})
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return nil, fmt.Errorf("ReadLogs scan: %w", err)
|
|
}
|
|
|
|
return rows, nil
|
|
}
|
|
|
|
const (
|
|
// logZstdBlockSize is the block size for zstd compression.
|
|
// 128KB leads the compression ratio to be close to the regular zstd compression.
|
|
// And it means each read from the underlying object storage will be at least 128KB*(compression ratio).
|
|
// The compression ratio is about 30% for text files, so the actual read size is about 38KB, which should be acceptable.
|
|
logZstdBlockSize = 128 * 1024 // 128KB
|
|
)
|
|
|
|
// TransferLogs transfers logs from DBFS to object storage.
|
|
// It happens when the file is complete and no more logs will be appended.
|
|
// It respects the file format in the filename like ".zst", and compresses the content if needed.
|
|
func TransferLogs(ctx context.Context, filename string) (func(), error) {
|
|
name := DBFSPrefix + filename
|
|
remove := func() {
|
|
if err := dbfs.Remove(ctx, name); err != nil {
|
|
log.Warn("dbfs remove %q: %v", name, err)
|
|
}
|
|
}
|
|
f, err := dbfs.Open(ctx, name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dbfs open %q: %w", name, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
var reader io.Reader = f
|
|
if strings.HasSuffix(filename, ".zst") {
|
|
r, w := io.Pipe()
|
|
reader = r
|
|
zstdWriter, err := zstd.NewSeekableWriter(w, logZstdBlockSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("zstd NewSeekableWriter: %w", err)
|
|
}
|
|
go func() {
|
|
defer func() {
|
|
_ = w.CloseWithError(zstdWriter.Close())
|
|
}()
|
|
if _, err := io.Copy(zstdWriter, f); err != nil {
|
|
_ = w.CloseWithError(err)
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
|
|
if _, err := storage.Actions.Save(filename, reader, -1); err != nil {
|
|
return nil, fmt.Errorf("storage save %q: %w", filename, err)
|
|
}
|
|
return remove, nil
|
|
}
|
|
|
|
func RemoveLogs(ctx context.Context, inStorage bool, filename string) error {
|
|
if !inStorage {
|
|
name := DBFSPrefix + filename
|
|
err := dbfs.Remove(ctx, name)
|
|
if err != nil {
|
|
return fmt.Errorf("dbfs remove %q: %w", name, err)
|
|
}
|
|
return nil
|
|
}
|
|
err := storage.Actions.Delete(filename)
|
|
if err != nil {
|
|
return fmt.Errorf("storage delete %q: %w", filename, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func OpenLogs(ctx context.Context, inStorage bool, filename string) (io.ReadSeekCloser, error) {
|
|
if !inStorage {
|
|
name := DBFSPrefix + filename
|
|
f, err := dbfs.Open(ctx, name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dbfs open %q: %w", name, err)
|
|
}
|
|
return f, nil
|
|
}
|
|
|
|
f, err := storage.Actions.Open(filename)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage open %q: %w", filename, err)
|
|
}
|
|
|
|
var reader io.ReadSeekCloser = f
|
|
if strings.HasSuffix(filename, ".zst") {
|
|
r, err := zstd.NewSeekableReader(f)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("zstd NewSeekableReader: %w", err)
|
|
}
|
|
reader = r
|
|
}
|
|
|
|
return reader, nil
|
|
}
|
|
|
|
func FormatLog(timestamp time.Time, content string) string {
|
|
// Content shouldn't contain new line, it will break log indexes, other control chars are safe.
|
|
content = strings.ReplaceAll(content, "\n", `\n`)
|
|
if len(content) > MaxLineSize {
|
|
content = content[:MaxLineSize]
|
|
}
|
|
return fmt.Sprintf("%s %s", timestamp.UTC().Format(timeFormat), content)
|
|
}
|
|
|
|
func ParseLog(in string) (time.Time, string, error) {
|
|
index := strings.IndexRune(in, ' ')
|
|
if index < 0 {
|
|
return time.Time{}, "", fmt.Errorf("invalid log: %q", in)
|
|
}
|
|
timestamp, err := time.Parse(timeFormat, in[:index])
|
|
if err != nil {
|
|
return time.Time{}, "", err
|
|
}
|
|
return timestamp, in[index+1:], nil
|
|
}
|