Cloudreve/pkg/aria2/monitor/monitor_test.go

253 lines
6.7 KiB
Go
Raw Normal View History

Feat: aria2 download and transfer in slave node (#1040) * Feat: retrieve nodes from data table * Feat: master node ping slave node in REST API * Feat: master send scheduled ping request * Feat: inactive nodes recover loop * Modify: remove database operations from aria2 RPC caller implementation * Feat: init aria2 client in master node * Feat: Round Robin load balancer * Feat: create and monitor aria2 task in master node * Feat: salve receive and handle heartbeat * Fix: Node ID will be 0 in download record generated in older version * Feat: sign request headers with all `X-` prefix * Feat: API call to slave node will carry meta data in headers * Feat: call slave aria2 rpc method from master * Feat: get slave aria2 task status Feat: encode slave response data using gob * Feat: aria2 callback to master node / cancel or select task to slave node * Fix: use dummy aria2 client when caller initialize failed in master node * Feat: slave aria2 status event callback / salve RPC auth * Feat: prototype for slave driven filesystem * Feat: retry for init aria2 client in master node * Feat: init request client with global options * Feat: slave receive async task from master * Fix: competition write in request header * Refactor: dependency initialize order * Feat: generic message queue implementation * Feat: message queue implementation * Feat: master waiting slave transfer result * Feat: slave transfer file in stateless policy * Feat: slave transfer file in slave policy * Feat: slave transfer file in local policy * Feat: slave transfer file in OneDrive policy * Fix: failed to initialize update checker http client * Feat: list slave nodes for dashboard * Feat: test aria2 rpc connection in slave * Feat: add and save node * Feat: add and delete node in node pool * Fix: temp file cannot be removed when aria2 task fails * Fix: delete node in admin panel * Feat: edit node and get node info * Modify: delete unused settings
2021-10-31 09:41:56 +08:00
package monitor
2020-02-06 13:53:47 +08:00
import (
"database/sql"
2020-02-06 13:53:47 +08:00
"errors"
"github.com/DATA-DOG/go-sqlmock"
model "github.com/cloudreve/Cloudreve/v3/models"
Feat: aria2 download and transfer in slave node (#1040) * Feat: retrieve nodes from data table * Feat: master node ping slave node in REST API * Feat: master send scheduled ping request * Feat: inactive nodes recover loop * Modify: remove database operations from aria2 RPC caller implementation * Feat: init aria2 client in master node * Feat: Round Robin load balancer * Feat: create and monitor aria2 task in master node * Feat: salve receive and handle heartbeat * Fix: Node ID will be 0 in download record generated in older version * Feat: sign request headers with all `X-` prefix * Feat: API call to slave node will carry meta data in headers * Feat: call slave aria2 rpc method from master * Feat: get slave aria2 task status Feat: encode slave response data using gob * Feat: aria2 callback to master node / cancel or select task to slave node * Fix: use dummy aria2 client when caller initialize failed in master node * Feat: slave aria2 status event callback / salve RPC auth * Feat: prototype for slave driven filesystem * Feat: retry for init aria2 client in master node * Feat: init request client with global options * Feat: slave receive async task from master * Fix: competition write in request header * Refactor: dependency initialize order * Feat: generic message queue implementation * Feat: message queue implementation * Feat: master waiting slave transfer result * Feat: slave transfer file in stateless policy * Feat: slave transfer file in slave policy * Feat: slave transfer file in local policy * Feat: slave transfer file in OneDrive policy * Fix: failed to initialize update checker http client * Feat: list slave nodes for dashboard * Feat: test aria2 rpc connection in slave * Feat: add and save node * Feat: add and delete node in node pool * Fix: temp file cannot be removed when aria2 task fails * Fix: delete node in admin panel * Feat: edit node and get node info * Modify: delete unused settings
2021-10-31 09:41:56 +08:00
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
"github.com/cloudreve/Cloudreve/v3/pkg/mocks"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
2020-02-06 13:53:47 +08:00
"github.com/jinzhu/gorm"
"github.com/stretchr/testify/assert"
testMock "github.com/stretchr/testify/mock"
"testing"
2020-02-06 13:53:47 +08:00
)
// mock is the sqlmock controller shared by every test in this package. It is
// assigned in TestMain and used by the tests to declare the SQL they expect.
var mock sqlmock.Sqlmock
2020-02-06 13:53:47 +08:00
// TestMain 初始化数据库Mock
func TestMain(m *testing.M) {
var db *sql.DB
var err error
db, mock, err = sqlmock.New()
if err != nil {
panic("An error was not expected when opening a stub database connection")
2020-02-06 13:53:47 +08:00
}
model.DB, _ = gorm.Open("mysql", db)
defer db.Close()
m.Run()
2020-02-06 13:53:47 +08:00
}
func TestNewMonitor(t *testing.T) {
a := assert.New(t)
mockMQ := mq.NewMQ()
2020-02-06 13:53:47 +08:00
// node not available
2020-02-06 13:53:47 +08:00
{
mockPool := &mocks.NodePoolMock{}
mockPool.On("GetNodeByID", uint(1)).Return(nil)
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
task := &model.Download{
Model: gorm.Model{ID: 1},
}
NewMonitor(task, mockPool, mockMQ)
mockPool.AssertExpectations(t)
a.NoError(mock.ExpectationsWereMet())
a.NotEmpty(task.Error)
2020-02-06 13:53:47 +08:00
}
// success
2020-02-06 13:53:47 +08:00
{
mockNode := &mocks.NodeMock{}
mockNode.On("GetAria2Instance").Return(&common.DummyAria2{})
mockPool := &mocks.NodePoolMock{}
mockPool.On("GetNodeByID", uint(1)).Return(mockNode)
2020-02-06 13:53:47 +08:00
task := &model.Download{
Model: gorm.Model{ID: 1},
}
NewMonitor(task, mockPool, mockMQ)
mockNode.AssertExpectations(t)
mockPool.AssertExpectations(t)
2020-02-06 13:53:47 +08:00
}
}
2020-02-06 13:53:47 +08:00
func TestMonitor_Loop(t *testing.T) {
a := assert.New(t)
mockMQ := mq.NewMQ()
mockNode := &mocks.NodeMock{}
mockNode.On("GetAria2Instance").Return(&common.DummyAria2{})
m := &Monitor{
retried: MAX_RETRY,
node: mockNode,
Task: &model.Download{Model: gorm.Model{ID: 1}},
notifier: mockMQ.Subscribe("test", 1),
}
// into interval loop
2020-02-06 13:53:47 +08:00
{
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
m.Loop(mockMQ)
a.NoError(mock.ExpectationsWereMet())
a.NotEmpty(m.Task.Error)
2020-02-06 13:53:47 +08:00
}
// into notifier loop
2020-02-06 13:53:47 +08:00
{
m.Task.Error = ""
mockMQ.Publish("test", mq.Message{})
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
m.Loop(mockMQ)
a.NoError(mock.ExpectationsWereMet())
a.NotEmpty(m.Task.Error)
2020-02-06 13:53:47 +08:00
}
}
func TestMonitor_UpdateFailedAfterRetry(t *testing.T) {
a := assert.New(t)
mockNode := &mocks.NodeMock{}
mockNode.On("GetAria2Instance").Return(&common.DummyAria2{})
m := &Monitor{
node: mockNode,
Task: &model.Download{Model: gorm.Model{ID: 1}},
2020-02-06 13:53:47 +08:00
}
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
2020-02-06 13:53:47 +08:00
for i := 0; i < MAX_RETRY; i++ {
a.False(m.Update())
2020-02-06 13:53:47 +08:00
}
mockNode.AssertExpectations(t)
a.True(m.Update())
a.NoError(mock.ExpectationsWereMet())
a.NotEmpty(m.Task.Error)
2020-02-06 13:53:47 +08:00
}
// TestMonitor_UpdateMagentoFollow checks that when the aria2 status reports a
// FollowedBy GID, Update returns false, issues one committed UPDATE, and
// switches the task's GID to the followed one ("next").
//
// NOTE(review): "Magento" in the name presumably means "magnet" (link); the
// name is kept so existing -run filters keep matching.
func TestMonitor_UpdateMagentoFollow(t *testing.T) {
	a := assert.New(t)

	mockAria2 := &mocks.Aria2Mock{}
	mockAria2.On("Status", testMock.Anything).Return(rpc.StatusInfo{
		FollowedBy: []string{"next"},
	}, nil)

	mockNode := &mocks.NodeMock{}
	mockNode.On("GetAria2Instance").Return(mockAria2)

	m := &Monitor{
		node: mockNode,
		Task: &model.Download{Model: gorm.Model{ID: 1}},
	}

	mock.ExpectBegin()
	mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
	mock.ExpectCommit()

	a.False(m.Update())
	a.NoError(mock.ExpectationsWereMet())
	a.Equal("next", m.Task.GID)
	mockAria2.AssertExpectations(t)
	// Verify the node mock as well, as the sibling Update tests do; the
	// expectation registered on it above was previously never checked.
	mockNode.AssertExpectations(t)
}
2020-02-06 13:53:47 +08:00
func TestMonitor_UpdateFailedToUpdateInfo(t *testing.T) {
a := assert.New(t)
mockAria2 := &mocks.Aria2Mock{}
mockAria2.On("Status", testMock.Anything).Return(rpc.StatusInfo{}, nil)
mockAria2.On("DeleteTempFile", testMock.Anything).Return(nil)
mockNode := &mocks.NodeMock{}
mockNode.On("GetAria2Instance").Return(mockAria2)
m := &Monitor{
node: mockNode,
Task: &model.Download{Model: gorm.Model{ID: 1}},
}
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnError(errors.New("error"))
mock.ExpectRollback()
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
a.True(m.Update())
a.NoError(mock.ExpectationsWereMet())
mockAria2.AssertExpectations(t)
mockNode.AssertExpectations(t)
a.NotEmpty(m.Task.Error)
2020-02-06 13:53:47 +08:00
}
// TestMonitor_UpdateCompleted feeds Update a "complete" aria2 status while the
// users SELECT is scripted to fail. It expects a committed UPDATE before and
// after that query, a true return value, and an error recorded on the task.
func TestMonitor_UpdateCompleted(t *testing.T) {
	asserts := assert.New(t)

	status := rpc.StatusInfo{Status: "complete"}
	aria2 := &mocks.Aria2Mock{}
	aria2.On("Status", testMock.Anything).Return(status, nil)
	aria2.On("DeleteTempFile", testMock.Anything).Return(nil)

	node := &mocks.NodeMock{}
	node.On("GetAria2Instance").Return(aria2)
	node.On("ID").Return(uint(1))

	mon := &Monitor{
		Task: &model.Download{Model: gorm.Model{ID: 1}},
		node: node,
	}

	// expectTaskUpdate scripts one committed UPDATE transaction on the stub DB.
	expectTaskUpdate := func() {
		mock.ExpectBegin()
		mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
		mock.ExpectCommit()
	}

	expectTaskUpdate()
	mock.ExpectQuery("SELECT(.+)users(.+)").WillReturnError(errors.New("error"))
	expectTaskUpdate()

	asserts.True(mon.Update())
	asserts.NoError(mock.ExpectationsWereMet())
	aria2.AssertExpectations(t)
	node.AssertExpectations(t)
	asserts.NotEmpty(mon.Task.Error)
}
2020-02-06 13:53:47 +08:00
// TestMonitor_UpdateError feeds Update an aria2 status of "error" with an error
// message and expects two committed UPDATE transactions, a true return value,
// and an error recorded on the task.
func TestMonitor_UpdateError(t *testing.T) {
	asserts := assert.New(t)

	status := rpc.StatusInfo{
		Status:       "error",
		ErrorMessage: "error",
	}
	aria2 := &mocks.Aria2Mock{}
	aria2.On("Status", testMock.Anything).Return(status, nil)
	aria2.On("DeleteTempFile", testMock.Anything).Return(nil)

	node := &mocks.NodeMock{}
	node.On("GetAria2Instance").Return(aria2)

	mon := &Monitor{
		Task: &model.Download{Model: gorm.Model{ID: 1}},
		node: node,
	}

	// expectTaskUpdate scripts one committed UPDATE transaction on the stub DB.
	expectTaskUpdate := func() {
		mock.ExpectBegin()
		mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
		mock.ExpectCommit()
	}
	expectTaskUpdate()
	expectTaskUpdate()

	asserts.True(mon.Update())
	asserts.NoError(mock.ExpectationsWereMet())
	aria2.AssertExpectations(t)
	node.AssertExpectations(t)
	asserts.NotEmpty(mon.Task.Error)
}
2020-02-26 15:11:06 +08:00
func TestMonitor_UpdateActive(t *testing.T) {
a := assert.New(t)
mockAria2 := &mocks.Aria2Mock{}
mockAria2.On("Status", testMock.Anything).Return(rpc.StatusInfo{
Status: "active",
}, nil)
mockNode := &mocks.NodeMock{}
mockNode.On("GetAria2Instance").Return(mockAria2)
m := &Monitor{
node: mockNode,
Task: &model.Download{Model: gorm.Model{ID: 1}},
}
mock.ExpectBegin()
mock.ExpectExec("UPDATE(.+)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()
a.False(m.Update())
a.NoError(mock.ExpectationsWereMet())
mockAria2.AssertExpectations(t)
mockNode.AssertExpectations(t)
2020-02-06 13:53:47 +08:00
}