Feat: RWMutex / reload for aira2
This commit is contained in:
parent
48659f6952
commit
c1d2b933aa
11 changed files with 72 additions and 38 deletions
|
@ -24,7 +24,7 @@ func Init(path string) {
|
||||||
if conf.SystemConfig.Mode == "master" {
|
if conf.SystemConfig.Mode == "master" {
|
||||||
model.Init()
|
model.Init()
|
||||||
task.Init()
|
task.Init()
|
||||||
aria2.Init()
|
aria2.Init(false)
|
||||||
email.Init()
|
email.Init()
|
||||||
crontab.Init()
|
crontab.Init()
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ type GroupOption struct {
|
||||||
ShareDownload bool `json:"share_download,omitempty"`
|
ShareDownload bool `json:"share_download,omitempty"`
|
||||||
ShareFree bool `json:"share_free,omitempty"`
|
ShareFree bool `json:"share_free,omitempty"`
|
||||||
Aria2 bool `json:"aria2,omitempty"` // 离线下载
|
Aria2 bool `json:"aria2,omitempty"` // 离线下载
|
||||||
Aria2Options []interface{} `json:"aria2_options,omitempty"` // 离线下载用户组配置
|
Aria2Options map[string]interface{} `json:"aria2_options,omitempty"` // 离线下载用户组配置
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetGroupByID 用ID获取用户组
|
// GetGroupByID 用ID获取用户组
|
||||||
|
|
|
@ -140,10 +140,11 @@ Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; verti
|
||||||
{Name: "gravatar_server", Value: `https://gravatar.loli.net/`, Type: "avatar"},
|
{Name: "gravatar_server", Value: `https://gravatar.loli.net/`, Type: "avatar"},
|
||||||
{Name: "defaultTheme", Value: `#3f51b5`, Type: "basic"},
|
{Name: "defaultTheme", Value: `#3f51b5`, Type: "basic"},
|
||||||
{Name: "themes", Value: `{"#3f51b5":{"palette":{"primary":{"main":"#3f51b5"},"secondary":{"main":"#f50057"}}},"#2196f3":{"palette":{"primary":{"main":"#2196f3"},"secondary":{"main":"#FFC107"}}},"#673AB7":{"palette":{"primary":{"main":"#673AB7"},"secondary":{"main":"#2196F3"}}},"#E91E63":{"palette":{"primary":{"main":"#E91E63"},"secondary":{"main":"#42A5F5","contrastText":"#fff"}}},"#FF5722":{"palette":{"primary":{"main":"#FF5722"},"secondary":{"main":"#3F51B5"}}},"#FFC107":{"palette":{"primary":{"main":"#FFC107"},"secondary":{"main":"#26C6DA"}}},"#8BC34A":{"palette":{"primary":{"main":"#8BC34A","contrastText":"#fff"},"secondary":{"main":"#FF8A65","contrastText":"#fff"}}},"#009688":{"palette":{"primary":{"main":"#009688"},"secondary":{"main":"#4DD0E1","contrastText":"#fff"}}},"#607D8B":{"palette":{"primary":{"main":"#607D8B"},"secondary":{"main":"#F06292"}}},"#795548":{"palette":{"primary":{"main":"#795548"},"secondary":{"main":"#4CAF50","contrastText":"#fff"}}}}`, Type: "basic"},
|
{Name: "themes", Value: `{"#3f51b5":{"palette":{"primary":{"main":"#3f51b5"},"secondary":{"main":"#f50057"}}},"#2196f3":{"palette":{"primary":{"main":"#2196f3"},"secondary":{"main":"#FFC107"}}},"#673AB7":{"palette":{"primary":{"main":"#673AB7"},"secondary":{"main":"#2196F3"}}},"#E91E63":{"palette":{"primary":{"main":"#E91E63"},"secondary":{"main":"#42A5F5","contrastText":"#fff"}}},"#FF5722":{"palette":{"primary":{"main":"#FF5722"},"secondary":{"main":"#3F51B5"}}},"#FFC107":{"palette":{"primary":{"main":"#FFC107"},"secondary":{"main":"#26C6DA"}}},"#8BC34A":{"palette":{"primary":{"main":"#8BC34A","contrastText":"#fff"},"secondary":{"main":"#FF8A65","contrastText":"#fff"}}},"#009688":{"palette":{"primary":{"main":"#009688"},"secondary":{"main":"#4DD0E1","contrastText":"#fff"}}},"#607D8B":{"palette":{"primary":{"main":"#607D8B"},"secondary":{"main":"#F06292"}}},"#795548":{"palette":{"primary":{"main":"#795548"},"secondary":{"main":"#4CAF50","contrastText":"#fff"}}}}`, Type: "basic"},
|
||||||
{Name: "aria2_token", Value: `your token`, Type: "aria2"},
|
{Name: "aria2_token", Value: ``, Type: "aria2"},
|
||||||
|
{Name: "aria2_rpcurl", Value: ``, Type: "aria2"},
|
||||||
{Name: "aria2_temp_path", Value: ``, Type: "aria2"},
|
{Name: "aria2_temp_path", Value: ``, Type: "aria2"},
|
||||||
{Name: "aria2_options", Value: `[]`, Type: "aria2"},
|
{Name: "aria2_options", Value: `{}`, Type: "aria2"},
|
||||||
{Name: "aria2_interval", Value: `10`, Type: "aria2"},
|
{Name: "aria2_interval", Value: `60`, Type: "aria2"},
|
||||||
{Name: "max_worker_num", Value: `10`, Type: "task"},
|
{Name: "max_worker_num", Value: `10`, Type: "task"},
|
||||||
{Name: "max_parallel_transfer", Value: `4`, Type: "task"},
|
{Name: "max_parallel_transfer", Value: `4`, Type: "task"},
|
||||||
{Name: "secret_key", Value: util.RandStringRunes(256), Type: "auth"},
|
{Name: "secret_key", Value: util.RandStringRunes(256), Type: "auth"},
|
||||||
|
|
|
@ -7,18 +7,22 @@ import (
|
||||||
"github.com/HFO4/cloudreve/pkg/serializer"
|
"github.com/HFO4/cloudreve/pkg/serializer"
|
||||||
"github.com/HFO4/cloudreve/pkg/util"
|
"github.com/HFO4/cloudreve/pkg/util"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Instance 默认使用的Aria2处理实例
|
// Instance 默认使用的Aria2处理实例
|
||||||
var Instance Aria2 = &DummyAria2{}
|
var Instance Aria2 = &DummyAria2{}
|
||||||
|
|
||||||
|
// Lock Instance的读写锁
|
||||||
|
var Lock sync.RWMutex
|
||||||
|
|
||||||
// EventNotifier 任务状态更新通知处理器
|
// EventNotifier 任务状态更新通知处理器
|
||||||
var EventNotifier = &Notifier{}
|
var EventNotifier = &Notifier{}
|
||||||
|
|
||||||
// Aria2 离线下载处理接口
|
// Aria2 离线下载处理接口
|
||||||
type Aria2 interface {
|
type Aria2 interface {
|
||||||
// CreateTask 创建新的任务
|
// CreateTask 创建新的任务
|
||||||
CreateTask(task *model.Download, options []interface{}) error
|
CreateTask(task *model.Download, options map[string]interface{}) error
|
||||||
// 返回状态信息
|
// 返回状态信息
|
||||||
Status(task *model.Download) (rpc.StatusInfo, error)
|
Status(task *model.Download) (rpc.StatusInfo, error)
|
||||||
// 取消任务
|
// 取消任务
|
||||||
|
@ -63,7 +67,7 @@ type DummyAria2 struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateTask 创建新任务,此处直接返回未开启错误
|
// CreateTask 创建新任务,此处直接返回未开启错误
|
||||||
func (instance *DummyAria2) CreateTask(model *model.Download, options []interface{}) error {
|
func (instance *DummyAria2) CreateTask(model *model.Download, options map[string]interface{}) error {
|
||||||
return ErrNotEnabled
|
return ErrNotEnabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,7 +87,16 @@ func (instance *DummyAria2) Select(task *model.Download, files []int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init 初始化
|
// Init 初始化
|
||||||
func Init() {
|
func Init(isReload bool) {
|
||||||
|
Lock.Lock()
|
||||||
|
defer Lock.Unlock()
|
||||||
|
|
||||||
|
// 关闭上个初始连接
|
||||||
|
if previousClient, ok := Instance.(*RPCService); ok {
|
||||||
|
util.Log().Debug("关闭上个 aria2 连接")
|
||||||
|
previousClient.caller.Close()
|
||||||
|
}
|
||||||
|
|
||||||
options := model.GetSettingByNames("aria2_rpcurl", "aria2_token", "aria2_options")
|
options := model.GetSettingByNames("aria2_rpcurl", "aria2_token", "aria2_options")
|
||||||
timeout := model.GetIntSetting("aria2_call_timeout", 5)
|
timeout := model.GetIntSetting("aria2_call_timeout", 5)
|
||||||
if options["aria2_rpcurl"] == "" {
|
if options["aria2_rpcurl"] == "" {
|
||||||
|
@ -93,9 +106,6 @@ func Init() {
|
||||||
|
|
||||||
util.Log().Info("初始化 aria2 RPC 服务[%s]", options["aria2_rpcurl"])
|
util.Log().Info("初始化 aria2 RPC 服务[%s]", options["aria2_rpcurl"])
|
||||||
client := &RPCService{}
|
client := &RPCService{}
|
||||||
if previousClient, ok := Instance.(*RPCService); ok {
|
|
||||||
client = previousClient
|
|
||||||
}
|
|
||||||
|
|
||||||
// 解析RPC服务地址
|
// 解析RPC服务地址
|
||||||
server, err := url.Parse(options["aria2_rpcurl"])
|
server, err := url.Parse(options["aria2_rpcurl"])
|
||||||
|
@ -107,7 +117,7 @@ func Init() {
|
||||||
server.Path = "/jsonrpc"
|
server.Path = "/jsonrpc"
|
||||||
|
|
||||||
// 加载自定义下载配置
|
// 加载自定义下载配置
|
||||||
var globalOptions []interface{}
|
var globalOptions map[string]interface{}
|
||||||
err = json.Unmarshal([]byte(options["aria2_options"]), &globalOptions)
|
err = json.Unmarshal([]byte(options["aria2_options"]), &globalOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.Log().Warning("无法解析 aria2 全局配置,%s", err)
|
util.Log().Warning("无法解析 aria2 全局配置,%s", err)
|
||||||
|
@ -123,6 +133,7 @@ func Init() {
|
||||||
|
|
||||||
Instance = client
|
Instance = client
|
||||||
|
|
||||||
|
if !isReload {
|
||||||
// 从数据库中读取未完成任务,创建监控
|
// 从数据库中读取未完成任务,创建监控
|
||||||
unfinished := model.GetDownloadsByStatus(Ready, Paused, Downloading)
|
unfinished := model.GetDownloadsByStatus(Ready, Paused, Downloading)
|
||||||
|
|
||||||
|
@ -132,6 +143,8 @@ func Init() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// getStatus 将给定的状态字符串转换为状态标识数字
|
// getStatus 将给定的状态字符串转换为状态标识数字
|
||||||
func getStatus(status string) int {
|
func getStatus(status string) int {
|
||||||
switch status {
|
switch status {
|
||||||
|
|
|
@ -45,14 +45,14 @@ func TestInit(t *testing.T) {
|
||||||
// 未指定RPC地址,跳过
|
// 未指定RPC地址,跳过
|
||||||
{
|
{
|
||||||
cache.Set("setting_aria2_rpcurl", "", 0)
|
cache.Set("setting_aria2_rpcurl", "", 0)
|
||||||
Init()
|
Init(false)
|
||||||
asserts.IsType(&DummyAria2{}, Instance)
|
asserts.IsType(&DummyAria2{}, Instance)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 无法解析服务器地址
|
// 无法解析服务器地址
|
||||||
{
|
{
|
||||||
cache.Set("setting_aria2_rpcurl", string(byte(0x7f)), 0)
|
cache.Set("setting_aria2_rpcurl", string(byte(0x7f)), 0)
|
||||||
Init()
|
Init(false)
|
||||||
asserts.IsType(&DummyAria2{}, Instance)
|
asserts.IsType(&DummyAria2{}, Instance)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ func TestInit(t *testing.T) {
|
||||||
Instance = &RPCService{}
|
Instance = &RPCService{}
|
||||||
cache.Set("setting_aria2_options", "?", 0)
|
cache.Set("setting_aria2_options", "?", 0)
|
||||||
cache.Set("setting_aria2_rpcurl", "ws://127.0.0.1:1234", 0)
|
cache.Set("setting_aria2_rpcurl", "ws://127.0.0.1:1234", 0)
|
||||||
Init()
|
Init(false)
|
||||||
asserts.IsType(&DummyAria2{}, Instance)
|
asserts.IsType(&DummyAria2{}, Instance)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ func TestInit(t *testing.T) {
|
||||||
cache.Set("setting_aria2_call_timeout", "1", 0)
|
cache.Set("setting_aria2_call_timeout", "1", 0)
|
||||||
cache.Set("setting_aria2_interval", "100", 0)
|
cache.Set("setting_aria2_interval", "100", 0)
|
||||||
mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"g_id"}).AddRow("1"))
|
mock.ExpectQuery("SELECT(.+)").WillReturnRows(sqlmock.NewRows([]string{"g_id"}).AddRow("1"))
|
||||||
Init()
|
Init(false)
|
||||||
asserts.NoError(mock.ExpectationsWereMet())
|
asserts.NoError(mock.ExpectationsWereMet())
|
||||||
asserts.IsType(&RPCService{}, Instance)
|
asserts.IsType(&RPCService{}, Instance)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,11 @@ type RPCService struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientOptions struct {
|
type clientOptions struct {
|
||||||
Options []interface{} // 创建下载时额外添加的设置
|
Options map[string]interface{} // 创建下载时额外添加的设置
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init 初始化
|
// Init 初始化
|
||||||
func (client *RPCService) Init(server, secret string, timeout int, options []interface{}) error {
|
func (client *RPCService) Init(server, secret string, timeout int, options map[string]interface{}) error {
|
||||||
// 客户端已存在,则关闭先前连接
|
// 客户端已存在,则关闭先前连接
|
||||||
if client.caller != nil {
|
if client.caller != nil {
|
||||||
client.caller.Close()
|
client.caller.Close()
|
||||||
|
@ -84,7 +84,7 @@ func (client *RPCService) Select(task *model.Download, files []int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateTask 创建新任务
|
// CreateTask 创建新任务
|
||||||
func (client *RPCService) CreateTask(task *model.Download, groupOptions []interface{}) error {
|
func (client *RPCService) CreateTask(task *model.Download, groupOptions map[string]interface{}) error {
|
||||||
// 生成存储路径
|
// 生成存储路径
|
||||||
path := filepath.Join(
|
path := filepath.Join(
|
||||||
model.GetSettingByName("aria2_temp_path"),
|
model.GetSettingByName("aria2_temp_path"),
|
||||||
|
@ -93,13 +93,17 @@ func (client *RPCService) CreateTask(task *model.Download, groupOptions []interf
|
||||||
)
|
)
|
||||||
|
|
||||||
// 创建下载任务
|
// 创建下载任务
|
||||||
options := []interface{}{map[string]string{"dir": path}}
|
options := map[string]interface{}{
|
||||||
if len(client.options.Options) > 0 {
|
"dir": path,
|
||||||
options = append(options, client.options.Options...)
|
}
|
||||||
|
for k, v := range client.options.Options {
|
||||||
|
options[k] = v
|
||||||
|
}
|
||||||
|
for k, v := range groupOptions {
|
||||||
|
options[k] = v
|
||||||
}
|
}
|
||||||
options = append(options, groupOptions...)
|
|
||||||
|
|
||||||
gid, err := client.caller.AddURI(task.Source, options...)
|
gid, err := client.caller.AddURI(task.Source, options)
|
||||||
if err != nil || gid == "" {
|
if err != nil || gid == "" {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,6 @@ func TestRPCService_CreateTask(t *testing.T) {
|
||||||
caller := &RPCService{}
|
caller := &RPCService{}
|
||||||
asserts.NoError(caller.Init("http://127.0.0.1", "", 1, nil))
|
asserts.NoError(caller.Init("http://127.0.0.1", "", 1, nil))
|
||||||
cache.Set("setting_aria2_temp_path", "test", 0)
|
cache.Set("setting_aria2_temp_path", "test", 0)
|
||||||
err := caller.CreateTask(&model.Download{Parent: "test"}, []interface{}{map[string]string{"1": "1"}})
|
err := caller.CreateTask(&model.Download{Parent: "test"}, map[string]interface{}{"1": "1"})
|
||||||
asserts.Error(err)
|
asserts.Error(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,7 +69,10 @@ func (monitor *Monitor) Loop() {
|
||||||
|
|
||||||
// Update 更新状态,返回值表示是否退出监控
|
// Update 更新状态,返回值表示是否退出监控
|
||||||
func (monitor *Monitor) Update() bool {
|
func (monitor *Monitor) Update() bool {
|
||||||
|
Lock.RLock()
|
||||||
status, err := Instance.Status(monitor.Task)
|
status, err := Instance.Status(monitor.Task)
|
||||||
|
Lock.RUnlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
monitor.retried++
|
monitor.retried++
|
||||||
util.Log().Warning("无法获取下载任务[%s]的状态,%s", monitor.Task.GID, err)
|
util.Log().Warning("无法获取下载任务[%s]的状态,%s", monitor.Task.GID, err)
|
||||||
|
@ -160,7 +163,9 @@ func (monitor *Monitor) UpdateTaskInfo(status rpc.StatusInfo) error {
|
||||||
// 文件大小更新后,对文件限制等进行校验
|
// 文件大小更新后,对文件限制等进行校验
|
||||||
if err := monitor.ValidateFile(); err != nil {
|
if err := monitor.ValidateFile(); err != nil {
|
||||||
// 验证失败时取消任务
|
// 验证失败时取消任务
|
||||||
|
Lock.RLock()
|
||||||
Instance.Cancel(monitor.Task)
|
Instance.Cancel(monitor.Task)
|
||||||
|
Lock.RUnlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package controllers
|
package controllers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/HFO4/cloudreve/pkg/aria2"
|
||||||
"github.com/HFO4/cloudreve/pkg/email"
|
"github.com/HFO4/cloudreve/pkg/email"
|
||||||
"github.com/HFO4/cloudreve/pkg/request"
|
"github.com/HFO4/cloudreve/pkg/request"
|
||||||
"github.com/HFO4/cloudreve/pkg/serializer"
|
"github.com/HFO4/cloudreve/pkg/serializer"
|
||||||
|
@ -66,6 +67,8 @@ func AdminReloadService(c *gin.Context) {
|
||||||
switch service {
|
switch service {
|
||||||
case "email":
|
case "email":
|
||||||
email.Init()
|
email.Init()
|
||||||
|
case "aria2":
|
||||||
|
aria2.Init(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.JSON(200, serializer.Response{})
|
c.JSON(200, serializer.Response{})
|
||||||
|
|
|
@ -41,9 +41,13 @@ func (service *AddURLService) Add(c *gin.Context, taskType int) serializer.Respo
|
||||||
UserID: fs.User.ID,
|
UserID: fs.User.ID,
|
||||||
Source: service.URL,
|
Source: service.URL,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
aria2.Lock.RLock()
|
||||||
if err := aria2.Instance.CreateTask(task, fs.User.Group.OptionsSerialized.Aria2Options); err != nil {
|
if err := aria2.Instance.CreateTask(task, fs.User.Group.OptionsSerialized.Aria2Options); err != nil {
|
||||||
|
aria2.Lock.RUnlock()
|
||||||
return serializer.Err(serializer.CodeNotSet, "任务创建失败", err)
|
return serializer.Err(serializer.CodeNotSet, "任务创建失败", err)
|
||||||
}
|
}
|
||||||
|
aria2.Lock.RUnlock()
|
||||||
|
|
||||||
return serializer.Response{}
|
return serializer.Response{}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,8 @@ func (service *DownloadTaskService) Delete(c *gin.Context) serializer.Response {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 取消任务
|
// 取消任务
|
||||||
|
aria2.Lock.RLock()
|
||||||
|
defer aria2.Lock.RUnlock()
|
||||||
if err := aria2.Instance.Cancel(download); err != nil {
|
if err := aria2.Instance.Cancel(download); err != nil {
|
||||||
return serializer.Err(serializer.CodeNotSet, "操作失败", err)
|
return serializer.Err(serializer.CodeNotSet, "操作失败", err)
|
||||||
}
|
}
|
||||||
|
@ -75,6 +77,8 @@ func (service *SelectFileService) Select(c *gin.Context) serializer.Response {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 选取下载
|
// 选取下载
|
||||||
|
aria2.Lock.RLock()
|
||||||
|
defer aria2.Lock.RUnlock()
|
||||||
if err := aria2.Instance.Select(download, service.Indexes); err != nil {
|
if err := aria2.Instance.Select(download, service.Indexes); err != nil {
|
||||||
return serializer.Err(serializer.CodeNotSet, "操作失败", err)
|
return serializer.Err(serializer.CodeNotSet, "操作失败", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue