From e0714fdd53ff1db4708c8cf8f2f0d55d7ff151cd Mon Sep 17 00:00:00 2001
From: HFO4 <912394456@qq.com>
Date: Thu, 3 Mar 2022 19:17:25 +0800
Subject: [PATCH] Feat: process upload callback sent from slave node

---
 middleware/auth.go                      | 287 ++++++++++++------------
 models/file.go                          |   3 +-
 pkg/cluster/slave.go                    |   9 +-
 pkg/filesystem/driver/remote/handler.go |  22 +-
 pkg/filesystem/filesystem.go            |   2 +-
 pkg/filesystem/hooks.go                 |  17 +-
 pkg/filesystem/upload.go                |  10 +-
 pkg/serializer/upload.go                |   4 +-
 routers/router.go                       |   3 +-
 service/callback/upload.go              |  97 ++++----
 service/explorer/upload.go              |   3 +-
 11 files changed, 222 insertions(+), 235 deletions(-)

diff --git a/middleware/auth.go b/middleware/auth.go
index 3e7cbe7..83f972b 100644
--- a/middleware/auth.go
+++ b/middleware/auth.go
@@ -1,24 +1,19 @@
 package middleware
 
 import (
-	"bytes"
-	"context"
-	"crypto/md5"
-	"fmt"
-	"io/ioutil"
+	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
 	"net/http"
 
 	model "github.com/cloudreve/Cloudreve/v3/models"
 	"github.com/cloudreve/Cloudreve/v3/pkg/auth"
 	"github.com/cloudreve/Cloudreve/v3/pkg/cache"
-	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/onedrive"
-	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/oss"
-	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/upyun"
 	"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
-	"github.com/cloudreve/Cloudreve/v3/pkg/util"
 	"github.com/gin-contrib/sessions"
 	"github.com/gin-gonic/gin"
-	"github.com/qiniu/api.v7/v7/auth/qbox"
+)
+
+const (
+	CallbackFailedStatusCode = http.StatusUnauthorized
 )
 
 // SignRequired 验证请求签名
@@ -117,48 +112,60 @@ func WebDAVAuth() gin.HandlerFunc {
 	}
 }
 
+// 对上传会话进行验证
+func UseUploadSession(policyType string) gin.HandlerFunc {
+	return func(c *gin.Context) {
+		// 验证key并查找用户
+		resp := uploadCallbackCheck(c, policyType)
+		if resp.Code != 0 {
+			c.JSON(CallbackFailedStatusCode, resp)
+			c.Abort()
+			return
+		}
+
+		c.Next()
+	}
+}
+
 // uploadCallbackCheck 对上传回调请求的 callback key 进行验证,如果成功则返回上传用户
-func uploadCallbackCheck(c *gin.Context) (serializer.Response, *model.User) {
+func uploadCallbackCheck(c *gin.Context, policyType string) serializer.Response {
 	// 验证 Callback Key
-	callbackKey := c.Param("key")
-	if callbackKey == "" {
-		return serializer.ParamErr("Callback Key 不能为空", nil), nil
+	sessionID := c.Param("sessionID")
+	if sessionID == "" {
+		return serializer.ParamErr("Session ID 不能为空", nil)
 	}
-	callbackSessionRaw, exist := cache.Get("callback_" + callbackKey)
+
+	callbackSessionRaw, exist := cache.Get(filesystem.UploadSessionCachePrefix + sessionID)
 	if !exist {
-		return serializer.ParamErr("回调会话不存在或已过期", nil), nil
+		return serializer.ParamErr("上传会话不存在或已过期", nil)
 	}
+
 	callbackSession := callbackSessionRaw.(serializer.UploadSession)
-	c.Set("callbackSession", &callbackSession)
+	c.Set(filesystem.UploadSessionCtx, &callbackSession)
+	if callbackSession.Policy.Type != policyType {
+		return serializer.Err(serializer.CodePolicyNotAllowed, "Policy not supported", nil)
+	}
 
 	// 清理回调会话
-	_ = cache.Deletes([]string{callbackKey}, "callback_")
+	_ = cache.Deletes([]string{sessionID}, filesystem.UploadSessionCachePrefix)
 
 	// 查找用户
 	user, err := model.GetActiveUserByID(callbackSession.UID)
 	if err != nil {
-		return serializer.Err(serializer.CodeCheckLogin, "找不到用户", err), nil
+		return serializer.Err(serializer.CodeCheckLogin, "找不到用户", err)
 	}
-	c.Set("user", &user)
-
-	return serializer.Response{}, &user
+	c.Set(filesystem.UserCtx, &user)
+	return serializer.Response{}
 }
 
 // RemoteCallbackAuth 远程回调签名验证
 func RemoteCallbackAuth() gin.HandlerFunc {
 	return func(c *gin.Context) {
-		// 验证key并查找用户
-		resp, user := uploadCallbackCheck(c)
-		if resp.Code != 0 {
-			c.JSON(200, resp)
-			c.Abort()
-			return
-		}
-
 		// 验证签名
-		authInstance := auth.HMACAuth{SecretKey: []byte(user.Policy.SecretKey)}
+		session := c.MustGet(filesystem.UploadSessionCtx).(*serializer.UploadSession)
+		authInstance := auth.HMACAuth{SecretKey: []byte(session.Policy.SecretKey)}
 		if err := auth.CheckRequest(authInstance, c.Request); err != nil {
-			c.JSON(200, serializer.Err(serializer.CodeCheckLogin, err.Error(), err))
+			c.JSON(CallbackFailedStatusCode, serializer.Err(serializer.CodeCredentialInvalid, err.Error(), err))
 			c.Abort()
 			return
 		}
@@ -171,28 +178,28 @@ func RemoteCallbackAuth() gin.HandlerFunc {
 // QiniuCallbackAuth 七牛回调签名验证
 func QiniuCallbackAuth() gin.HandlerFunc {
 	return func(c *gin.Context) {
-		// 验证key并查找用户
-		resp, user := uploadCallbackCheck(c)
-		if resp.Code != 0 {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
-			c.Abort()
-			return
-		}
-
-		// 验证回调是否来自qiniu
-		mac := qbox.NewMac(user.Policy.AccessKey, user.Policy.SecretKey)
-		ok, err := mac.VerifyCallback(c.Request)
-		if err != nil {
-			util.Log().Debug("无法验证回调请求,%s", err)
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "无法验证回调请求"})
-			c.Abort()
-			return
-		}
-		if !ok {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "回调签名无效"})
-			c.Abort()
-			return
-		}
+		//// 验证key并查找用户
+		//resp, user := uploadCallbackCheck(c)
+		//if resp.Code != 0 {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
+		//	c.Abort()
+		//	return
+		//}
+		//
+		//// 验证回调是否来自qiniu
+		//mac := qbox.NewMac(user.Policy.AccessKey, user.Policy.SecretKey)
+		//ok, err := mac.VerifyCallback(c.Request)
+		//if err != nil {
+		//	util.Log().Debug("无法验证回调请求,%s", err)
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "无法验证回调请求"})
+		//	c.Abort()
+		//	return
+		//}
+		//if !ok {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "回调签名无效"})
+		//	c.Abort()
+		//	return
+		//}
 
 		c.Next()
 	}
@@ -201,21 +208,21 @@ func QiniuCallbackAuth() gin.HandlerFunc {
 // OSSCallbackAuth 阿里云OSS回调签名验证
 func OSSCallbackAuth() gin.HandlerFunc {
 	return func(c *gin.Context) {
-		// 验证key并查找用户
-		resp, _ := uploadCallbackCheck(c)
-		if resp.Code != 0 {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
-			c.Abort()
-			return
-		}
-
-		err := oss.VerifyCallbackSignature(c.Request)
-		if err != nil {
-			util.Log().Debug("回调签名验证失败,%s", err)
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "回调签名验证失败"})
-			c.Abort()
-			return
-		}
+		//// 验证key并查找用户
+		//resp, _ := uploadCallbackCheck(c)
+		//if resp.Code != 0 {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
+		//	c.Abort()
+		//	return
+		//}
+		//
+		//err := oss.VerifyCallbackSignature(c.Request)
+		//if err != nil {
+		//	util.Log().Debug("回调签名验证失败,%s", err)
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "回调签名验证失败"})
+		//	c.Abort()
+		//	return
+		//}
 
 		c.Next()
 	}
@@ -224,53 +231,53 @@ func OSSCallbackAuth() gin.HandlerFunc {
 // UpyunCallbackAuth 又拍云回调签名验证
 func UpyunCallbackAuth() gin.HandlerFunc {
 	return func(c *gin.Context) {
-		// 验证key并查找用户
-		resp, user := uploadCallbackCheck(c)
-		if resp.Code != 0 {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
-			c.Abort()
-			return
-		}
-
-		// 获取请求正文
-		body, err := ioutil.ReadAll(c.Request.Body)
-		c.Request.Body.Close()
-		if err != nil {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: err.Error()})
-			c.Abort()
-			return
-		}
-
-		c.Request.Body = ioutil.NopCloser(bytes.NewReader(body))
-
-		// 准备验证Upyun回调签名
-		handler := upyun.Driver{Policy: &user.Policy}
-		contentMD5 := c.Request.Header.Get("Content-Md5")
-		date := c.Request.Header.Get("Date")
-		actualSignature := c.Request.Header.Get("Authorization")
-
-		// 计算正文MD5
-		actualContentMD5 := fmt.Sprintf("%x", md5.Sum(body))
-		if actualContentMD5 != contentMD5 {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "MD5不一致"})
-			c.Abort()
-			return
-		}
-
-		// 计算理论签名
-		signature := handler.Sign(context.Background(), []string{
-			"POST",
-			c.Request.URL.Path,
-			date,
-			contentMD5,
-		})
-
-		// 对比签名
-		if signature != actualSignature {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "鉴权失败"})
-			c.Abort()
-			return
-		}
+		//// 验证key并查找用户
+		//resp, user := uploadCallbackCheck(c)
+		//if resp.Code != 0 {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
+		//	c.Abort()
+		//	return
+		//}
+		//
+		//// 获取请求正文
+		//body, err := ioutil.ReadAll(c.Request.Body)
+		//c.Request.Body.Close()
+		//if err != nil {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: err.Error()})
+		//	c.Abort()
+		//	return
+		//}
+		//
+		//c.Request.Body = ioutil.NopCloser(bytes.NewReader(body))
+		//
+		//// 准备验证Upyun回调签名
+		//handler := upyun.Driver{Policy: &user.Policy}
+		//contentMD5 := c.Request.Header.Get("Content-Md5")
+		//date := c.Request.Header.Get("Date")
+		//actualSignature := c.Request.Header.Get("Authorization")
+		//
+		//// 计算正文MD5
+		//actualContentMD5 := fmt.Sprintf("%x", md5.Sum(body))
+		//if actualContentMD5 != contentMD5 {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "MD5不一致"})
+		//	c.Abort()
+		//	return
+		//}
+		//
+		//// 计算理论签名
+		//signature := handler.Sign(context.Background(), []string{
+		//	"POST",
+		//	c.Request.URL.Path,
+		//	date,
+		//	contentMD5,
+		//})
+		//
+		//// 对比签名
+		//if signature != actualSignature {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: "鉴权失败"})
+		//	c.Abort()
+		//	return
+		//}
 
 		c.Next()
 	}
@@ -280,16 +287,16 @@ func UpyunCallbackAuth() gin.HandlerFunc {
 // TODO 解耦
 func OneDriveCallbackAuth() gin.HandlerFunc {
 	return func(c *gin.Context) {
-		// 验证key并查找用户
-		resp, _ := uploadCallbackCheck(c)
-		if resp.Code != 0 {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
-			c.Abort()
-			return
-		}
-
-		// 发送回调结束信号
-		onedrive.FinishCallback(c.Param("key"))
+		//// 验证key并查找用户
+		//resp, _ := uploadCallbackCheck(c)
+		//if resp.Code != 0 {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
+		//	c.Abort()
+		//	return
+		//}
+		//
+		//// 发送回调结束信号
+		//onedrive.FinishCallback(c.Param("key"))
 
 		c.Next()
 	}
@@ -299,13 +306,13 @@ func OneDriveCallbackAuth() gin.HandlerFunc {
 // TODO 解耦 测试
 func COSCallbackAuth() gin.HandlerFunc {
 	return func(c *gin.Context) {
-		// 验证key并查找用户
-		resp, _ := uploadCallbackCheck(c)
-		if resp.Code != 0 {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
-			c.Abort()
-			return
-		}
+		//// 验证key并查找用户
+		//resp, _ := uploadCallbackCheck(c)
+		//if resp.Code != 0 {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
+		//	c.Abort()
+		//	return
+		//}
 
 		c.Next()
 	}
@@ -314,13 +321,13 @@ func COSCallbackAuth() gin.HandlerFunc {
 // S3CallbackAuth Amazon S3回调签名验证
 func S3CallbackAuth() gin.HandlerFunc {
 	return func(c *gin.Context) {
-		// 验证key并查找用户
-		resp, _ := uploadCallbackCheck(c)
-		if resp.Code != 0 {
-			c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
-			c.Abort()
-			return
-		}
+		//// 验证key并查找用户
+		//resp, _ := uploadCallbackCheck(c)
+		//if resp.Code != 0 {
+		//	c.JSON(401, serializer.GeneralUploadCallbackFailed{Error: resp.Msg})
+		//	c.Abort()
+		//	return
+		//}
 
 		c.Next()
 	}
diff --git a/models/file.go b/models/file.go
index 34918bb..3eb81e8 100644
--- a/models/file.go
+++ b/models/file.go
@@ -299,7 +299,7 @@ func (file *File) UpdateSourceName(value string) error {
 	return DB.Model(&file).Set("gorm:association_autoupdate", false).Update("source_name", value).Error
 }
 
-func (file *File) PopChunkToFile(lastModified *time.Time) error {
+func (file *File) PopChunkToFile(lastModified *time.Time, picInfo string) error {
 	file.UploadSessionID = nil
 	if lastModified != nil {
 		file.UpdatedAt = *lastModified
@@ -308,6 +308,7 @@ func (file *File) PopChunkToFile(lastModified *time.Time) error {
 	return DB.Model(file).UpdateColumns(map[string]interface{}{
 		"upload_session_id": file.UploadSessionID,
 		"updated_at":        file.UpdatedAt,
+		"pic_info":          picInfo,
 	}).Error
 }
 
diff --git a/pkg/cluster/slave.go b/pkg/cluster/slave.go
index 7f5bd20..cb059c6 100644
--- a/pkg/cluster/slave.go
+++ b/pkg/cluster/slave.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"errors"
+	"fmt"
 	model "github.com/cloudreve/Cloudreve/v3/models"
 	"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
 	"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
@@ -437,14 +438,12 @@ func RemoteCallback(url string, body serializer.UploadCallback) error {
 	}
 
 	// 解析回调服务端响应
-	resp = resp.CheckHTTPResponse(200)
-	if resp.Err != nil {
-		return serializer.NewError(serializer.CodeCallbackError, "主机服务器返回异常响应", resp.Err)
-	}
 	response, err := resp.DecodeResponse()
 	if err != nil {
-		return serializer.NewError(serializer.CodeCallbackError, "从机无法解析主机返回的响应", err)
+		msg := fmt.Sprintf("从机无法解析主机返回的响应 (StatusCode=%d)", resp.Response.StatusCode)
+		return serializer.NewError(serializer.CodeCallbackError, msg, err)
 	}
+
 	if response.Code != 0 {
 		return serializer.NewError(response.Code, response.Msg, errors.New(response.Error))
 	}
diff --git a/pkg/filesystem/driver/remote/handler.go b/pkg/filesystem/driver/remote/handler.go
index ce869ac..00909c3 100644
--- a/pkg/filesystem/driver/remote/handler.go
+++ b/pkg/filesystem/driver/remote/handler.go
@@ -45,7 +45,7 @@ func NewDriver(policy *model.Policy) (*Driver, error) {
 }
 
 // List 列取文件
-func (handler Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
+func (handler *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
 	var res []response.Object
 
 	reqBody := serializer.ListRequest{
@@ -87,7 +87,7 @@ func (handler Driver) List(ctx context.Context, path string, recursive bool) ([]
 }
 
 // getAPIUrl 获取接口请求地址
-func (handler Driver) getAPIUrl(scope string, routes ...string) string {
+func (handler *Driver) getAPIUrl(scope string, routes ...string) string {
 	serverURL, err := url.Parse(handler.Policy.Server)
 	if err != nil {
 		return ""
@@ -113,7 +113,7 @@ func (handler Driver) getAPIUrl(scope string, routes ...string) string {
 }
 
 // Get 获取文件内容
-func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
+func (handler *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
 	// 尝试获取速度限制
 	speedLimit := 0
 	if user, ok := ctx.Value(fsctx.UserCtx).(model.User); ok {
@@ -150,7 +150,7 @@ func (handler Driver) Get(ctx context.Context, path string) (response.RSCloser,
 }
 
 // Put 将文件流保存到指定目录
-func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
+func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
 	defer file.Close()
 
 	// 凭证有效期
@@ -206,7 +206,7 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
 
 // Delete 删除一个或多个文件,
 // 返回未删除的文件,及遇到的最后一个错误
-func (handler Driver) Delete(ctx context.Context, files []string) ([]string, error) {
+func (handler *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
 	// 封装接口请求正文
 	reqBody := serializer.RemoteDeleteRequest{
 		Files: files,
@@ -252,7 +252,7 @@ func (handler Driver) Delete(ctx context.Context, files []string) ([]string, err
 }
 
 // Thumb 获取文件缩略图
-func (handler Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
+func (handler *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
 	sourcePath := base64.RawURLEncoding.EncodeToString([]byte(path))
 	thumbURL := handler.getAPIUrl("thumb") + "/" + sourcePath
 	ttl := model.GetIntSetting("preview_timeout", 60)
@@ -268,7 +268,7 @@ func (handler Driver) Thumb(ctx context.Context, path string) (*response.Content
 }
 
 // Source 获取外链URL
-func (handler Driver) Source(
+func (handler *Driver) Source(
 	ctx context.Context,
 	path string,
 	baseURL url.URL,
@@ -322,9 +322,9 @@ func (handler Driver) Source(
 }
 
 // Token 获取上传策略和认证Token
-func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) {
+func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) {
 	siteURL := model.GetSiteURL()
-	apiBaseURI, _ := url.Parse(path.Join("/api/v3/callback/remote" + uploadSession.Key + uploadSession.CallbackSecret))
+	apiBaseURI, _ := url.Parse(path.Join("/api/v3/callback/remote", uploadSession.Key, uploadSession.CallbackSecret))
 	apiURL := siteURL.ResolveReference(apiBaseURI)
 
 	// 在从机端创建上传会话
@@ -347,7 +347,7 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria
 	}, nil
 }
 
-func (handler Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) {
+func (handler *Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) {
 	policyEncoded, err := policy.EncodeUploadPolicy()
 	if err != nil {
 		return serializer.UploadCredential{}, err
@@ -371,6 +371,6 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy serializer
 }
 
 // 取消上传凭证
-func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
+func (handler *Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
 	return nil
 }
diff --git a/pkg/filesystem/filesystem.go b/pkg/filesystem/filesystem.go
index 5c12ad1..48e0378 100644
--- a/pkg/filesystem/filesystem.go
+++ b/pkg/filesystem/filesystem.go
@@ -207,7 +207,7 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) {
 	}
 
 	// 获取回调会话
-	callbackSessionRaw, ok := c.Get("callbackSession")
+	callbackSessionRaw, ok := c.Get(UploadSessionCtx)
 	if !ok {
 		return nil, errors.New("找不到回调会话")
 	}
diff --git a/pkg/filesystem/hooks.go b/pkg/filesystem/hooks.go
index f83e259..e73fa33 100644
--- a/pkg/filesystem/hooks.go
+++ b/pkg/filesystem/hooks.go
@@ -194,9 +194,7 @@ func SlaveAfterUpload(session *serializer.UploadSession) Hook {
 
 		// 发送回调请求
 		callbackBody := serializer.UploadCallback{
-			SourceName: file.SourceName,
-			PicInfo:    file.PicInfo,
-			Size:       fileInfo.Size,
+			PicInfo: file.PicInfo,
 		}
 
 		return cluster.RemoteCallback(session.Callback, callbackBody)
@@ -287,12 +285,13 @@ func HookChunkUploadFailed(ctx context.Context, fs *FileSystem, fileHeader fsctx
 	return fileInfo.Model.(*model.File).UpdateSize(fileInfo.AppendStart)
 }
 
-// HookChunkUploadFinished 分片上传结束后处理文件
-func HookChunkUploadFinished(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
-	fileInfo := fileHeader.Info()
-	fileModel := fileInfo.Model.(*model.File)
-
-	return fileModel.PopChunkToFile(fileInfo.LastModified)
+// HookPopPlaceholderToFile 将占位文件提升为正式文件
+func HookPopPlaceholderToFile(picInfo string) Hook {
+	return func(ctx context.Context, fs *FileSystem, fileHeader fsctx.FileHeader) error {
+		fileInfo := fileHeader.Info()
+		fileModel := fileInfo.Model.(*model.File)
+		return fileModel.PopChunkToFile(fileInfo.LastModified, picInfo)
+	}
 }
 
 // HookChunkUploadFinished 分片上传结束后处理文件
diff --git a/pkg/filesystem/upload.go b/pkg/filesystem/upload.go
index 0416ef3..f6ec0d2 100644
--- a/pkg/filesystem/upload.go
+++ b/pkg/filesystem/upload.go
@@ -23,6 +23,8 @@ import (
 
 const (
 	UploadSessionMetaKey     = "upload_session"
+	UploadSessionCtx         = "uploadSession"
+	UserCtx                  = "user"
 	UploadSessionCachePrefix = "callback_"
 )
 
@@ -47,11 +49,11 @@ func (fs *FileSystem) Upload(ctx context.Context, file *fsctx.FileStream) (err e
 		file.SavePath = savePath
 	}
 
-	// 处理客户端未完成上传时,关闭连接
-	go fs.CancelUpload(ctx, savePath, file)
-
 	// 保存文件
 	if file.Mode&fsctx.Nop != fsctx.Nop {
+		// 处理客户端未完成上传时,关闭连接
+		go fs.CancelUpload(ctx, savePath, file)
+
 		err = fs.Handler.Put(ctx, file)
 		if err != nil {
 			fs.Trigger(ctx, "AfterUploadFailed", file)
@@ -202,7 +204,7 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS
 	// 创建回调会话
 	err = cache.Set(
 		UploadSessionCachePrefix+callbackKey,
-		uploadSession,
+		*uploadSession,
 		callBackSessionTTL,
 	)
 	if err != nil {
diff --git a/pkg/serializer/upload.go b/pkg/serializer/upload.go
index 12dce42..225c493 100644
--- a/pkg/serializer/upload.go
+++ b/pkg/serializer/upload.go
@@ -51,9 +51,7 @@ type UploadSession struct {
 
 // UploadCallback 上传回调正文
 type UploadCallback struct {
-	SourceName string `json:"source_name"`
-	PicInfo    string `json:"pic_info"`
-	Size       uint64 `json:"size"`
+	PicInfo string `json:"pic_info"`
 }
 
 // GeneralUploadCallbackFailed 存储策略上传回调失败响应
diff --git a/routers/router.go b/routers/router.go
index 2b9052d..bf8eda1 100644
--- a/routers/router.go
+++ b/routers/router.go
@@ -223,7 +223,8 @@ func InitMasterRouter() *gin.Engine {
 		{
 			// 远程策略上传回调
 			callback.POST(
-				"remote/:key",
+				"remote/:sessionID/:key",
+				middleware.UseUploadSession("remote"),
 				middleware.RemoteCallbackAuth(),
 				controllers.RemoteCallback,
 			)
diff --git a/service/callback/upload.go b/service/callback/upload.go
index 59f7a4f..df5d180 100644
--- a/service/callback/upload.go
+++ b/service/callback/upload.go
@@ -3,6 +3,7 @@ package callback
 import (
 	"context"
 	"fmt"
+	model "github.com/cloudreve/Cloudreve/v3/models"
 	"strings"
 
 	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
@@ -11,13 +12,12 @@ import (
 	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/s3"
 	"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
 	"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
-	"github.com/cloudreve/Cloudreve/v3/pkg/util"
 	"github.com/gin-gonic/gin"
 )
 
 // CallbackProcessService 上传请求回调正文接口
 type CallbackProcessService interface {
-	GetBody(*serializer.UploadSession) serializer.UploadCallback
+	GetBody() serializer.UploadCallback
 }
 
 // RemoteUploadCallbackService 远程存储上传回调请求服务
@@ -26,7 +26,7 @@ type RemoteUploadCallbackService struct {
 }
 
 // GetBody 返回回调正文
-func (service RemoteUploadCallbackService) GetBody(session *serializer.UploadSession) serializer.UploadCallback {
+func (service RemoteUploadCallbackService) GetBody() serializer.UploadCallback {
 	return service.Data
 }
 
@@ -68,11 +68,8 @@ type S3Callback struct {
 }
 
 // GetBody 返回回调正文
-func (service UpyunCallbackService) GetBody(session *serializer.UploadSession) serializer.UploadCallback {
-	res := serializer.UploadCallback{
-		SourceName: service.SourceName,
-		Size:       service.Size,
-	}
+func (service UpyunCallbackService) GetBody() serializer.UploadCallback {
+	res := serializer.UploadCallback{}
 	if service.Width != "" {
 		res.PicInfo = service.Width + "," + service.Height
 	}
@@ -81,47 +78,41 @@ func (service UpyunCallbackService) GetBody(session *serializer.UploadSession) s
 }
 
 // GetBody 返回回调正文
-func (service UploadCallbackService) GetBody(session *serializer.UploadSession) serializer.UploadCallback {
+func (service UploadCallbackService) GetBody() serializer.UploadCallback {
 	return serializer.UploadCallback{
-		SourceName: service.SourceName,
-		PicInfo:    service.PicInfo,
-		Size:       service.Size,
+		PicInfo: service.PicInfo,
 	}
 }
 
 // GetBody 返回回调正文
-func (service OneDriveCallback) GetBody(session *serializer.UploadSession) serializer.UploadCallback {
+func (service OneDriveCallback) GetBody() serializer.UploadCallback {
 	var picInfo = "0,0"
 	if service.Meta.Image.Width != 0 {
 		picInfo = fmt.Sprintf("%d,%d", service.Meta.Image.Width, service.Meta.Image.Height)
 	}
 	return serializer.UploadCallback{
-		SourceName: session.SavePath,
-		PicInfo:    picInfo,
-		Size:       session.Size,
+		PicInfo: picInfo,
 	}
 }
 
 // GetBody 返回回调正文
-func (service COSCallback) GetBody(session *serializer.UploadSession) serializer.UploadCallback {
+func (service COSCallback) GetBody() serializer.UploadCallback {
 	return serializer.UploadCallback{
-		SourceName: session.SavePath,
-		PicInfo:    "",
-		Size:       session.Size,
+		PicInfo: "",
 	}
 }
 
 // GetBody 返回回调正文
-func (service S3Callback) GetBody(session *serializer.UploadSession) serializer.UploadCallback {
+func (service S3Callback) GetBody() serializer.UploadCallback {
 	return serializer.UploadCallback{
-		SourceName: session.SavePath,
-		PicInfo:    "",
-		Size:       session.Size,
+		PicInfo: "",
 	}
 }
 
 // ProcessCallback 处理上传结果回调
 func ProcessCallback(service CallbackProcessService, c *gin.Context) serializer.Response {
+	callbackBody := service.GetBody()
+
 	// 创建文件系统
 	fs, err := filesystem.NewFileSystemFromCallback(c)
 	if err != nil {
@@ -129,51 +120,39 @@ func ProcessCallback(service CallbackProcessService, c *gin.Context) serializer.
 	}
 	defer fs.Recycle()
 
-	// 获取回调会话
-	callbackSessionRaw, _ := c.Get("callbackSession")
-	callbackSession := callbackSessionRaw.(*serializer.UploadSession)
-	callbackBody := service.GetBody(callbackSession)
+	// 获取上传会话
+	uploadSession := c.MustGet(filesystem.UploadSessionCtx).(*serializer.UploadSession)
 
-	// 获取父目录
-	exist, parentFolder := fs.IsPathExist(callbackSession.VirtualPath)
-	if !exist {
-		newFolder, err := fs.CreateDirectory(context.Background(), callbackSession.VirtualPath)
-		if err != nil {
-			return serializer.Err(serializer.CodeParamErr, "指定目录不存在", err)
-		}
-		parentFolder = newFolder
+	// 查找上传会话创建的占位文件
+	file, err := model.GetFilesByUploadSession(uploadSession.Key, fs.User.ID)
+	if err != nil {
+		return serializer.Err(serializer.CodeUploadSessionExpired, "LocalUpload session file placeholder not exist", err)
 	}
 
-	// 创建文件头
-	fileHeader := fsctx.FileStream{
-		Size:        callbackBody.Size,
-		VirtualPath: callbackSession.VirtualPath,
-		Name:        callbackSession.Name,
-		SavePath:    callbackBody.SourceName,
+	fileData := fsctx.FileStream{
+		Size:         uploadSession.Size,
+		Name:         uploadSession.Name,
+		VirtualPath:  uploadSession.VirtualPath,
+		SavePath:     uploadSession.SavePath,
+		Mode:         fsctx.Nop,
+		Model:        file,
+		LastModified: uploadSession.LastModified,
 	}
 
-	// 添加钩子
-	fs.Use("BeforeAddFile", filesystem.HookValidateFile)
-	fs.Use("BeforeAddFile", filesystem.HookValidateCapacity)
+	// 占位符未扣除容量需要校验和扣除
+	if !fs.Policy.IsUploadPlaceholderWithSize() {
+		fs.Use("AfterUpload", filesystem.HookValidateCapacity)
+		fs.Use("AfterUpload", filesystem.HookChunkUploaded)
+	}
+
+	fs.Use("AfterUpload", filesystem.HookPopPlaceholderToFile(callbackBody.PicInfo))
 	fs.Use("AfterValidateFailed", filesystem.HookDeleteTempFile)
-	fs.Use("BeforeAddFileFailed", filesystem.HookDeleteTempFile)
-
-	// 向数据库中添加文件
-	file, err := fs.AddFile(context.Background(), parentFolder, &fileHeader)
+	err = fs.Upload(context.Background(), &fileData)
 	if err != nil {
 		return serializer.Err(serializer.CodeUploadFailed, err.Error(), err)
 	}
 
-	// 如果是图片,则更新图片信息
-	if callbackBody.PicInfo != "" {
-		if err := file.UpdatePicInfo(callbackBody.PicInfo); err != nil {
-			util.Log().Debug("无法更新回调文件的图片信息:%s", err)
-		}
-	}
-
-	return serializer.Response{
-		Code: 0,
-	}
+	return serializer.Response{}
 }
 
 // PreProcess 对OneDrive客户端回调进行预处理验证
diff --git a/service/explorer/upload.go b/service/explorer/upload.go
index 754d897..4db5f1c 100644
--- a/service/explorer/upload.go
+++ b/service/explorer/upload.go
@@ -192,13 +192,14 @@ func processChunkUpload(ctx context.Context, c *gin.Context, fs *filesystem.File
 		fs.Use("AfterUpload", filesystem.HookChunkUploaded)
 		fs.Use("AfterValidateFailed", filesystem.HookChunkUploadFailed)
 		if isLastChunk {
-			fs.Use("AfterUpload", filesystem.HookChunkUploadFinished)
+			fs.Use("AfterUpload", filesystem.HookPopPlaceholderToFile(""))
 			fs.Use("AfterUpload", filesystem.HookGenerateThumb)
 			fs.Use("AfterUpload", filesystem.HookDeleteUploadSession(session.Key))
 		}
 	} else {
 		if isLastChunk {
 			fs.Use("AfterUpload", filesystem.SlaveAfterUpload(session))
+			fs.Use("AfterUpload", filesystem.HookDeleteUploadSession(session.Key))
 		}
 	}