0
Fork 0
mirror of https://codeberg.org/forgejo/forgejo.git synced 2024-12-22 15:23:14 -05:00

Fix artifact v4 upload above 8MB (#31664)

Multiple chunks are uploaded with type "block" without using
"appendBlock" and eventually out of order for bigger uploads.
8MB seems to be the chunk size

This change parses the blockList uploaded after all blocks to get the
final artifact size and order them correctly before calculating the
sha256 checksum over all blocks

Fixes #31354

(cherry picked from commit b594cec2bda6f861effedb2e8e0a7ebba191c0e9)

Conflicts:
	routers/api/actions/artifactsv4.go
  conflict because of Refactor AppURL usage (#30885) 67c1a07285008cc00036a87cef966c3bd519a50c
    that was not cherry-picked in Forgejo
    the resolution consist of removing the extra ctx argument
This commit is contained in:
ChristopherHX 2024-09-22 13:01:09 +02:00 committed by Earl Warren
parent 76a0dcd372
commit 8f0a05a7e4
No known key found for this signature in database
GPG key ID: 0579CB2928A78A00
3 changed files with 285 additions and 39 deletions

View file

@ -123,6 +123,54 @@ func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chun
return chunksMap, nil
}
func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
storageDir := fmt.Sprintf("tmpv4%d", runID)
var chunks []*chunkFileItem
chunkMap := map[string]*chunkFileItem{}
dummy := &chunkFileItem{}
for _, name := range blist.Latest {
chunkMap[name] = dummy
}
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
baseName := filepath.Base(fpath)
if !strings.HasPrefix(baseName, "block-") {
return nil
}
// when read chunks from storage, it only contains storage dir and basename,
// no matter the subdirectory setting in storage config
item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
var size int64
var b64chunkName string
if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
return fmt.Errorf("parse content range error: %v", err)
}
rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
if err != nil {
return fmt.Errorf("failed to parse chunkName: %v", err)
}
chunkName := string(rchunkName)
item.End = item.Start + size - 1
if _, ok := chunkMap[chunkName]; ok {
chunkMap[chunkName] = &item
}
return nil
}); err != nil {
return nil, err
}
for i, name := range blist.Latest {
chunk, ok := chunkMap[name]
if !ok || chunk.Path == "" {
return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
}
chunks = append(chunks, chunk)
if i > 0 {
chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
chunk.End += chunk.Start
}
}
return chunks, nil
}
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
// read all db artifacts by name
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
@ -230,7 +278,7 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
rawChecksum := hash.Sum(nil)
actualChecksum := hex.EncodeToString(rawChecksum)
if !strings.HasSuffix(checksum, actualChecksum) {
return fmt.Errorf("update artifact error checksum is invalid")
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
}
}

View file

@ -24,8 +24,15 @@ package actions
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=block
// 1.3. Continue Upload Zip Content to Blobstorage (unauthenticated request), repeat until everything is uploaded
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=appendBlock
// 1.4. Unknown xml payload to Blobstorage (unauthenticated request), ignored for now
// 1.4. BlockList xml payload to Blobstorage (unauthenticated request)
// Files of about 800MB are parallel in parallel and / or out of order, this file is needed to enshure the correct order
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=blockList
// Request
// <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
// <BlockList>
// <Latest>blockId1</Latest>
// <Latest>blockId2</Latest>
// </BlockList>
// 1.5. FinalizeArtifact
// Post: /twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact
// Request
@ -82,6 +89,7 @@ import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/xml"
"fmt"
"io"
"net/http"
@ -153,31 +161,34 @@ func ArtifactsV4Routes(prefix string) *web.Route {
return m
}
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID int64) []byte {
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID, artifactID int64) []byte {
mac := hmac.New(sha256.New, setting.GetGeneralTokenSigningSecret())
mac.Write([]byte(endp))
mac.Write([]byte(expires))
mac.Write([]byte(artifactName))
mac.Write([]byte(fmt.Sprint(taskID)))
mac.Write([]byte(fmt.Sprint(artifactID)))
return mac.Sum(nil)
}
func (r artifactV4Routes) buildArtifactURL(endp, artifactName string, taskID int64) string {
func (r artifactV4Routes) buildArtifactURL(endp, artifactName string, taskID, artifactID int64) string {
expires := time.Now().Add(60 * time.Minute).Format("2006-01-02 15:04:05.999999999 -0700 MST")
uploadURL := strings.TrimSuffix(setting.AppURL, "/") + strings.TrimSuffix(r.prefix, "/") +
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + fmt.Sprint(taskID)
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID, artifactID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + fmt.Sprint(taskID) + "&artifactID=" + fmt.Sprint(artifactID)
return uploadURL
}
func (r artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (*actions.ActionTask, string, bool) {
rawTaskID := ctx.Req.URL.Query().Get("taskID")
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
sig := ctx.Req.URL.Query().Get("sig")
expires := ctx.Req.URL.Query().Get("expires")
artifactName := ctx.Req.URL.Query().Get("artifactName")
dsig, _ := base64.URLEncoding.DecodeString(sig)
taskID, _ := strconv.ParseInt(rawTaskID, 10, 64)
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
expecedsig := r.buildSignature(endp, expires, artifactName, taskID)
expecedsig := r.buildSignature(endp, expires, artifactName, taskID, artifactID)
if !hmac.Equal(dsig, expecedsig) {
log.Error("Error unauthorized")
ctx.Error(http.StatusUnauthorized, "Error unauthorized")
@ -272,6 +283,8 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
return
}
artifact.ContentEncoding = ArtifactV4ContentEncoding
artifact.FileSize = 0
artifact.FileCompressedSize = 0
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err)
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
@ -280,7 +293,7 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
respData := CreateArtifactResponse{
Ok: true,
SignedUploadUrl: r.buildArtifactURL("UploadArtifact", artifactName, ctx.ActionTask.ID),
SignedUploadUrl: r.buildArtifactURL("UploadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID),
}
r.sendProtbufBody(ctx, &respData)
}
@ -306,38 +319,77 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
comp := ctx.Req.URL.Query().Get("comp")
switch comp {
case "block", "appendBlock":
// get artifact by name
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
if err != nil {
log.Error("Error artifact not found: %v", err)
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
}
blockid := ctx.Req.URL.Query().Get("blockid")
if blockid == "" {
// get artifact by name
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
if err != nil {
log.Error("Error artifact not found: %v", err)
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
}
if comp == "block" {
artifact.FileSize = 0
artifact.FileCompressedSize = 0
_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
artifact.FileCompressedSize += ctx.Req.ContentLength
artifact.FileSize += ctx.Req.ContentLength
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err)
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
return
}
} else {
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/block-%d-%d-%s", task.Job.RunID, task.Job.RunID, ctx.Req.ContentLength, base64.URLEncoding.EncodeToString([]byte(blockid))), ctx.Req.Body, -1)
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
}
_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
ctx.JSON(http.StatusCreated, "appended")
case "blocklist":
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/%d-%d-blocklist", task.Job.RunID, task.Job.RunID, artifactID), ctx.Req.Body, -1)
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
artifact.FileCompressedSize += ctx.Req.ContentLength
artifact.FileSize += ctx.Req.ContentLength
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err)
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
return
}
ctx.JSON(http.StatusCreated, "appended")
case "blocklist":
ctx.JSON(http.StatusCreated, "created")
}
}
type BlockList struct {
Latest []string `xml:"Latest"`
}
type Latest struct {
Value string `xml:",chardata"`
}
func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, error) {
blockListName := fmt.Sprintf("tmpv4%d/%d-%d-blocklist", runID, runID, artifactID)
s, err := r.fs.Open(blockListName)
if err != nil {
return nil, err
}
xdec := xml.NewDecoder(s)
blockList := &BlockList{}
err = xdec.Decode(blockList)
delerr := r.fs.Delete(blockListName)
if delerr != nil {
log.Warn("Failed to delete blockList %s: %v", blockListName, delerr)
}
return blockList, err
}
func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
var req FinalizeArtifactRequest
@ -356,18 +408,34 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
}
chunkMap, err := listChunksByRunID(r.fs, runID)
var chunks []*chunkFileItem
blockList, err := r.readBlockList(runID, artifact.ID)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
chunks, ok := chunkMap[artifact.ID]
if !ok {
log.Error("Error merge chunks")
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
log.Warn("Failed to read BlockList, fallback to old behavior: %v", err)
chunkMap, err := listChunksByRunID(r.fs, runID)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
chunks, ok = chunkMap[artifact.ID]
if !ok {
log.Error("Error merge chunks")
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
} else {
chunks, err = listChunksByRunIDV4(r.fs, runID, artifact.ID, blockList)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
artifact.FileSize = chunks[len(chunks)-1].End + 1
artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
}
checksum := ""
if req.Hash != nil {
checksum = req.Hash.Value
@ -468,7 +536,7 @@ func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) {
}
}
if respData.SignedUrl == "" {
respData.SignedUrl = r.buildArtifactURL("DownloadArtifact", artifactName, ctx.ActionTask.ID)
respData.SignedUrl = r.buildArtifactURL("DownloadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID)
}
r.sendProtbufBody(ctx, &respData)
}

View file

@ -7,12 +7,14 @@ import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/xml"
"io"
"net/http"
"strings"
"testing"
"time"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/routers/api/actions"
actions_service "code.gitea.io/gitea/services/actions"
"code.gitea.io/gitea/tests"
@ -175,6 +177,134 @@ func TestActionsArtifactV4UploadSingleFileWithRetentionDays(t *testing.T) {
assert.True(t, finalizeResp.Ok)
}
func TestActionsArtifactV4UploadSingleFileWithPotentialHarmfulBlockID(t *testing.T) {
defer tests.PrepareTestEnv(t)()
token, err := actions_service.CreateAuthorizationToken(48, 792, 193)
assert.NoError(t, err)
// acquire artifact upload url
req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact", toProtoJSON(&actions.CreateArtifactRequest{
Version: 4,
Name: "artifactWithPotentialHarmfulBlockID",
WorkflowRunBackendId: "792",
WorkflowJobRunBackendId: "193",
})).AddTokenAuth(token)
resp := MakeRequest(t, req, http.StatusOK)
var uploadResp actions.CreateArtifactResponse
protojson.Unmarshal(resp.Body.Bytes(), &uploadResp)
assert.True(t, uploadResp.Ok)
assert.Contains(t, uploadResp.SignedUploadUrl, "/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact")
// get upload urls
idx := strings.Index(uploadResp.SignedUploadUrl, "/twirp/")
url := uploadResp.SignedUploadUrl[idx:] + "&comp=block&blockid=%2f..%2fmyfile"
blockListURL := uploadResp.SignedUploadUrl[idx:] + "&comp=blocklist"
// upload artifact chunk
body := strings.Repeat("A", 1024)
req = NewRequestWithBody(t, "PUT", url, strings.NewReader(body))
MakeRequest(t, req, http.StatusCreated)
// verify that the exploit didn't work
_, err = storage.Actions.Stat("myfile")
assert.Error(t, err)
// upload artifact blockList
blockList := &actions.BlockList{
Latest: []string{
"/../myfile",
},
}
rawBlockList, err := xml.Marshal(blockList)
assert.NoError(t, err)
req = NewRequestWithBody(t, "PUT", blockListURL, bytes.NewReader(rawBlockList))
MakeRequest(t, req, http.StatusCreated)
t.Logf("Create artifact confirm")
sha := sha256.Sum256([]byte(body))
// confirm artifact upload
req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact", toProtoJSON(&actions.FinalizeArtifactRequest{
Name: "artifactWithPotentialHarmfulBlockID",
Size: 1024,
Hash: wrapperspb.String("sha256:" + hex.EncodeToString(sha[:])),
WorkflowRunBackendId: "792",
WorkflowJobRunBackendId: "193",
})).
AddTokenAuth(token)
resp = MakeRequest(t, req, http.StatusOK)
var finalizeResp actions.FinalizeArtifactResponse
protojson.Unmarshal(resp.Body.Bytes(), &finalizeResp)
assert.True(t, finalizeResp.Ok)
}
func TestActionsArtifactV4UploadSingleFileWithChunksOutOfOrder(t *testing.T) {
defer tests.PrepareTestEnv(t)()
token, err := actions_service.CreateAuthorizationToken(48, 792, 193)
assert.NoError(t, err)
// acquire artifact upload url
req := NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/CreateArtifact", toProtoJSON(&actions.CreateArtifactRequest{
Version: 4,
Name: "artifactWithChunksOutOfOrder",
WorkflowRunBackendId: "792",
WorkflowJobRunBackendId: "193",
})).AddTokenAuth(token)
resp := MakeRequest(t, req, http.StatusOK)
var uploadResp actions.CreateArtifactResponse
protojson.Unmarshal(resp.Body.Bytes(), &uploadResp)
assert.True(t, uploadResp.Ok)
assert.Contains(t, uploadResp.SignedUploadUrl, "/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact")
// get upload urls
idx := strings.Index(uploadResp.SignedUploadUrl, "/twirp/")
block1URL := uploadResp.SignedUploadUrl[idx:] + "&comp=block&blockid=block1"
block2URL := uploadResp.SignedUploadUrl[idx:] + "&comp=block&blockid=block2"
blockListURL := uploadResp.SignedUploadUrl[idx:] + "&comp=blocklist"
// upload artifact chunks
bodyb := strings.Repeat("B", 1024)
req = NewRequestWithBody(t, "PUT", block2URL, strings.NewReader(bodyb))
MakeRequest(t, req, http.StatusCreated)
bodya := strings.Repeat("A", 1024)
req = NewRequestWithBody(t, "PUT", block1URL, strings.NewReader(bodya))
MakeRequest(t, req, http.StatusCreated)
// upload artifact blockList
blockList := &actions.BlockList{
Latest: []string{
"block1",
"block2",
},
}
rawBlockList, err := xml.Marshal(blockList)
assert.NoError(t, err)
req = NewRequestWithBody(t, "PUT", blockListURL, bytes.NewReader(rawBlockList))
MakeRequest(t, req, http.StatusCreated)
t.Logf("Create artifact confirm")
sha := sha256.Sum256([]byte(bodya + bodyb))
// confirm artifact upload
req = NewRequestWithBody(t, "POST", "/twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact", toProtoJSON(&actions.FinalizeArtifactRequest{
Name: "artifactWithChunksOutOfOrder",
Size: 2048,
Hash: wrapperspb.String("sha256:" + hex.EncodeToString(sha[:])),
WorkflowRunBackendId: "792",
WorkflowJobRunBackendId: "193",
})).
AddTokenAuth(token)
resp = MakeRequest(t, req, http.StatusOK)
var finalizeResp actions.FinalizeArtifactResponse
protojson.Unmarshal(resp.Body.Bytes(), &finalizeResp)
assert.True(t, finalizeResp.Ok)
}
func TestActionsArtifactV4DownloadSingle(t *testing.T) {
defer tests.PrepareTestEnv(t)()