From 96b84bb5e581cc37ca37547e0a319257a4132851 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Sat, 20 Nov 2021 17:14:45 +0800 Subject: [PATCH] Test: tasks pkg --- pkg/task/compress_test.go | 2 +- pkg/task/job.go | 4 ++-- pkg/task/job_test.go | 36 ++++++++++++++++++++++++++++++++-- pkg/task/pool.go | 2 +- pkg/task/pool_test.go | 4 ++-- pkg/task/slavetask/transfer.go | 4 ++-- pkg/task/transfer_test.go | 4 ++-- 7 files changed, 44 insertions(+), 12 deletions(-) diff --git a/pkg/task/compress_test.go b/pkg/task/compress_test.go index 681b9fb..34b282d 100644 --- a/pkg/task/compress_test.go +++ b/pkg/task/compress_test.go @@ -144,7 +144,7 @@ func TestCompressTask_Do(t *testing.T) { task.Do() asserts.NoError(mock.ExpectationsWereMet()) asserts.NotEmpty(task.GetError().Msg) - asserts.True(util.IsEmpty("test/compress")) + asserts.True(util.IsEmpty(util.RelativePath("test/compress"))) } } diff --git a/pkg/task/job.go b/pkg/task/job.go index 064b078..781c460 100644 --- a/pkg/task/job.go +++ b/pkg/task/job.go @@ -82,7 +82,7 @@ func Record(job Job) (*model.Task, error) { } // Resume 从数据库中恢复未完成任务 -func Resume() { +func Resume(p Pool) { tasks := model.GetTasksByStatus(Queued, Processing) if len(tasks) == 0 { return @@ -97,7 +97,7 @@ func Resume() { } if job != nil { - TaskPoll.Submit(job) + p.Submit(job) } } } diff --git a/pkg/task/job_test.go b/pkg/task/job_test.go index 432bbb3..81793ee 100644 --- a/pkg/task/job_test.go +++ b/pkg/task/job_test.go @@ -2,6 +2,7 @@ package task import ( "errors" + testMock "github.com/stretchr/testify/mock" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -21,15 +22,46 @@ func TestRecord(t *testing.T) { asserts.NoError(err) } +type taskPoolMock struct { + testMock.Mock +} + +func (t taskPoolMock) Add(num int) { + t.Called(num) +} + +func (t taskPoolMock) Submit(job Job) { + t.Called(job) +} + func TestResume(t *testing.T) { asserts := assert.New(t) + mockPool := taskPoolMock{} // 没有任务 { - mock.ExpectQuery("SELECT(.+)").WithArgs(Queued).WillReturnRows(sqlmock.NewRows([]string{"type"})) - Resume() + mock.ExpectQuery("SELECT(.+)").WithArgs(Queued, Processing).WillReturnRows(sqlmock.NewRows([]string{"type"})) + Resume(mockPool) asserts.NoError(mock.ExpectationsWereMet()) } + + // 有任务, 类型未知 + { + mock.ExpectQuery("SELECT(.+)").WithArgs(Queued, Processing).WillReturnRows(sqlmock.NewRows([]string{"type"}).AddRow(233)) + Resume(mockPool) + asserts.NoError(mock.ExpectationsWereMet()) + } + + // 有任务 + { + mockPool.On("Submit", testMock.Anything) + mock.ExpectQuery("SELECT(.+)").WithArgs(Queued, Processing).WillReturnRows(sqlmock.NewRows([]string{"type", "props"}).AddRow(CompressTaskType, "{}")) + mock.ExpectQuery("SELECT(.+)users").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1)) + mock.ExpectQuery("SELECT(.+)policies").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1)) + Resume(mockPool) + asserts.NoError(mock.ExpectationsWereMet()) + mockPool.AssertExpectations(t) + } } func TestGetJobFromModel(t *testing.T) { diff --git a/pkg/task/pool.go b/pkg/task/pool.go index 3188d7e..53e94a5 100644 --- a/pkg/task/pool.go +++ b/pkg/task/pool.go @@ -63,6 +63,6 @@ func Init() { util.Log().Info("初始化任务队列,WorkerNum = %d", maxWorker) if conf.SystemConfig.Mode == "master" { - Resume() + Resume(TaskPoll) } } diff --git a/pkg/task/pool_test.go b/pkg/task/pool_test.go index 5b7f74e..fbe4134 100644 --- a/pkg/task/pool_test.go +++ b/pkg/task/pool_test.go @@ -29,10 +29,10 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { asserts := assert.New(t) cache.Set("setting_max_worker_num", "10", 0) - mock.ExpectQuery("SELECT(.+)").WithArgs(Queued).WillReturnRows(sqlmock.NewRows([]string{"type"}).AddRow(-1)) + mock.ExpectQuery("SELECT(.+)").WithArgs(Queued, Processing).WillReturnRows(sqlmock.NewRows([]string{"type"}).AddRow(-1)) Init() asserts.NoError(mock.ExpectationsWereMet()) - asserts.Len(TaskPoll.idleWorker, 10) + asserts.Len(TaskPoll.(*AsyncPool).idleWorker, 10) } func TestPool_Submit(t *testing.T) { diff --git a/pkg/task/slavetask/transfer.go b/pkg/task/slavetask/transfer.go index 9231092..c467794 100644 --- a/pkg/task/slavetask/transfer.go +++ b/pkg/task/slavetask/transfer.go @@ -69,7 +69,7 @@ func (job *TransferTask) SetErrorMsg(msg string, err error) { } if err := cluster.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), notifyMsg); err != nil { - util.Log().Warning("无法发送转存失败通知到从机, ", err) + util.Log().Warning("无法发送转存失败通知到从机, %s", err) } } @@ -132,7 +132,7 @@ func (job *TransferTask) Do() { } if err := cluster.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil { - util.Log().Warning("无法发送转存成功通知到从机, ", err) + util.Log().Warning("无法发送转存成功通知到从机, %s", err) } } diff --git a/pkg/task/transfer_test.go b/pkg/task/transfer_test.go index 5d2c9c5..94301f6 100644 --- a/pkg/task/transfer_test.go +++ b/pkg/task/transfer_test.go @@ -138,7 +138,7 @@ func TestNewTransferTask(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT(.+)").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit() - job, err := NewTransferTask(1, []string{}, "/", "/", false) + job, err := NewTransferTask(1, []string{}, "/", "/", false, 0, nil) asserts.NoError(mock.ExpectationsWereMet()) asserts.NotNil(job) asserts.NoError(err) @@ -150,7 +150,7 @@ func TestNewTransferTask(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT(.+)").WillReturnError(errors.New("error")) mock.ExpectRollback() - job, err := NewTransferTask(1, []string{}, "/", "/", false) + job, err := NewTransferTask(1, []string{}, "/", "/", false, 0, nil) asserts.NoError(mock.ExpectationsWereMet()) asserts.Nil(job) asserts.Error(err)