0
Fork 0
mirror of https://codeberg.org/forgejo/forgejo.git synced 2025-01-05 06:00:26 -05:00

Use queue instead of memory queue in webhook send service (#19390)

This commit is contained in:
Lunny Xiao 2022-04-26 02:03:01 +08:00 committed by GitHub
parent 257cea654c
commit 7c164d5a91
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 157 deletions

View file

@ -1,104 +0,0 @@
// Copyright 2016 The Gogs Authors. All rights reserved.
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package sync
// UniqueQueue is a queue which guarantees only one instance of same
// identity is in the line. Instances with same identity will be
// discarded if there is already one in the line.
//
// This queue is particularly useful for preventing duplicated task
// of same purpose.
type UniqueQueue struct {
table *StatusTable
queue chan string
closed chan struct{}
}
// NewUniqueQueue initializes and returns a new UniqueQueue object.
func NewUniqueQueue(queueLength int) *UniqueQueue {
if queueLength <= 0 {
queueLength = 100
}
return &UniqueQueue{
table: NewStatusTable(),
queue: make(chan string, queueLength),
closed: make(chan struct{}),
}
}
// Close closes this queue
func (q *UniqueQueue) Close() {
select {
case <-q.closed:
default:
q.table.lock.Lock()
select {
case <-q.closed:
default:
close(q.closed)
}
q.table.lock.Unlock()
}
}
// IsClosed returns a channel that is closed when this Queue is closed
func (q *UniqueQueue) IsClosed() <-chan struct{} {
return q.closed
}
// IDs returns the current ids in the pool
func (q *UniqueQueue) IDs() []string {
q.table.lock.Lock()
defer q.table.lock.Unlock()
ids := make([]string, 0, len(q.table.pool))
for id := range q.table.pool {
ids = append(ids, id)
}
return ids
}
// Queue returns channel of queue for retrieving instances.
func (q *UniqueQueue) Queue() <-chan string {
return q.queue
}
// Exist returns true if there is an instance with given identity
// exists in the queue.
func (q *UniqueQueue) Exist(id string) bool {
return q.table.IsRunning(id)
}
// AddFunc adds new instance to the queue with a custom runnable function,
// the queue is blocked until the function exits.
func (q *UniqueQueue) AddFunc(id string, fn func()) {
q.table.lock.Lock()
if _, ok := q.table.pool[id]; ok {
q.table.lock.Unlock()
return
}
q.table.pool[id] = struct{}{}
if fn != nil {
fn()
}
q.table.lock.Unlock()
select {
case <-q.closed:
return
case q.queue <- id:
return
}
}
// Add adds new instance to the queue.
func (q *UniqueQueue) Add(id string) {
q.AddFunc(id, nil)
}
// Remove removes instance from the queue.
func (q *UniqueQueue) Remove(id string) {
q.table.Stop(id)
}

View file

@ -9,10 +9,15 @@ import (
"testing" "testing"
"code.gitea.io/gitea/models/unittest" "code.gitea.io/gitea/models/unittest"
"code.gitea.io/gitea/modules/setting"
webhook_service "code.gitea.io/gitea/services/webhook"
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
setting.LoadForTest()
setting.NewQueueService()
unittest.MainTest(m, &unittest.TestOptions{ unittest.MainTest(m, &unittest.TestOptions{
GiteaRootPath: filepath.Join("..", "..", "..", ".."), GiteaRootPath: filepath.Join("..", "..", "..", ".."),
SetUp: webhook_service.Init,
}) })
} }

View file

@ -145,7 +145,7 @@ func GlobalInitInstalled(ctx context.Context) {
mustInit(stats_indexer.Init) mustInit(stats_indexer.Init)
mirror_service.InitSyncMirrors() mirror_service.InitSyncMirrors()
webhook.InitDeliverHooks() mustInit(webhook.Init)
mustInit(pull_service.Init) mustInit(pull_service.Init)
mustInit(task.Init) mustInit(task.Init)
mustInit(repo_migrations.Init) mustInit(repo_migrations.Init)

View file

@ -15,7 +15,6 @@ import (
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -26,6 +25,7 @@ import (
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process" "code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
"github.com/gobwas/glob" "github.com/gobwas/glob"
@ -202,10 +202,8 @@ func Deliver(ctx context.Context, t *webhook_model.HookTask) error {
return nil return nil
} }
// DeliverHooks checks and delivers undelivered hooks. // populateDeliverHooks checks and delivers undelivered hooks.
// FIXME: graceful: This would likely benefit from either a worker pool with dummy queue func populateDeliverHooks(ctx context.Context) {
// or a full queue. Then more hooks could be sent at same time.
func DeliverHooks(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -226,42 +224,9 @@ func DeliverHooks(ctx context.Context) {
return return
default: default:
} }
if err = Deliver(ctx, t); err != nil {
log.Error("deliver: %v", err)
}
}
// Start listening on new hook requests. if err := addToTask(t.RepoID); err != nil {
for { log.Error("DeliverHook failed [%d]: %v", t.RepoID, err)
select {
case <-ctx.Done():
hookQueue.Close()
return
case repoIDStr := <-hookQueue.Queue():
log.Trace("DeliverHooks [repo_id: %v]", repoIDStr)
hookQueue.Remove(repoIDStr)
repoID, err := strconv.ParseInt(repoIDStr, 10, 64)
if err != nil {
log.Error("Invalid repo ID: %s", repoIDStr)
continue
}
tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID)
if err != nil {
log.Error("Get repository [%d] hook tasks: %v", repoID, err)
continue
}
for _, t := range tasks {
select {
case <-ctx.Done():
return
default:
}
if err = Deliver(ctx, t); err != nil {
log.Error("deliver: %v", err)
}
}
} }
} }
} }
@ -297,8 +262,8 @@ func webhookProxy() func(req *http.Request) (*url.URL, error) {
} }
} }
// InitDeliverHooks starts the hooks delivery thread // Init starts the hooks delivery thread
func InitDeliverHooks() { func Init() error {
timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second timeout := time.Duration(setting.Webhook.DeliverTimeout) * time.Second
allowedHostListValue := setting.Webhook.AllowedHostList allowedHostListValue := setting.Webhook.AllowedHostList
@ -316,5 +281,13 @@ func InitDeliverHooks() {
}, },
} }
go graceful.GetManager().RunWithShutdownContext(DeliverHooks) hookQueue = queue.CreateUniqueQueue("webhook_sender", handle, "")
if hookQueue == nil {
return fmt.Errorf("Unable to create webhook_sender Queue")
}
go graceful.GetManager().RunWithShutdownFns(hookQueue.Run)
populateDeliverHooks(graceful.GetManager().HammerContext())
return nil
} }

View file

@ -9,12 +9,16 @@ import (
"testing" "testing"
"code.gitea.io/gitea/models/unittest" "code.gitea.io/gitea/models/unittest"
"code.gitea.io/gitea/modules/setting"
_ "code.gitea.io/gitea/models" _ "code.gitea.io/gitea/models"
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
setting.LoadForTest()
setting.NewQueueService()
unittest.MainTest(m, &unittest.TestOptions{ unittest.MainTest(m, &unittest.TestOptions{
GiteaRootPath: filepath.Join("..", ".."), GiteaRootPath: filepath.Join("..", ".."),
SetUp: Init,
}) })
} }

View file

@ -12,10 +12,11 @@ import (
repo_model "code.gitea.io/gitea/models/repo" repo_model "code.gitea.io/gitea/models/repo"
webhook_model "code.gitea.io/gitea/models/webhook" webhook_model "code.gitea.io/gitea/models/webhook"
"code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
api "code.gitea.io/gitea/modules/structs" api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"github.com/gobwas/glob" "github.com/gobwas/glob"
@ -80,7 +81,7 @@ func IsValidHookTaskType(name string) bool {
} }
// hookQueue is a global queue of web hooks // hookQueue is a global queue of web hooks
var hookQueue = sync.NewUniqueQueue(setting.Webhook.QueueLength) var hookQueue queue.UniqueQueue
// getPayloadBranch returns branch for hook event, if applicable. // getPayloadBranch returns branch for hook event, if applicable.
func getPayloadBranch(p api.Payloader) string { func getPayloadBranch(p api.Payloader) string {
@ -101,14 +102,47 @@ func getPayloadBranch(p api.Payloader) string {
return "" return ""
} }
// handle passed PR IDs and test the PRs
func handle(data ...queue.Data) []queue.Data {
for _, datum := range data {
repoIDStr := datum.(string)
log.Trace("DeliverHooks [repo_id: %v]", repoIDStr)
repoID, err := strconv.ParseInt(repoIDStr, 10, 64)
if err != nil {
log.Error("Invalid repo ID: %s", repoIDStr)
continue
}
tasks, err := webhook_model.FindRepoUndeliveredHookTasks(repoID)
if err != nil {
log.Error("Get repository [%d] hook tasks: %v", repoID, err)
continue
}
for _, t := range tasks {
if err = Deliver(graceful.GetManager().HammerContext(), t); err != nil {
log.Error("deliver: %v", err)
}
}
}
return nil
}
func addToTask(repoID int64) error {
err := hookQueue.PushFunc(strconv.FormatInt(repoID, 10), nil)
if err != nil && err != queue.ErrAlreadyInQueue {
return err
}
return nil
}
// PrepareWebhook adds special webhook to task queue for given payload. // PrepareWebhook adds special webhook to task queue for given payload.
func PrepareWebhook(w *webhook_model.Webhook, repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error { func PrepareWebhook(w *webhook_model.Webhook, repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error {
if err := prepareWebhook(w, repo, event, p); err != nil { if err := prepareWebhook(w, repo, event, p); err != nil {
return err return err
} }
go hookQueue.Add(strconv.FormatInt(repo.ID, 10)) return addToTask(repo.ID)
return nil
} }
func checkBranch(w *webhook_model.Webhook, branch string) bool { func checkBranch(w *webhook_model.Webhook, branch string) bool {
@ -188,8 +222,7 @@ func PrepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventT
return err return err
} }
go hookQueue.Add(strconv.FormatInt(repo.ID, 10)) return addToTask(repo.ID)
return nil
} }
func prepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error { func prepareWebhooks(repo *repo_model.Repository, event webhook_model.HookEventType, p api.Payloader) error {
@ -240,7 +273,5 @@ func ReplayHookTask(w *webhook_model.Webhook, uuid string) error {
return err return err
} }
go hookQueue.Add(strconv.FormatInt(t.RepoID, 10)) return addToTask(t.RepoID)
return nil
} }