Feat: truncate file if uploaded chunk is overlapped
This commit is contained in:
parent
521c5c8dc4
commit
285611baf7
8 changed files with 69 additions and 34 deletions
|
@ -64,12 +64,12 @@ func CheckRequest(instance Auth, r *http.Request) error {
|
||||||
return instance.Check(getSignContent(r), sign[0])
|
return instance.Check(getSignContent(r), sign[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
// getSignContent 签名请求 path、正文、以`X-`开头的 Header. 如果 Header 中包含 `X-Policy`,
|
// getSignContent 签名请求 path、正文、以`X-`开头的 Header. 如果请求 path 为从机上传 API,
|
||||||
// 则不对正文签名。返回待签名/验证的字符串
|
// 则不对正文签名。返回待签名/验证的字符串
|
||||||
func getSignContent(r *http.Request) (rawSignString string) {
|
func getSignContent(r *http.Request) (rawSignString string) {
|
||||||
// 读取所有body正文
|
// 读取所有body正文
|
||||||
var body = []byte{}
|
var body = []byte{}
|
||||||
if _, ok := r.Header["X-Cr-Policy"]; !ok {
|
if strings.Contains(r.URL.Path, "/api/v3/slave/upload/") {
|
||||||
if r.Body != nil {
|
if r.Body != nil {
|
||||||
body, _ = ioutil.ReadAll(r.Body)
|
body, _ = ioutil.ReadAll(r.Body)
|
||||||
_ = r.Body.Close()
|
_ = r.Body.Close()
|
||||||
|
|
|
@ -136,8 +136,20 @@ func (handler Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if uint64(stat.Size()) != fileInfo.AppendStart {
|
if uint64(stat.Size()) < fileInfo.AppendStart {
|
||||||
return errors.New("未上传完成的文件分片与预期大小不一致")
|
return errors.New("未上传完成的文件分片与预期大小不一致")
|
||||||
|
} else if uint64(stat.Size()) > fileInfo.AppendStart {
|
||||||
|
out.Close()
|
||||||
|
if err := handler.Truncate(ctx, dst, fileInfo.AppendStart); err != nil {
|
||||||
|
return fmt.Errorf("覆盖分片时发生错误: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err = os.OpenFile(dst, os.O_APPEND|os.O_CREATE|os.O_WRONLY, Perm)
|
||||||
|
defer out.Close()
|
||||||
|
if err != nil {
|
||||||
|
util.Log().Warning("无法打开或创建文件,%s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,18 +1,26 @@
|
||||||
package remote
|
package remote
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
model "github.com/cloudreve/Cloudreve/v3/models"
|
model "github.com/cloudreve/Cloudreve/v3/models"
|
||||||
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
|
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
|
||||||
"github.com/cloudreve/Cloudreve/v3/pkg/request"
|
"github.com/cloudreve/Cloudreve/v3/pkg/request"
|
||||||
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
|
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
|
||||||
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
basePath = "/api/v3/slave"
|
||||||
|
)
|
||||||
|
|
||||||
// Client to operate remote slave server
|
// Client to operate remote slave server
|
||||||
type Client interface {
|
type Client interface {
|
||||||
CreateUploadSession(session *serializer.UploadSession, ttl int64) error
|
CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error
|
||||||
|
GetUploadURL(ttl int64, sessionID string) (string, string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient creates new Client from given policy
|
// NewClient creates new Client from given policy
|
||||||
|
@ -23,7 +31,7 @@ func NewClient(policy *model.Policy) (Client, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
base, _ := url.Parse("/api/v3/slave")
|
base, _ := url.Parse(basePath)
|
||||||
signTTL := model.GetIntSetting("slave_api_timeout", 60)
|
signTTL := model.GetIntSetting("slave_api_timeout", 60)
|
||||||
|
|
||||||
return &remoteClient{
|
return &remoteClient{
|
||||||
|
@ -43,7 +51,7 @@ type remoteClient struct {
|
||||||
httpClient request.Client
|
httpClient request.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *remoteClient) CreateUploadSession(session *serializer.UploadSession, ttl int64) error {
|
func (c *remoteClient) CreateUploadSession(ctx context.Context, session *serializer.UploadSession, ttl int64) error {
|
||||||
reqBodyEncoded, err := json.Marshal(map[string]interface{}{
|
reqBodyEncoded, err := json.Marshal(map[string]interface{}{
|
||||||
"session": session,
|
"session": session,
|
||||||
"ttl": ttl,
|
"ttl": ttl,
|
||||||
|
@ -57,6 +65,7 @@ func (c *remoteClient) CreateUploadSession(session *serializer.UploadSession, tt
|
||||||
"PUT",
|
"PUT",
|
||||||
"upload",
|
"upload",
|
||||||
bodyReader,
|
bodyReader,
|
||||||
|
request.WithContext(ctx),
|
||||||
).CheckHTTPResponse(200).DecodeResponse()
|
).CheckHTTPResponse(200).DecodeResponse()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -68,3 +77,22 @@ func (c *remoteClient) CreateUploadSession(session *serializer.UploadSession, tt
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *remoteClient) GetUploadURL(ttl int64, sessionID string) (string, string, error) {
|
||||||
|
base, err := url.Parse(c.policy.BaseURL)
|
||||||
|
if err != nil {
|
||||||
|
return "", "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
base.Path = path.Join(base.Path, "upload", sessionID)
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", base.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return "", "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header["X-Cr-Overwrite"] = []string{"true"}
|
||||||
|
|
||||||
|
req = auth.SignRequest(c.authInstance, req, ttl)
|
||||||
|
return req.URL.String(), req.Header["Authorization"][0], nil
|
||||||
|
}
|
||||||
|
|
|
@ -323,29 +323,23 @@ func (handler Driver) Source(
|
||||||
|
|
||||||
// Token 获取上传策略和认证Token
|
// Token 获取上传策略和认证Token
|
||||||
func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) {
|
func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) {
|
||||||
if err := handler.client.CreateUploadSession(uploadSession, ttl); err != nil {
|
// 在从机端创建上传会话
|
||||||
|
if err := handler.client.CreateUploadSession(ctx, uploadSession, ttl); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取上传地址
|
||||||
|
uploadURL, sign, err := handler.client.GetUploadURL(ttl, uploadSession.Key)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &serializer.UploadCredential{
|
return &serializer.UploadCredential{
|
||||||
SessionID: uploadSession.Key,
|
SessionID: uploadSession.Key,
|
||||||
ChunkSize: handler.Policy.OptionsSerialized.ChunkSize,
|
ChunkSize: handler.Policy.OptionsSerialized.ChunkSize,
|
||||||
|
UploadURLs: []string{uploadURL},
|
||||||
|
Credential: sign,
|
||||||
}, nil
|
}, nil
|
||||||
//// 生成回调地址
|
|
||||||
//siteURL := model.GetSiteURL()
|
|
||||||
//apiBaseURI, _ := url.Parse("/api/v3/callback/remote/" + uploadSession.Key)
|
|
||||||
//apiURL := siteURL.ResolveReference(apiBaseURI)
|
|
||||||
//
|
|
||||||
//// 生成上传策略
|
|
||||||
//policy := serializer.UploadPolicy{
|
|
||||||
// SavePath: handler.Policy.DirNameRule,
|
|
||||||
// FileName: handler.Policy.FileNameRule,
|
|
||||||
// AutoRename: handler.Policy.AutoRename,
|
|
||||||
// MaxSize: handler.Policy.MaxSize,
|
|
||||||
// AllowedExtension: handler.Policy.OptionsSerialized.FileType,
|
|
||||||
// CallbackURL: apiURL.String(),
|
|
||||||
//}
|
|
||||||
//return handler.getUploadCredential(ctx, policy, ttl)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (handler Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) {
|
func (handler Driver) getUploadCredential(ctx context.Context, policy serializer.UploadPolicy, TTL int64) (serializer.UploadCredential, error) {
|
||||||
|
|
|
@ -301,7 +301,7 @@ func HookChunkUploaded(ctx context.Context, fs *FileSystem, fileHeader fsctx.Fil
|
||||||
fileInfo := fileHeader.Info()
|
fileInfo := fileHeader.Info()
|
||||||
|
|
||||||
// 更新文件大小
|
// 更新文件大小
|
||||||
return fileInfo.Model.(*model.File).UpdateSize(fileInfo.Model.(*model.File).GetSize() + fileInfo.Size)
|
return fileInfo.Model.(*model.File).UpdateSize(fileInfo.AppendStart + fileInfo.Size)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HookChunkUploadFailed 单个分片上传失败后
|
// HookChunkUploadFailed 单个分片上传失败后
|
||||||
|
@ -309,7 +309,7 @@ func HookChunkUploadFailed(ctx context.Context, fs *FileSystem, fileHeader fsctx
|
||||||
fileInfo := fileHeader.Info()
|
fileInfo := fileHeader.Info()
|
||||||
|
|
||||||
// 更新文件大小
|
// 更新文件大小
|
||||||
return fileInfo.Model.(*model.File).UpdateSize(fileInfo.Model.(*model.File).GetSize() - fileInfo.Size)
|
return fileInfo.Model.(*model.File).UpdateSize(fileInfo.AppendStart)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HookChunkUploadFinished 分片上传结束后处理文件
|
// HookChunkUploadFinished 分片上传结束后处理文件
|
||||||
|
|
|
@ -20,9 +20,11 @@ type UploadPolicy struct {
|
||||||
|
|
||||||
// UploadCredential 返回给客户端的上传凭证
|
// UploadCredential 返回给客户端的上传凭证
|
||||||
type UploadCredential struct {
|
type UploadCredential struct {
|
||||||
SessionID string `json:"sessionID"`
|
SessionID string `json:"sessionID"`
|
||||||
ChunkSize uint64 `json:"chunkSize"` // 分块大小,0 为部分快
|
ChunkSize uint64 `json:"chunkSize"` // 分块大小,0 为部分快
|
||||||
Expires int64 `json:"expires"` // 上传凭证过期时间, Unix 时间戳
|
Expires int64 `json:"expires"` // 上传凭证过期时间, Unix 时间戳
|
||||||
|
UploadURLs []string `json:"uploadURLs"`
|
||||||
|
Credential string `json:"credential"`
|
||||||
|
|
||||||
Token string `json:"token"`
|
Token string `json:"token"`
|
||||||
Policy string `json:"policy"`
|
Policy string `json:"policy"`
|
||||||
|
|
|
@ -46,7 +46,7 @@ func InitSlaveRouter() *gin.Engine {
|
||||||
// 接收主机心跳包
|
// 接收主机心跳包
|
||||||
v3.POST("heartbeat", controllers.SlaveHeartbeat)
|
v3.POST("heartbeat", controllers.SlaveHeartbeat)
|
||||||
// 上传
|
// 上传
|
||||||
v3.POST("upload", controllers.SlaveUpload)
|
v3.POST("upload/:sessionId", controllers.SlaveUpload)
|
||||||
// 创建上传会话上传
|
// 创建上传会话上传
|
||||||
v3.PUT("upload", controllers.SlaveGetUploadSession)
|
v3.PUT("upload", controllers.SlaveGetUploadSession)
|
||||||
// 下载
|
// 下载
|
||||||
|
|
|
@ -66,10 +66,10 @@ func (service *CreateUploadSessionService) Create(ctx context.Context, c *gin.Co
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// UploadService 本机策略上传服务
|
// UploadService 本机及从机策略上传服务
|
||||||
type UploadService struct {
|
type UploadService struct {
|
||||||
ID string `uri:"sessionId" binding:"required"`
|
ID string `uri:"sessionId" binding:"required"`
|
||||||
Index int `uri:"index" binding:"min=0"`
|
Index int `uri:"index" form:"index" binding:"min=0"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upload 处理本机文件分片上传
|
// Upload 处理本机文件分片上传
|
||||||
|
@ -117,8 +117,7 @@ func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serial
|
||||||
}
|
}
|
||||||
|
|
||||||
if expectedSizeStart > actualSizeStart {
|
if expectedSizeStart > actualSizeStart {
|
||||||
util.Log().Warning("尝试上传覆盖分片[%d],数据将被忽略", service.Index)
|
util.Log().Info("尝试上传覆盖分片[%d] Start=%d", service.Index, actualSizeStart)
|
||||||
return serializer.Response{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return processChunkUpload(ctx, c, fs, &uploadSession, service.Index, file)
|
return processChunkUpload(ctx, c, fs, &uploadSession, service.Index, file)
|
||||||
|
|
Loading…
Add table
Reference in a new issue