From 7dd636da74ed6aeba27a7e0e96d0101bf6a1b567 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sun, 27 Feb 2022 14:16:36 +0800 Subject: [PATCH] Feat: upload session recycle crontab job / API for cleanup all upload session --- models/file.go | 13 ++++ models/migration.go | 1 + models/policy.go | 8 +-- pkg/crontab/collect.go | 44 +++++++++++++ pkg/crontab/init.go | 7 +- pkg/filesystem/driver/cos/handler.go | 5 ++ pkg/filesystem/driver/handler.go | 3 + pkg/filesystem/driver/local/handler.go | 5 ++ pkg/filesystem/driver/onedrive/handler.go | 5 ++ pkg/filesystem/driver/oss/handler.go | 5 ++ pkg/filesystem/driver/qiniu/handler.go | 5 ++ pkg/filesystem/driver/remote/handler.go | 5 ++ pkg/filesystem/driver/s3/handler.go | 5 ++ .../driver/shadow/masterinslave/handler.go | 5 ++ .../driver/shadow/slaveinmaster/handler.go | 5 ++ pkg/filesystem/driver/upyun/handler.go | 5 ++ pkg/filesystem/file.go | 33 ++++++++-- pkg/filesystem/manage.go | 1 + routers/controllers/file.go | 27 +++++++- routers/router.go | 16 +++-- service/explorer/upload.go | 66 ++++++++++++++++++- 21 files changed, 248 insertions(+), 21 deletions(-) diff --git a/models/file.go b/models/file.go index 7c472ef..9725a38 100644 --- a/models/file.go +++ b/models/file.go @@ -139,6 +139,19 @@ func GetChildFilesOfFolders(folders *[]Folder) ([]File, error) { return files, result.Error } +// GetUploadPlaceholderFiles 获取所有上传占位文件 +// UID为0表示忽略用户 +func GetUploadPlaceholderFiles(uid uint) []*File { + query := DB + if uid != 0 { + query = query.Where("user_id = ?", uid) + } + + var files []*File + query.Where("upload_session_id is not NULL").Find(&files) + return files +} + // GetPolicy 获取文件所属策略 func (file *File) GetPolicy() *Policy { if file.Policy.Model.ID == 0 { diff --git a/models/migration.go b/models/migration.go index d29844f..230f35a 100644 --- a/models/migration.go +++ b/models/migration.go @@ -162,6 +162,7 @@ Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; verti {Name: "home_view_method", Value: "icon", Type: "view"}, {Name: "share_view_method", Value: "list", Type: "view"}, {Name: "cron_garbage_collect", Value: "@hourly", Type: "cron"}, + {Name: "cron_recycle_upload_session", Value: "@every 1h30m", Type: "cron"}, {Name: "authn_enabled", Value: "0", Type: "authn"}, {Name: "captcha_type", Value: "normal", Type: "captcha"}, {Name: "captcha_height", Value: "60", Type: "captcha"}, diff --git a/models/policy.go b/models/policy.go index 25938c2..ef1ce92 100644 --- a/models/policy.go +++ b/models/policy.go @@ -216,13 +216,7 @@ func (policy *Policy) IsThumbExist(name string) bool { // IsTransitUpload 返回此策略上传给定size文件时是否需要服务端中转 func (policy *Policy) IsTransitUpload(size uint64) bool { - if policy.Type == "local" { - return true - } - if policy.Type == "onedrive" && size < 4*1024*1024 { - return true - } - return false + return policy.Type == "local" } // IsPathGenerateNeeded 返回此策略是否需要在生成上传凭证时生成存储路径 diff --git a/pkg/crontab/collect.go b/pkg/crontab/collect.go index be6798b..79bf587 100644 --- a/pkg/crontab/collect.go +++ b/pkg/crontab/collect.go @@ -1,6 +1,7 @@ package crontab import ( + "context" "os" "path/filepath" "strings" @@ -8,6 +9,7 @@ import ( model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/cache" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" "github.com/cloudreve/Cloudreve/v3/pkg/util" ) @@ -53,3 +55,45 @@ func collectCache(store *cache.MemoStore) { util.Log().Debug("清理内存缓存") store.GarbageCollect() } + +func uploadSessionCollect() { + placeholders := model.GetUploadPlaceholderFiles(0) + + // 将过期的上传会话按照用户分组 + userToFiles := make(map[uint][]uint) + for _, file := range placeholders { + _, sessionExist := cache.Get(filesystem.UploadSessionCachePrefix + *file.UploadSessionID) + if sessionExist { + continue + } + + if _, ok := userToFiles[file.UserID]; !ok { + userToFiles[file.UserID] = make([]uint, 0) + } + + userToFiles[file.UserID] = append(userToFiles[file.UserID], file.ID) + } + + // 删除过期的会话 + for uid, filesIDs := range userToFiles { + user, err := model.GetUserByID(uid) + if err != nil { + util.Log().Warning("上传会话所属用户不存在, %s", err) + continue + } + + fs, err := filesystem.NewFileSystem(&user) + if err != nil { + util.Log().Warning("无法初始化文件系统, %s", err) + continue + } + + if err = fs.Delete(context.Background(), []uint{}, filesIDs, false); err != nil { + util.Log().Warning("无法删除上传会话, %s", err) + } + + fs.Recycle() + } + + util.Log().Info("定时任务 [cron_recycle_upload_session] 执行完毕") +} diff --git a/pkg/crontab/init.go b/pkg/crontab/init.go index 1b8a322..0a696a4 100644 --- a/pkg/crontab/init.go +++ b/pkg/crontab/init.go @@ -21,13 +21,18 @@ func Reload() { func Init() { util.Log().Info("初始化定时任务...") // 读取cron日程设置 - options := model.GetSettingByNames("cron_garbage_collect") + options := model.GetSettingByNames( + "cron_garbage_collect", + "cron_recycle_upload_session", + ) Cron := cron.New() for k, v := range options { var handler func() switch k { case "cron_garbage_collect": handler = garbageCollect + case "cron_recycle_upload_session": + handler = uploadSessionCollect default: util.Log().Warning("未知定时任务类型 [%s],跳过", k) continue diff --git a/pkg/filesystem/driver/cos/handler.go b/pkg/filesystem/driver/cos/handler.go index efad19f..7577ec4 100644 --- a/pkg/filesystem/driver/cos/handler.go +++ b/pkg/filesystem/driver/cos/handler.go @@ -365,6 +365,11 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria } +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} + // Meta 获取文件信息 func (handler Driver) Meta(ctx context.Context, path string) (*MetaData, error) { res, err := handler.Client.Object.Head(ctx, path, &cossdk.ObjectHeadOptions{}) diff --git a/pkg/filesystem/driver/handler.go b/pkg/filesystem/driver/handler.go index 5608764..74c99cd 100644 --- a/pkg/filesystem/driver/handler.go +++ b/pkg/filesystem/driver/handler.go @@ -32,6 +32,9 @@ type Handler interface { // Token 获取有效期为ttl的上传凭证和签名 Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (serializer.UploadCredential, error) + // CancelToken 取消已经创建的有状态上传凭证 + CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error + // List 递归列取远程端path路径下文件、目录,不包含path本身, // 返回的对象路径以path作为起始根目录. // recursive - 是否递归列出 diff --git a/pkg/filesystem/driver/local/handler.go b/pkg/filesystem/driver/local/handler.go index 3c428ca..c1742b5 100644 --- a/pkg/filesystem/driver/local/handler.go +++ b/pkg/filesystem/driver/local/handler.go @@ -260,3 +260,8 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria ChunkSize: handler.Policy.OptionsSerialized.ChunkSize, }, nil } + +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} diff --git a/pkg/filesystem/driver/onedrive/handler.go b/pkg/filesystem/driver/onedrive/handler.go index 7d043d9..a5944f9 100644 --- a/pkg/filesystem/driver/onedrive/handler.go +++ b/pkg/filesystem/driver/onedrive/handler.go @@ -249,3 +249,8 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria Token: apiURL.String(), }, nil } + +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} diff --git a/pkg/filesystem/driver/oss/handler.go b/pkg/filesystem/driver/oss/handler.go index e5ed154..2a229cd 100644 --- a/pkg/filesystem/driver/oss/handler.go +++ b/pkg/filesystem/driver/oss/handler.go @@ -463,3 +463,8 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPoli Token: signature, }, nil } + +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} diff --git a/pkg/filesystem/driver/qiniu/handler.go b/pkg/filesystem/driver/qiniu/handler.go index 7304d8f..94a2b8e 100644 --- a/pkg/filesystem/driver/qiniu/handler.go +++ b/pkg/filesystem/driver/qiniu/handler.go @@ -308,3 +308,8 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy storage.Pu Token: upToken, }, nil } + +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} diff --git a/pkg/filesystem/driver/remote/handler.go b/pkg/filesystem/driver/remote/handler.go index c2d150f..0eb2b74 100644 --- a/pkg/filesystem/driver/remote/handler.go +++ b/pkg/filesystem/driver/remote/handler.go @@ -345,3 +345,8 @@ func (handler Driver) getUploadCredential(ctx context.Context, policy serializer } return serializer.UploadCredential{}, errors.New("无法签名上传策略") } + +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} diff --git a/pkg/filesystem/driver/s3/handler.go b/pkg/filesystem/driver/s3/handler.go index 8152ab7..be72308 100644 --- a/pkg/filesystem/driver/s3/handler.go +++ b/pkg/filesystem/driver/s3/handler.go @@ -446,3 +446,8 @@ func (handler Driver) CORS() error { return err } + +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} diff --git a/pkg/filesystem/driver/shadow/masterinslave/handler.go b/pkg/filesystem/driver/shadow/masterinslave/handler.go index 2382435..cc3853c 100644 --- a/pkg/filesystem/driver/shadow/masterinslave/handler.go +++ b/pkg/filesystem/driver/shadow/masterinslave/handler.go @@ -54,3 +54,8 @@ func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { return nil, ErrNotImplemented } + +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} diff --git a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go index 92fee1c..4b495c9 100644 --- a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go +++ b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go @@ -120,3 +120,8 @@ func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { return nil, ErrNotImplemented } + +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} diff --git a/pkg/filesystem/driver/upyun/handler.go b/pkg/filesystem/driver/upyun/handler.go index dac0679..5c03e5e 100644 --- a/pkg/filesystem/driver/upyun/handler.go +++ b/pkg/filesystem/driver/upyun/handler.go @@ -335,6 +335,11 @@ func (handler Driver) Token(ctx context.Context, ttl int64, uploadSession *seria return handler.getUploadCredential(ctx, putPolicy) } +// 取消上传凭证 +func (handler Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error { + return nil +} + func (handler Driver) getUploadCredential(ctx context.Context, policy UploadPolicy) (serializer.UploadCredential, error) { // 生成上传策略 policyJSON, err := json.Marshal(policy) diff --git a/pkg/filesystem/file.go b/pkg/filesystem/file.go index 0edcb72..9bc2e0f 100644 --- a/pkg/filesystem/file.go +++ b/pkg/filesystem/file.go @@ -5,6 +5,7 @@ import ( "io" model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response" @@ -177,23 +178,45 @@ func (fs *FileSystem) deleteGroupedFile(ctx context.Context, files map[uint][]*m for policyID, toBeDeletedFiles := range files { // 列举出需要物理删除的文件的物理路径 - sourceNames := make([]string, 0, len(toBeDeletedFiles)) + sourceNamesAll := make([]string, 0, len(toBeDeletedFiles)) + sourceNamesDeleted := make([]string, 0, len(toBeDeletedFiles)) + sourceNamesTryDeleted := make([]string, 0, len(toBeDeletedFiles)) + for i := 0; i < len(toBeDeletedFiles); i++ { - sourceNames = append(sourceNames, toBeDeletedFiles[i].SourceName) + sourceNamesAll = append(sourceNamesAll, toBeDeletedFiles[i].SourceName) + if !(toBeDeletedFiles[i].UploadSessionID != nil && toBeDeletedFiles[i].Size == 0) { + sourceNamesDeleted = append(sourceNamesDeleted, toBeDeletedFiles[i].SourceName) + } else { + sourceNamesTryDeleted = append(sourceNamesTryDeleted, toBeDeletedFiles[i].SourceName) + } + + if toBeDeletedFiles[i].UploadSessionID != nil { + if session, ok := cache.Get(UploadSessionCachePrefix + *toBeDeletedFiles[i].UploadSessionID); ok { + uploadSession := session.(serializer.UploadSession) + if err := fs.Handler.CancelToken(ctx, &uploadSession); err != nil { + util.Log().Warning("无法取消 [%s] 的上传会话: %s", err) + } + + cache.Deletes([]string{*toBeDeletedFiles[i].UploadSessionID}, UploadSessionCachePrefix) + } + + } } // 切换上传策略 fs.Policy = toBeDeletedFiles[0].GetPolicy() err := fs.DispatchHandler() if err != nil { - failed[policyID] = sourceNames + failed[policyID] = sourceNamesAll continue } // 执行删除 - failedFile, _ := fs.Handler.Delete(ctx, sourceNames) + failedFile, _ := fs.Handler.Delete(ctx, sourceNamesDeleted) failed[policyID] = failedFile + // 尝试删除上传会话中大小为0的占位文件。如果失败也忽略 + fs.Handler.Delete(ctx, sourceNamesTryDeleted) } return failed @@ -208,7 +231,7 @@ func (fs *FileSystem) GroupFileByPolicy(ctx context.Context, files []model.File) // 如果已存在分组,直接追加 policyGroup[files[key].PolicyID] = append(file, &files[key]) } else { - // 分布不存在,创建 + // 分组不存在,创建 policyGroup[files[key].PolicyID] = make([]*model.File, 0) policyGroup[files[key].PolicyID] = append(policyGroup[files[key].PolicyID], &files[key]) } diff --git a/pkg/filesystem/manage.go b/pkg/filesystem/manage.go index d502f3a..b6e66ad 100644 --- a/pkg/filesystem/manage.go +++ b/pkg/filesystem/manage.go @@ -185,6 +185,7 @@ func (fs *FileSystem) Delete(ctx context.Context, dirs, files []uint, force bool } // 删除文件记录对应的分享记录 + // TODO 先取消分享再删除文件 model.DeleteShareBySourceIDs(deletedFileIDs, false) // 归还容量 diff --git a/routers/controllers/file.go b/routers/controllers/file.go index 129bd45..78c60b5 100644 --- a/routers/controllers/file.go +++ b/routers/controllers/file.go @@ -343,13 +343,38 @@ func FileUpload(c *gin.Context) { //}) } +// DeleteUploadCredential 删除上传会话 +func DeleteUploadCredential(c *gin.Context) { + // 创建上下文 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var service explorer.UploadSessionService + if err := c.ShouldBindUri(&service); err == nil { + res := service.Delete(ctx, c) + c.JSON(200, res) + } else { + c.JSON(200, ErrorResponse(err)) + } +} + +// DeleteAllCredential 删除全部上传会话 +func DeleteAllCredential(c *gin.Context) { + // 创建上下文 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + res := explorer.DeleteAllUploadSession(ctx, c) + c.JSON(200, res) +} + // GetUploadCredential 创建上传会话 func GetUploadCredential(c *gin.Context) { // 创建上下文 ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var service explorer.UploadSessionService + var service explorer.CreateUploadSessionService if err := c.ShouldBindJSON(&service); err == nil { res := service.Create(ctx, c) c.JSON(200, res) diff --git a/routers/router.go b/routers/router.go index 10e496d..9baba22 100644 --- a/routers/router.go +++ b/routers/router.go @@ -504,10 +504,18 @@ func InitMasterRouter() *gin.Engine { // 文件 file := auth.Group("file", middleware.HashID(hashid.FileID)) { - // 文件上传 - file.POST("upload/:sessionId/:index", controllers.FileUpload) - // 创建上传会话 - file.PUT("upload", controllers.GetUploadCredential) + // 上传 + upload := file.Group("upload") + { + // 文件上传 + upload.POST(":sessionId/:index", controllers.FileUpload) + // 创建上传会话 + upload.PUT("", controllers.GetUploadCredential) + // 删除给定上传会话 + upload.DELETE(":sessionId", controllers.DeleteUploadCredential) + // 删除全部上传会话 + upload.DELETE("", controllers.DeleteAllCredential) + } // 更新文件 file.PUT("update/:id", controllers.PutContent) // 创建空白文件 diff --git a/service/explorer/upload.go b/service/explorer/upload.go index ff31cce..e9f9b89 100644 --- a/service/explorer/upload.go +++ b/service/explorer/upload.go @@ -17,8 +17,8 @@ import ( "time" ) -// UploadSessionService 获取上传凭证服务 -type UploadSessionService struct { +// CreateUploadSessionService 获取上传凭证服务 +type CreateUploadSessionService struct { Path string `json:"path" binding:"required"` Size uint64 `json:"size" binding:"min=0"` Name string `json:"name" binding:"required"` @@ -27,7 +27,7 @@ type UploadSessionService struct { } // Create 创建新的上传会话 -func (service *UploadSessionService) Create(ctx context.Context, c *gin.Context) serializer.Response { +func (service *CreateUploadSessionService) Create(ctx context.Context, c *gin.Context) serializer.Response { // 创建文件系统 fs, err := filesystem.NewFileSystemFromContext(c) if err != nil { @@ -84,6 +84,10 @@ func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serial return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) } + if uploadSession.UID != fs.User.ID { + return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session expired or not exist", nil) + } + // 查找上传会话创建的占位文件 file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID) if err != nil { @@ -91,6 +95,10 @@ func (service *UploadService) Upload(ctx context.Context, c *gin.Context) serial } // 重设 fs 存储策略 + if !uploadSession.Policy.IsTransitUpload(uploadSession.Size) { + return serializer.Err(serializer.CodePolicyNotAllowed, "Storage policy not supported", err) + } + fs.Policy = &uploadSession.Policy if err := fs.DispatchHandler(); err != nil { return serializer.Err(serializer.CodePolicyNotAllowed, "Unknown storage policy", err) @@ -169,3 +177,55 @@ func processChunkUpload(ctx context.Context, c *gin.Context, fs *filesystem.File return serializer.Response{} } + +// UploadSessionService 上传会话服务 +type UploadSessionService struct { + ID string `uri:"sessionId" binding:"required"` +} + +// Delete 删除指定上传会话 +func (service *UploadSessionService) Delete(ctx context.Context, c *gin.Context) serializer.Response { + // 创建文件系统 + fs, err := filesystem.NewFileSystemFromContext(c) + if err != nil { + return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) + } + defer fs.Recycle() + + // 查找需要删除的上传会话的占位文件 + file, err := model.GetFilesByUploadSession(service.ID, fs.User.ID) + if err != nil { + return serializer.Err(serializer.CodeUploadSessionExpired, "Upload session file placeholder not exist", err) + } + + // 删除文件 + if err := fs.Delete(ctx, []uint{}, []uint{file.ID}, false); err != nil { + return serializer.Err(serializer.CodeInternalSetting, "Failed to delete upload session", err) + } + + return serializer.Response{} +} + +// DeleteAllUploadSession 删除当前用户的全部上传绘会话 +func DeleteAllUploadSession(ctx context.Context, c *gin.Context) serializer.Response { + // 创建文件系统 + fs, err := filesystem.NewFileSystemFromContext(c) + if err != nil { + return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) + } + defer fs.Recycle() + + // 查找需要删除的上传会话的占位文件 + files := model.GetUploadPlaceholderFiles(fs.User.ID) + fileIDs := make([]uint, len(files)) + for i, file := range files { + fileIDs[i] = file.ID + } + + // 删除文件 + if err := fs.Delete(ctx, []uint{}, fileIDs, false); err != nil { + return serializer.Err(serializer.CodeInternalSetting, "Failed to cleanup upload session", err) + } + + return serializer.Response{} +}