Feat: task for importing file from existing filesystem
This commit is contained in:
parent
36e5b31f73
commit
9eeb4b6d19
4 changed files with 247 additions and 0 deletions
216
pkg/task/import.go
Normal file
216
pkg/task/import.go
Normal file
|
@ -0,0 +1,216 @@
|
||||||
|
package task
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
model "github.com/HFO4/cloudreve/models"
|
||||||
|
"github.com/HFO4/cloudreve/pkg/filesystem"
|
||||||
|
"github.com/HFO4/cloudreve/pkg/filesystem/driver/local"
|
||||||
|
"github.com/HFO4/cloudreve/pkg/filesystem/fsctx"
|
||||||
|
"github.com/HFO4/cloudreve/pkg/util"
|
||||||
|
"path"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ImportTask 导入务
|
||||||
|
type ImportTask struct {
|
||||||
|
User *model.User
|
||||||
|
TaskModel *model.Task
|
||||||
|
TaskProps ImportProps
|
||||||
|
Err *JobError
|
||||||
|
}
|
||||||
|
|
||||||
|
// ImportProps 导入任务属性
|
||||||
|
type ImportProps struct {
|
||||||
|
PolicyID uint `json:"policy_id"` // 存储策略ID
|
||||||
|
Src string `json:"src"` // 原始路径
|
||||||
|
Recursive bool `json:"is_recursive"` // 是否递归导入
|
||||||
|
Dst string `json:"dst"` // 目的目录
|
||||||
|
}
|
||||||
|
|
||||||
|
// Props 获取任务属性
|
||||||
|
func (job *ImportTask) Props() string {
|
||||||
|
res, _ := json.Marshal(job.TaskProps)
|
||||||
|
return string(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type 获取任务状态
|
||||||
|
func (job *ImportTask) Type() int {
|
||||||
|
return ImportTaskType
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creator 获取创建者ID
|
||||||
|
func (job *ImportTask) Creator() uint {
|
||||||
|
return job.User.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Model 获取任务的数据库模型
|
||||||
|
func (job *ImportTask) Model() *model.Task {
|
||||||
|
return job.TaskModel
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetStatus 设定状态
|
||||||
|
func (job *ImportTask) SetStatus(status int) {
|
||||||
|
job.TaskModel.SetStatus(status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetError 设定任务失败信息
|
||||||
|
func (job *ImportTask) SetError(err *JobError) {
|
||||||
|
job.Err = err
|
||||||
|
res, _ := json.Marshal(job.Err)
|
||||||
|
job.TaskModel.SetError(string(res))
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetErrorMsg 设定任务失败信息
|
||||||
|
func (job *ImportTask) SetErrorMsg(msg string, err error) {
|
||||||
|
jobErr := &JobError{Msg: msg}
|
||||||
|
if err != nil {
|
||||||
|
jobErr.Error = err.Error()
|
||||||
|
}
|
||||||
|
job.SetError(jobErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetError 返回任务失败信息
|
||||||
|
func (job *ImportTask) GetError() *JobError {
|
||||||
|
return job.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do 开始执行任务
|
||||||
|
func (job *ImportTask) Do() {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// 查找存储策略
|
||||||
|
policy, err := model.GetPolicyByID(job.TaskProps.PolicyID)
|
||||||
|
if err != nil {
|
||||||
|
job.SetErrorMsg("找不到存储策略", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建文件系统
|
||||||
|
job.User.Policy = policy
|
||||||
|
fs, err := filesystem.NewFileSystem(job.User)
|
||||||
|
if err != nil {
|
||||||
|
job.SetErrorMsg(err.Error(), nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer fs.Recycle()
|
||||||
|
|
||||||
|
// 注册钩子
|
||||||
|
fs.Use("BeforeAddFile", filesystem.HookValidateFile)
|
||||||
|
fs.Use("BeforeAddFile", filesystem.HookValidateCapacity)
|
||||||
|
fs.Use("AfterValidateFailed", filesystem.HookGiveBackCapacity)
|
||||||
|
|
||||||
|
// 列取目录、对象
|
||||||
|
job.TaskModel.SetProgress(ListingProgress)
|
||||||
|
coxIgnoreConflict := context.WithValue(context.Background(), fsctx.IgnoreConflictCtx,
|
||||||
|
true)
|
||||||
|
objects, err := fs.Handler.List(ctx, job.TaskProps.Src, job.TaskProps.Recursive)
|
||||||
|
if err != nil {
|
||||||
|
job.SetErrorMsg("无法列取文件", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
job.TaskModel.SetProgress(InsertingProgress)
|
||||||
|
|
||||||
|
// 虚拟目录路径与folder对象ID的对应
|
||||||
|
pathCache := make(map[string]*model.Folder, len(objects))
|
||||||
|
|
||||||
|
// 插入目录记录到用户文件系统
|
||||||
|
for _, object := range objects {
|
||||||
|
if object.IsDir {
|
||||||
|
// 创建目录
|
||||||
|
virtualPath := path.Join(job.TaskProps.Dst, object.RelativePath)
|
||||||
|
folder, err := fs.CreateDirectory(coxIgnoreConflict, virtualPath)
|
||||||
|
if err != nil {
|
||||||
|
util.Log().Warning("导入任务无法创建用户目录[%s], %s", virtualPath, err)
|
||||||
|
} else if folder.ID > 0 {
|
||||||
|
pathCache[virtualPath] = folder
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 插入文件记录到用户文件系统
|
||||||
|
for _, object := range objects {
|
||||||
|
if !object.IsDir {
|
||||||
|
// 创建文件信息
|
||||||
|
virtualPath := path.Dir(path.Join(job.TaskProps.Dst, object.RelativePath))
|
||||||
|
fileHeader := local.FileStream{
|
||||||
|
Size: object.Size,
|
||||||
|
VirtualPath: virtualPath,
|
||||||
|
Name: object.Name,
|
||||||
|
}
|
||||||
|
addFileCtx := context.WithValue(ctx, fsctx.FileHeaderCtx, fileHeader)
|
||||||
|
addFileCtx = context.WithValue(addFileCtx, fsctx.SavePathCtx, object.Source)
|
||||||
|
|
||||||
|
// 查找父目录
|
||||||
|
parentFolder := &model.Folder{}
|
||||||
|
if parent, ok := pathCache[virtualPath]; ok {
|
||||||
|
parentFolder = parent
|
||||||
|
} else {
|
||||||
|
if exist, folder := fs.IsPathExist(virtualPath); exist {
|
||||||
|
parentFolder = folder
|
||||||
|
} else {
|
||||||
|
util.Log().Warning("导入任务无法创插入文件[%s], 父目录不存在",
|
||||||
|
object.RelativePath)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 插入文件记录
|
||||||
|
_, err := fs.AddFile(addFileCtx, parentFolder)
|
||||||
|
if err != nil {
|
||||||
|
util.Log().Warning("导入任务无法创插入文件[%s], %s",
|
||||||
|
object.RelativePath, err)
|
||||||
|
if err == filesystem.ErrInsufficientCapacity {
|
||||||
|
job.SetErrorMsg("容量不足", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewImportTask 新建导入任务
|
||||||
|
func NewImportTask(user, policy uint, src, dst string, recursive bool) (Job, error) {
|
||||||
|
creator, err := model.GetActiveUserByID(user)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
newTask := &ImportTask{
|
||||||
|
User: &creator,
|
||||||
|
TaskProps: ImportProps{
|
||||||
|
PolicyID: policy,
|
||||||
|
Recursive: recursive,
|
||||||
|
Src: src,
|
||||||
|
Dst: dst,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
record, err := Record(newTask)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newTask.TaskModel = record
|
||||||
|
|
||||||
|
return newTask, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewImportTaskFromModel 从数据库记录中恢复导入任务
|
||||||
|
func NewImportTaskFromModel(task *model.Task) (Job, error) {
|
||||||
|
user, err := model.GetActiveUserByID(task.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newTask := &ImportTask{
|
||||||
|
User: &user,
|
||||||
|
TaskModel: task,
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal([]byte(task.Props), &newTask.TaskProps)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return newTask, nil
|
||||||
|
}
|
|
@ -13,6 +13,8 @@ const (
|
||||||
DecompressTaskType
|
DecompressTaskType
|
||||||
// TransferTaskType 中转任务
|
// TransferTaskType 中转任务
|
||||||
TransferTaskType
|
TransferTaskType
|
||||||
|
// ImportTaskType 导入任务
|
||||||
|
ImportTaskType
|
||||||
)
|
)
|
||||||
|
|
||||||
// 任务状态
|
// 任务状态
|
||||||
|
@ -41,6 +43,10 @@ const (
|
||||||
DownloadingProgress
|
DownloadingProgress
|
||||||
// Transferring 转存中
|
// Transferring 转存中
|
||||||
TransferringProgress
|
TransferringProgress
|
||||||
|
// ListingProgress 索引中
|
||||||
|
ListingProgress
|
||||||
|
// InsertingProgress 插入中
|
||||||
|
InsertingProgress
|
||||||
)
|
)
|
||||||
|
|
||||||
// Job 任务接口
|
// Job 任务接口
|
||||||
|
@ -103,6 +109,8 @@ func GetJobFromModel(task *model.Task) (Job, error) {
|
||||||
return NewDecompressTaskFromModel(task)
|
return NewDecompressTaskFromModel(task)
|
||||||
case TransferTaskType:
|
case TransferTaskType:
|
||||||
return NewTransferTaskFromModel(task)
|
return NewTransferTaskFromModel(task)
|
||||||
|
case ImportTaskType:
|
||||||
|
return NewImportTaskFromModel(task)
|
||||||
default:
|
default:
|
||||||
return nil, ErrUnknownTaskType
|
return nil, ErrUnknownTaskType
|
||||||
}
|
}
|
||||||
|
|
|
@ -387,6 +387,8 @@ func InitMasterRouter() *gin.Engine {
|
||||||
task.POST("list", controllers.AdminListTask)
|
task.POST("list", controllers.AdminListTask)
|
||||||
// 删除
|
// 删除
|
||||||
task.POST("delete", controllers.AdminDeleteTask)
|
task.POST("delete", controllers.AdminDeleteTask)
|
||||||
|
// 新建文件导入任务
|
||||||
|
task.POST("import", controllers.AdminCreateImportTask)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package admin
|
||||||
import (
|
import (
|
||||||
model "github.com/HFO4/cloudreve/models"
|
model "github.com/HFO4/cloudreve/models"
|
||||||
"github.com/HFO4/cloudreve/pkg/serializer"
|
"github.com/HFO4/cloudreve/pkg/serializer"
|
||||||
|
"github.com/HFO4/cloudreve/pkg/task"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
@ -12,6 +13,26 @@ type TaskBatchService struct {
|
||||||
ID []uint `json:"id" binding:"min=1"`
|
ID []uint `json:"id" binding:"min=1"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ImportTaskService 导入任务
|
||||||
|
type ImportTaskService struct {
|
||||||
|
UID uint `json:"uid" binding:"required"`
|
||||||
|
PolicyID uint `json:"policy_id" binding:"required"`
|
||||||
|
Src string `json:"src" binding:"required,min=1,max=65535"`
|
||||||
|
Dst string `json:"dst" binding:"required,min=1,max=65535"`
|
||||||
|
Recursive bool `json:"recursive"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create 新建导入任务
|
||||||
|
func (service *ImportTaskService) Create(c *gin.Context, user *model.User) serializer.Response {
|
||||||
|
// 创建任务
|
||||||
|
job, err := task.NewImportTask(service.UID, service.PolicyID, service.Src, service.Dst, service.Recursive)
|
||||||
|
if err != nil {
|
||||||
|
return serializer.Err(serializer.CodeNotSet, "任务创建失败", err)
|
||||||
|
}
|
||||||
|
task.TaskPoll.Submit(job)
|
||||||
|
return serializer.Response{}
|
||||||
|
}
|
||||||
|
|
||||||
// Delete 删除任务
|
// Delete 删除任务
|
||||||
func (service *TaskBatchService) Delete(c *gin.Context) serializer.Response {
|
func (service *TaskBatchService) Delete(c *gin.Context) serializer.Response {
|
||||||
if err := model.DB.Where("id in (?)", service.ID).Delete(&model.Download{}).Error; err != nil {
|
if err := model.DB.Where("id in (?)", service.ID).Delete(&model.Download{}).Error; err != nil {
|
||||||
|
|
Loading…
Add table
Reference in a new issue