0
Fork 0
mirror of https://codeberg.org/forgejo/forgejo.git synced 2025-01-25 15:49:35 -05:00
forgejo/services/migrations/github.go
wxiaoguang 6f9c278559
Rewrite queue (#24505)
# ⚠️ Breaking

Many deprecated queue config options are removed (actually, they should
have been removed in 1.18/1.19).

If you see the fatal message when starting Gitea: "Please update your
app.ini to remove deprecated config options", please follow the error
messages to remove these options from your app.ini.

Example:

```
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].ISSUE_INDEXER_QUEUE_TYPE`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [E] Removed queue option: `[indexer].UPDATE_BUFFER_LEN`. Use new options in `[queue.issue_indexer]`
2023/05/06 19:39:22 [F] Please update your app.ini to remove deprecated config options
```

Many options in `[queue]` are are dropped, including:
`WRAP_IF_NECESSARY`, `MAX_ATTEMPTS`, `TIMEOUT`, `WORKERS`,
`BLOCK_TIMEOUT`, `BOOST_TIMEOUT`, `BOOST_WORKERS`, they can be removed
from app.ini.

# The problem

The old queue package has some legacy problems:

* complexity: I doubt few people could tell how it works.
* maintainability: Too many channels and mutex/cond are mixed together,
too many different structs/interfaces depends each other.
* stability: due to the complexity & maintainability, sometimes there
are strange bugs and difficult to debug, and some code doesn't have test
(indeed some code is difficult to test because a lot of things are mixed
together).
* general applicability: although it is called "queue", its behavior is
not a well-known queue.
* scalability: it doesn't seem easy to make it work with a cluster
without breaking its behaviors.

It came from some very old code to "avoid breaking", however, its
technical debt is too heavy now. It's a good time to introduce a better
"queue" package.

# The new queue package

It keeps using old config and concept as much as possible.

* It only contains two major kinds of concepts:
    * The "base queue": channel, levelqueue, redis
* They have the same abstraction, the same interface, and they are
tested by the same testing code.
* The "WokerPoolQueue", it uses the "base queue" to provide "worker
pool" function, calls the "handler" to process the data in the base
queue.
* The new code doesn't do "PushBack"
* Think about a queue with many workers, the "PushBack" can't guarantee
the order for re-queued unhandled items, so in new code it just does
"normal push"
* The new code doesn't do "pause/resume"
* The "pause/resume" was designed to handle some handler's failure: eg:
document indexer (elasticsearch) is down
* If a queue is paused for long time, either the producers blocks or the
new items are dropped.
* The new code doesn't do such "pause/resume" trick, it's not a common
queue's behavior and it doesn't help much.
* If there are unhandled items, the "push" function just blocks for a
few seconds and then re-queue them and retry.
* The new code doesn't do "worker booster"
* Gitea's queue's handlers are light functions, the cost is only the
go-routine, so it doesn't make sense to "boost" them.
* The new code only use "max worker number" to limit the concurrent
workers.
* The new "Push" never blocks forever
* Instead of creating more and more blocking goroutines, return an error
is more friendly to the server and to the end user.

There are more details in code comments: eg: the "Flush" problem, the
strange "code.index" hanging problem, the "immediate" queue problem.

Almost ready for review.

TODO:

* [x] add some necessary comments during review
* [x] add some more tests if necessary
* [x] update documents and config options
* [x] test max worker / active worker
* [x] re-run the CI tasks to see whether any test is flaky
* [x] improve the `handleOldLengthConfiguration` to provide more
friendly messages
* [x] fine tune default config values (eg: length?)

## Code coverage:

![image](https://user-images.githubusercontent.com/2114189/236620635-55576955-f95d-4810-b12f-879026a3afdf.png)
2023-05-08 19:49:59 +08:00

889 lines
25 KiB
Go

// Copyright 2019 The Gitea Authors. All rights reserved.
// Copyright 2018 Jonas Franz. All rights reserved.
// SPDX-License-Identifier: MIT
package migrations
import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
base "code.gitea.io/gitea/modules/migration"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/structs"
"github.com/google/go-github/v51/github"
"golang.org/x/oauth2"
)
var (
_ base.Downloader = &GithubDownloaderV3{}
_ base.DownloaderFactory = &GithubDownloaderV3Factory{}
// GithubLimitRateRemaining limit to wait for new rate to apply
GithubLimitRateRemaining = 0
)
func init() {
RegisterDownloaderFactory(&GithubDownloaderV3Factory{})
}
// GithubDownloaderV3Factory defines a github downloader v3 factory
type GithubDownloaderV3Factory struct{}
// New returns a Downloader related to this factory according MigrateOptions
func (f *GithubDownloaderV3Factory) New(ctx context.Context, opts base.MigrateOptions) (base.Downloader, error) {
u, err := url.Parse(opts.CloneAddr)
if err != nil {
return nil, err
}
baseURL := u.Scheme + "://" + u.Host
fields := strings.Split(u.Path, "/")
oldOwner := fields[1]
oldName := strings.TrimSuffix(fields[2], ".git")
log.Trace("Create github downloader BaseURL: %s %s/%s", baseURL, oldOwner, oldName)
return NewGithubDownloaderV3(ctx, baseURL, opts.AuthUsername, opts.AuthPassword, opts.AuthToken, oldOwner, oldName), nil
}
// GitServiceType returns the type of git service
func (f *GithubDownloaderV3Factory) GitServiceType() structs.GitServiceType {
return structs.GithubService
}
// GithubDownloaderV3 implements a Downloader interface to get repository information
// from github via APIv3
type GithubDownloaderV3 struct {
base.NullDownloader
ctx context.Context
clients []*github.Client
baseURL string
repoOwner string
repoName string
userName string
password string
rates []*github.Rate
curClientIdx int
maxPerPage int
SkipReactions bool
SkipReviews bool
}
// NewGithubDownloaderV3 creates a github Downloader via github v3 API
func NewGithubDownloaderV3(ctx context.Context, baseURL, userName, password, token, repoOwner, repoName string) *GithubDownloaderV3 {
downloader := GithubDownloaderV3{
userName: userName,
baseURL: baseURL,
password: password,
ctx: ctx,
repoOwner: repoOwner,
repoName: repoName,
maxPerPage: 100,
}
if token != "" {
tokens := strings.Split(token, ",")
for _, token := range tokens {
token = strings.TrimSpace(token)
ts := oauth2.StaticTokenSource(
&oauth2.Token{AccessToken: token},
)
client := &http.Client{
Transport: &oauth2.Transport{
Base: NewMigrationHTTPTransport(),
Source: oauth2.ReuseTokenSource(nil, ts),
},
}
downloader.addClient(client, baseURL)
}
} else {
transport := NewMigrationHTTPTransport()
transport.Proxy = func(req *http.Request) (*url.URL, error) {
req.SetBasicAuth(userName, password)
return proxy.Proxy()(req)
}
client := &http.Client{
Transport: transport,
}
downloader.addClient(client, baseURL)
}
return &downloader
}
// String implements Stringer
func (g *GithubDownloaderV3) String() string {
return fmt.Sprintf("migration from github server %s %s/%s", g.baseURL, g.repoOwner, g.repoName)
}
// ColorFormat provides a basic color format for a GithubDownloader
func (g *GithubDownloaderV3) ColorFormat(s fmt.State) {
if g == nil {
log.ColorFprintf(s, "<nil: GithubDownloaderV3>")
return
}
log.ColorFprintf(s, "migration from github server %s %s/%s", g.baseURL, g.repoOwner, g.repoName)
}
func (g *GithubDownloaderV3) addClient(client *http.Client, baseURL string) {
githubClient := github.NewClient(client)
if baseURL != "https://github.com" {
githubClient, _ = github.NewEnterpriseClient(baseURL, baseURL, client)
}
g.clients = append(g.clients, githubClient)
g.rates = append(g.rates, nil)
}
// SetContext set context
func (g *GithubDownloaderV3) SetContext(ctx context.Context) {
g.ctx = ctx
}
func (g *GithubDownloaderV3) waitAndPickClient() {
var recentIdx int
var maxRemaining int
for i := 0; i < len(g.clients); i++ {
if g.rates[i] != nil && g.rates[i].Remaining > maxRemaining {
maxRemaining = g.rates[i].Remaining
recentIdx = i
}
}
g.curClientIdx = recentIdx // if no max remain, it will always pick the first client.
for g.rates[g.curClientIdx] != nil && g.rates[g.curClientIdx].Remaining <= GithubLimitRateRemaining {
timer := time.NewTimer(time.Until(g.rates[g.curClientIdx].Reset.Time))
select {
case <-g.ctx.Done():
timer.Stop()
return
case <-timer.C:
}
err := g.RefreshRate()
if err != nil {
log.Error("g.getClient().RateLimits: %s", err)
}
}
}
// RefreshRate update the current rate (doesn't count in rate limit)
func (g *GithubDownloaderV3) RefreshRate() error {
rates, _, err := g.getClient().RateLimits(g.ctx)
if err != nil {
// if rate limit is not enabled, ignore it
if strings.Contains(err.Error(), "404") {
g.setRate(nil)
return nil
}
return err
}
g.setRate(rates.GetCore())
return nil
}
func (g *GithubDownloaderV3) getClient() *github.Client {
return g.clients[g.curClientIdx]
}
func (g *GithubDownloaderV3) setRate(rate *github.Rate) {
g.rates[g.curClientIdx] = rate
}
// GetRepoInfo returns a repository information
func (g *GithubDownloaderV3) GetRepoInfo() (*base.Repository, error) {
g.waitAndPickClient()
gr, resp, err := g.getClient().Repositories.Get(g.ctx, g.repoOwner, g.repoName)
if err != nil {
return nil, err
}
g.setRate(&resp.Rate)
// convert github repo to stand Repo
return &base.Repository{
Owner: g.repoOwner,
Name: gr.GetName(),
IsPrivate: gr.GetPrivate(),
Description: gr.GetDescription(),
OriginalURL: gr.GetHTMLURL(),
CloneURL: gr.GetCloneURL(),
DefaultBranch: gr.GetDefaultBranch(),
}, nil
}
// GetTopics return github topics
func (g *GithubDownloaderV3) GetTopics() ([]string, error) {
g.waitAndPickClient()
r, resp, err := g.getClient().Repositories.Get(g.ctx, g.repoOwner, g.repoName)
if err != nil {
return nil, err
}
g.setRate(&resp.Rate)
return r.Topics, nil
}
// GetMilestones returns milestones
func (g *GithubDownloaderV3) GetMilestones() ([]*base.Milestone, error) {
perPage := g.maxPerPage
milestones := make([]*base.Milestone, 0, perPage)
for i := 1; ; i++ {
g.waitAndPickClient()
ms, resp, err := g.getClient().Issues.ListMilestones(g.ctx, g.repoOwner, g.repoName,
&github.MilestoneListOptions{
State: "all",
ListOptions: github.ListOptions{
Page: i,
PerPage: perPage,
},
})
if err != nil {
return nil, err
}
g.setRate(&resp.Rate)
for _, m := range ms {
state := "open"
if m.State != nil {
state = *m.State
}
milestones = append(milestones, &base.Milestone{
Title: m.GetTitle(),
Description: m.GetDescription(),
Deadline: convertGithubTimestampToTime(m.DueOn),
State: state,
Created: m.GetCreatedAt().Time,
Updated: convertGithubTimestampToTime(m.UpdatedAt),
Closed: convertGithubTimestampToTime(m.ClosedAt),
})
}
if len(ms) < perPage {
break
}
}
return milestones, nil
}
func convertGithubLabel(label *github.Label) *base.Label {
return &base.Label{
Name: label.GetName(),
Color: label.GetColor(),
Description: label.GetDescription(),
}
}
// GetLabels returns labels
func (g *GithubDownloaderV3) GetLabels() ([]*base.Label, error) {
perPage := g.maxPerPage
labels := make([]*base.Label, 0, perPage)
for i := 1; ; i++ {
g.waitAndPickClient()
ls, resp, err := g.getClient().Issues.ListLabels(g.ctx, g.repoOwner, g.repoName,
&github.ListOptions{
Page: i,
PerPage: perPage,
})
if err != nil {
return nil, err
}
g.setRate(&resp.Rate)
for _, label := range ls {
labels = append(labels, convertGithubLabel(label))
}
if len(ls) < perPage {
break
}
}
return labels, nil
}
func (g *GithubDownloaderV3) convertGithubRelease(rel *github.RepositoryRelease) *base.Release {
// GitHub allows commitish to be a reference.
// In this case, we need to remove the prefix, i.e. convert "refs/heads/main" to "main".
targetCommitish := strings.TrimPrefix(rel.GetTargetCommitish(), git.BranchPrefix)
r := &base.Release{
Name: rel.GetName(),
TagName: rel.GetTagName(),
TargetCommitish: targetCommitish,
Draft: rel.GetDraft(),
Prerelease: rel.GetPrerelease(),
Created: rel.GetCreatedAt().Time,
PublisherID: rel.GetAuthor().GetID(),
PublisherName: rel.GetAuthor().GetLogin(),
PublisherEmail: rel.GetAuthor().GetEmail(),
Body: rel.GetBody(),
}
if rel.PublishedAt != nil {
r.Published = rel.PublishedAt.Time
}
httpClient := NewMigrationHTTPClient()
for _, asset := range rel.Assets {
assetID := *asset.ID // Don't optimize this, for closure we need a local variable
r.Assets = append(r.Assets, &base.ReleaseAsset{
ID: asset.GetID(),
Name: asset.GetName(),
ContentType: asset.ContentType,
Size: asset.Size,
DownloadCount: asset.DownloadCount,
Created: asset.CreatedAt.Time,
Updated: asset.UpdatedAt.Time,
DownloadFunc: func() (io.ReadCloser, error) {
g.waitAndPickClient()
readCloser, redirectURL, err := g.getClient().Repositories.DownloadReleaseAsset(g.ctx, g.repoOwner, g.repoName, assetID, nil)
if err != nil {
return nil, err
}
if err := g.RefreshRate(); err != nil {
log.Error("g.getClient().RateLimits: %s", err)
}
if readCloser != nil {
return readCloser, nil
}
if redirectURL == "" {
return nil, fmt.Errorf("no release asset found for %d", assetID)
}
// Prevent open redirect
if !hasBaseURL(redirectURL, g.baseURL) &&
!hasBaseURL(redirectURL, "https://objects.githubusercontent.com/") {
WarnAndNotice("Unexpected AssetURL for assetID[%d] in %s: %s", asset.GetID(), g, redirectURL)
return io.NopCloser(strings.NewReader(redirectURL)), nil
}
g.waitAndPickClient()
req, err := http.NewRequestWithContext(g.ctx, "GET", redirectURL, nil)
if err != nil {
return nil, err
}
resp, err := httpClient.Do(req)
err1 := g.RefreshRate()
if err1 != nil {
log.Error("g.RefreshRate(): %s", err1)
}
if err != nil {
return nil, err
}
return resp.Body, nil
},
})
}
return r
}
// GetReleases returns releases
func (g *GithubDownloaderV3) GetReleases() ([]*base.Release, error) {
perPage := g.maxPerPage
releases := make([]*base.Release, 0, perPage)
for i := 1; ; i++ {
g.waitAndPickClient()
ls, resp, err := g.getClient().Repositories.ListReleases(g.ctx, g.repoOwner, g.repoName,
&github.ListOptions{
Page: i,
PerPage: perPage,
})
if err != nil {
return nil, err
}
g.setRate(&resp.Rate)
for _, release := range ls {
releases = append(releases, g.convertGithubRelease(release))
}
if len(ls) < perPage {
break
}
}
return releases, nil
}
// GetIssues returns issues according start and limit
func (g *GithubDownloaderV3) GetIssues(page, perPage int) ([]*base.Issue, bool, error) {
if perPage > g.maxPerPage {
perPage = g.maxPerPage
}
opt := &github.IssueListByRepoOptions{
Sort: "created",
Direction: "asc",
State: "all",
ListOptions: github.ListOptions{
PerPage: perPage,
Page: page,
},
}
allIssues := make([]*base.Issue, 0, perPage)
g.waitAndPickClient()
issues, resp, err := g.getClient().Issues.ListByRepo(g.ctx, g.repoOwner, g.repoName, opt)
if err != nil {
return nil, false, fmt.Errorf("error while listing repos: %w", err)
}
log.Trace("Request get issues %d/%d, but in fact get %d", perPage, page, len(issues))
g.setRate(&resp.Rate)
for _, issue := range issues {
if issue.IsPullRequest() {
continue
}
labels := make([]*base.Label, 0, len(issue.Labels))
for _, l := range issue.Labels {
labels = append(labels, convertGithubLabel(l))
}
// get reactions
var reactions []*base.Reaction
if !g.SkipReactions {
for i := 1; ; i++ {
g.waitAndPickClient()
res, resp, err := g.getClient().Reactions.ListIssueReactions(g.ctx, g.repoOwner, g.repoName, issue.GetNumber(), &github.ListOptions{
Page: i,
PerPage: perPage,
})
if err != nil {
return nil, false, err
}
g.setRate(&resp.Rate)
if len(res) == 0 {
break
}
for _, reaction := range res {
reactions = append(reactions, &base.Reaction{
UserID: reaction.User.GetID(),
UserName: reaction.User.GetLogin(),
Content: reaction.GetContent(),
})
}
}
}
var assignees []string
for i := range issue.Assignees {
assignees = append(assignees, issue.Assignees[i].GetLogin())
}
allIssues = append(allIssues, &base.Issue{
Title: *issue.Title,
Number: int64(*issue.Number),
PosterID: issue.GetUser().GetID(),
PosterName: issue.GetUser().GetLogin(),
PosterEmail: issue.GetUser().GetEmail(),
Content: issue.GetBody(),
Milestone: issue.GetMilestone().GetTitle(),
State: issue.GetState(),
Created: issue.GetCreatedAt().Time,
Updated: issue.GetUpdatedAt().Time,
Labels: labels,
Reactions: reactions,
Closed: &issue.ClosedAt.Time,
IsLocked: issue.GetLocked(),
Assignees: assignees,
ForeignIndex: int64(*issue.Number),
})
}
return allIssues, len(issues) < perPage, nil
}
// SupportGetRepoComments return true if it supports get repo comments
func (g *GithubDownloaderV3) SupportGetRepoComments() bool {
return true
}
// GetComments returns comments according issueNumber
func (g *GithubDownloaderV3) GetComments(commentable base.Commentable) ([]*base.Comment, bool, error) {
comments, err := g.getComments(commentable)
return comments, false, err
}
func (g *GithubDownloaderV3) getComments(commentable base.Commentable) ([]*base.Comment, error) {
var (
allComments = make([]*base.Comment, 0, g.maxPerPage)
created = "created"
asc = "asc"
)
opt := &github.IssueListCommentsOptions{
Sort: &created,
Direction: &asc,
ListOptions: github.ListOptions{
PerPage: g.maxPerPage,
},
}
for {
g.waitAndPickClient()
comments, resp, err := g.getClient().Issues.ListComments(g.ctx, g.repoOwner, g.repoName, int(commentable.GetForeignIndex()), opt)
if err != nil {
return nil, fmt.Errorf("error while listing repos: %w", err)
}
g.setRate(&resp.Rate)
for _, comment := range comments {
// get reactions
var reactions []*base.Reaction
if !g.SkipReactions {
for i := 1; ; i++ {
g.waitAndPickClient()
res, resp, err := g.getClient().Reactions.ListIssueCommentReactions(g.ctx, g.repoOwner, g.repoName, comment.GetID(), &github.ListOptions{
Page: i,
PerPage: g.maxPerPage,
})
if err != nil {
return nil, err
}
g.setRate(&resp.Rate)
if len(res) == 0 {
break
}
for _, reaction := range res {
reactions = append(reactions, &base.Reaction{
UserID: reaction.User.GetID(),
UserName: reaction.User.GetLogin(),
Content: reaction.GetContent(),
})
}
}
}
allComments = append(allComments, &base.Comment{
IssueIndex: commentable.GetLocalIndex(),
Index: comment.GetID(),
PosterID: comment.GetUser().GetID(),
PosterName: comment.GetUser().GetLogin(),
PosterEmail: comment.GetUser().GetEmail(),
Content: comment.GetBody(),
Created: comment.GetCreatedAt().Time,
Updated: comment.GetUpdatedAt().Time,
Reactions: reactions,
})
}
if resp.NextPage == 0 {
break
}
opt.Page = resp.NextPage
}
return allComments, nil
}
// GetAllComments returns repository comments according page and perPageSize
func (g *GithubDownloaderV3) GetAllComments(page, perPage int) ([]*base.Comment, bool, error) {
var (
allComments = make([]*base.Comment, 0, perPage)
created = "created"
asc = "asc"
)
if perPage > g.maxPerPage {
perPage = g.maxPerPage
}
opt := &github.IssueListCommentsOptions{
Sort: &created,
Direction: &asc,
ListOptions: github.ListOptions{
Page: page,
PerPage: perPage,
},
}
g.waitAndPickClient()
comments, resp, err := g.getClient().Issues.ListComments(g.ctx, g.repoOwner, g.repoName, 0, opt)
if err != nil {
return nil, false, fmt.Errorf("error while listing repos: %w", err)
}
isEnd := resp.NextPage == 0
log.Trace("Request get comments %d/%d, but in fact get %d, next page is %d", perPage, page, len(comments), resp.NextPage)
g.setRate(&resp.Rate)
for _, comment := range comments {
// get reactions
var reactions []*base.Reaction
if !g.SkipReactions {
for i := 1; ; i++ {
g.waitAndPickClient()
res, resp, err := g.getClient().Reactions.ListIssueCommentReactions(g.ctx, g.repoOwner, g.repoName, comment.GetID(), &github.ListOptions{
Page: i,
PerPage: g.maxPerPage,
})
if err != nil {
return nil, false, err
}
g.setRate(&resp.Rate)
if len(res) == 0 {
break
}
for _, reaction := range res {
reactions = append(reactions, &base.Reaction{
UserID: reaction.User.GetID(),
UserName: reaction.User.GetLogin(),
Content: reaction.GetContent(),
})
}
}
}
idx := strings.LastIndex(*comment.IssueURL, "/")
issueIndex, _ := strconv.ParseInt((*comment.IssueURL)[idx+1:], 10, 64)
allComments = append(allComments, &base.Comment{
IssueIndex: issueIndex,
Index: comment.GetID(),
PosterID: comment.GetUser().GetID(),
PosterName: comment.GetUser().GetLogin(),
PosterEmail: comment.GetUser().GetEmail(),
Content: comment.GetBody(),
Created: comment.GetCreatedAt().Time,
Updated: comment.GetUpdatedAt().Time,
Reactions: reactions,
})
}
return allComments, isEnd, nil
}
// GetPullRequests returns pull requests according page and perPage
func (g *GithubDownloaderV3) GetPullRequests(page, perPage int) ([]*base.PullRequest, bool, error) {
if perPage > g.maxPerPage {
perPage = g.maxPerPage
}
opt := &github.PullRequestListOptions{
Sort: "created",
Direction: "asc",
State: "all",
ListOptions: github.ListOptions{
PerPage: perPage,
Page: page,
},
}
allPRs := make([]*base.PullRequest, 0, perPage)
g.waitAndPickClient()
prs, resp, err := g.getClient().PullRequests.List(g.ctx, g.repoOwner, g.repoName, opt)
if err != nil {
return nil, false, fmt.Errorf("error while listing repos: %w", err)
}
log.Trace("Request get pull requests %d/%d, but in fact get %d", perPage, page, len(prs))
g.setRate(&resp.Rate)
for _, pr := range prs {
labels := make([]*base.Label, 0, len(pr.Labels))
for _, l := range pr.Labels {
labels = append(labels, convertGithubLabel(l))
}
// get reactions
var reactions []*base.Reaction
if !g.SkipReactions {
for i := 1; ; i++ {
g.waitAndPickClient()
res, resp, err := g.getClient().Reactions.ListIssueReactions(g.ctx, g.repoOwner, g.repoName, pr.GetNumber(), &github.ListOptions{
Page: i,
PerPage: perPage,
})
if err != nil {
return nil, false, err
}
g.setRate(&resp.Rate)
if len(res) == 0 {
break
}
for _, reaction := range res {
reactions = append(reactions, &base.Reaction{
UserID: reaction.User.GetID(),
UserName: reaction.User.GetLogin(),
Content: reaction.GetContent(),
})
}
}
}
// download patch and saved as tmp file
g.waitAndPickClient()
allPRs = append(allPRs, &base.PullRequest{
Title: pr.GetTitle(),
Number: int64(pr.GetNumber()),
PosterID: pr.GetUser().GetID(),
PosterName: pr.GetUser().GetLogin(),
PosterEmail: pr.GetUser().GetEmail(),
Content: pr.GetBody(),
Milestone: pr.GetMilestone().GetTitle(),
State: pr.GetState(),
Created: pr.GetCreatedAt().Time,
Updated: pr.GetUpdatedAt().Time,
Closed: convertGithubTimestampToTime(pr.ClosedAt),
Labels: labels,
Merged: pr.MergedAt != nil,
MergeCommitSHA: pr.GetMergeCommitSHA(),
MergedTime: convertGithubTimestampToTime(pr.MergedAt),
IsLocked: pr.ActiveLockReason != nil,
Head: base.PullRequestBranch{
Ref: pr.GetHead().GetRef(),
SHA: pr.GetHead().GetSHA(),
OwnerName: pr.GetHead().GetUser().GetLogin(),
RepoName: pr.GetHead().GetRepo().GetName(),
CloneURL: pr.GetHead().GetRepo().GetCloneURL(), // see below for SECURITY related issues here
},
Base: base.PullRequestBranch{
Ref: pr.GetBase().GetRef(),
SHA: pr.GetBase().GetSHA(),
RepoName: pr.GetBase().GetRepo().GetName(),
OwnerName: pr.GetBase().GetUser().GetLogin(),
},
PatchURL: pr.GetPatchURL(), // see below for SECURITY related issues here
Reactions: reactions,
ForeignIndex: int64(*pr.Number),
})
// SECURITY: Ensure that the PR is safe
_ = CheckAndEnsureSafePR(allPRs[len(allPRs)-1], g.baseURL, g)
}
return allPRs, len(prs) < perPage, nil
}
func convertGithubReview(r *github.PullRequestReview) *base.Review {
return &base.Review{
ID: r.GetID(),
ReviewerID: r.GetUser().GetID(),
ReviewerName: r.GetUser().GetLogin(),
CommitID: r.GetCommitID(),
Content: r.GetBody(),
CreatedAt: r.GetSubmittedAt().Time,
State: r.GetState(),
}
}
func (g *GithubDownloaderV3) convertGithubReviewComments(cs []*github.PullRequestComment) ([]*base.ReviewComment, error) {
rcs := make([]*base.ReviewComment, 0, len(cs))
for _, c := range cs {
// get reactions
var reactions []*base.Reaction
if !g.SkipReactions {
for i := 1; ; i++ {
g.waitAndPickClient()
res, resp, err := g.getClient().Reactions.ListPullRequestCommentReactions(g.ctx, g.repoOwner, g.repoName, c.GetID(), &github.ListOptions{
Page: i,
PerPage: g.maxPerPage,
})
if err != nil {
return nil, err
}
g.setRate(&resp.Rate)
if len(res) == 0 {
break
}
for _, reaction := range res {
reactions = append(reactions, &base.Reaction{
UserID: reaction.User.GetID(),
UserName: reaction.User.GetLogin(),
Content: reaction.GetContent(),
})
}
}
}
rcs = append(rcs, &base.ReviewComment{
ID: c.GetID(),
InReplyTo: c.GetInReplyTo(),
Content: c.GetBody(),
TreePath: c.GetPath(),
DiffHunk: c.GetDiffHunk(),
Position: c.GetPosition(),
CommitID: c.GetCommitID(),
PosterID: c.GetUser().GetID(),
Reactions: reactions,
CreatedAt: c.GetCreatedAt().Time,
UpdatedAt: c.GetUpdatedAt().Time,
})
}
return rcs, nil
}
// GetReviews returns pull requests review
func (g *GithubDownloaderV3) GetReviews(reviewable base.Reviewable) ([]*base.Review, error) {
allReviews := make([]*base.Review, 0, g.maxPerPage)
if g.SkipReviews {
return allReviews, nil
}
opt := &github.ListOptions{
PerPage: g.maxPerPage,
}
// Get approve/request change reviews
for {
g.waitAndPickClient()
reviews, resp, err := g.getClient().PullRequests.ListReviews(g.ctx, g.repoOwner, g.repoName, int(reviewable.GetForeignIndex()), opt)
if err != nil {
return nil, fmt.Errorf("error while listing repos: %w", err)
}
g.setRate(&resp.Rate)
for _, review := range reviews {
r := convertGithubReview(review)
r.IssueIndex = reviewable.GetLocalIndex()
// retrieve all review comments
opt2 := &github.ListOptions{
PerPage: g.maxPerPage,
}
for {
g.waitAndPickClient()
reviewComments, resp, err := g.getClient().PullRequests.ListReviewComments(g.ctx, g.repoOwner, g.repoName, int(reviewable.GetForeignIndex()), review.GetID(), opt2)
if err != nil {
return nil, fmt.Errorf("error while listing repos: %w", err)
}
g.setRate(&resp.Rate)
cs, err := g.convertGithubReviewComments(reviewComments)
if err != nil {
return nil, err
}
r.Comments = append(r.Comments, cs...)
if resp.NextPage == 0 {
break
}
opt2.Page = resp.NextPage
}
allReviews = append(allReviews, r)
}
if resp.NextPage == 0 {
break
}
opt.Page = resp.NextPage
}
// Get requested reviews
for {
g.waitAndPickClient()
reviewers, resp, err := g.getClient().PullRequests.ListReviewers(g.ctx, g.repoOwner, g.repoName, int(reviewable.GetForeignIndex()), opt)
if err != nil {
return nil, fmt.Errorf("error while listing repos: %w", err)
}
g.setRate(&resp.Rate)
for _, user := range reviewers.Users {
r := &base.Review{
ReviewerID: user.GetID(),
ReviewerName: user.GetLogin(),
State: base.ReviewStateRequestReview,
IssueIndex: reviewable.GetLocalIndex(),
}
allReviews = append(allReviews, r)
}
// TODO: Handle Team requests
if resp.NextPage == 0 {
break
}
opt.Page = resp.NextPage
}
return allReviews, nil
}
func convertGithubTimestampToTime(t *github.Timestamp) *time.Time {
if t == nil {
return nil
}
return &t.Time
}