0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-13 22:50:38 -05:00

feat(startup): update logic for metadb update on startup, skip unmodified repos (#2024)

- MetaDB stores the time of the last update of a repo
- During startup we check if the layout has been updated after the last recorded change in the db
- If this is the case, the repo is parsed and updated in the DB otherwise it's skipped

Signed-off-by: Laurentiu Niculae <niculae.laurentiu1@gmail.com>
This commit is contained in:
LaurentiuNiculae 2023-11-16 20:39:27 +02:00 committed by GitHub
parent 60eaf7b5d9
commit 4fb1e756c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 763 additions and 159 deletions

View file

@ -264,22 +264,22 @@ func TestCreateMetaDBDriver(t *testing.T) {
const perms = 0o600
boltDB, err := bbolt.Open(path.Join(dir, "repo.db"), perms, &bbolt.Options{Timeout: time.Second * 10})
boltDB, err := bbolt.Open(path.Join(dir, "meta.db"), perms, &bbolt.Options{Timeout: time.Second * 10})
So(err, ShouldBeNil)
err = boltDB.Close()
So(err, ShouldBeNil)
err = os.Chmod(path.Join(dir, "repo.db"), 0o200)
err = os.Chmod(path.Join(dir, "meta.db"), 0o200)
So(err, ShouldBeNil)
_, err = meta.New(conf.Storage.StorageConfig, log)
So(err, ShouldNotBeNil)
err = os.Chmod(path.Join(dir, "repo.db"), 0o600)
err = os.Chmod(path.Join(dir, "meta.db"), 0o600)
So(err, ShouldBeNil)
defer os.Remove(path.Join(dir, "repo.db"))
defer os.Remove(path.Join(dir, "meta.db"))
})
}

View file

@ -53,7 +53,7 @@ import (
const (
graphqlQueryPrefix = constants.FullSearchPrefix
DBFileName = "repo.db"
DBFileName = "meta.db"
)
var (

View file

@ -65,7 +65,12 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
return err
}
_, err = transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
repoBlobsBuck, err := transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck))
if err != nil {
return err
}
_, err = repoBlobsBuck.CreateBucketIfNotExists([]byte(RepoLastUpdatedBuck))
if err != nil {
return err
}
@ -84,6 +89,53 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) {
}, nil
}
func (bdw *BoltDB) GetAllRepoNames() ([]string, error) {
repoNames := []string{}
err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))
return repoMetaBuck.ForEach(func(repo, _ []byte) error {
repoNames = append(repoNames, string(repo))
return nil
})
})
return repoNames, err
}
func (bdw *BoltDB) GetRepoLastUpdated(repo string) time.Time {
lastUpdated := time.Time{}
err := bdw.DB.View(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))
lastUpdatedBlob := lastUpdatedBuck.Get([]byte(repo))
if len(lastUpdatedBlob) == 0 {
return zerr.ErrRepoMetaNotFound
}
protoTime := &timestamppb.Timestamp{}
err := proto.Unmarshal(lastUpdatedBlob, protoTime)
if err != nil {
return err
}
lastUpdated = *mConvert.GetTime(protoTime)
return nil
})
if err != nil {
return time.Time{}
}
return lastUpdated
}
func (bdw *BoltDB) SetImageMeta(digest godigest.Digest, imageMeta mTypes.ImageMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(ImageMetaBuck))
@ -132,6 +184,7 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
err = bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))
imageBuck := tx.Bucket([]byte(ImageMetaBuck))
// 1. Add image data to db if needed
@ -226,6 +279,11 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
protoRepoMeta, repoBlobs = common.AddImageMetaToRepoMeta(protoRepoMeta, repoBlobs, reference, imageMeta)
err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck)
if err != nil {
return err
}
err = setProtoRepoBlobs(repoBlobs, repoBlobsBuck)
if err != nil {
return err
@ -237,6 +295,17 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference
return err
}
func setRepoLastUpdated(repo string, lastUpdated time.Time, repoLastUpdatedBuck *bbolt.Bucket) error {
protoTime := timestamppb.New(lastUpdated)
protoTimeBlob, err := proto.Marshal(protoTime)
if err != nil {
return err
}
return repoLastUpdatedBuck.Put([]byte(repo), protoTimeBlob)
}
func unmarshalProtoRepoBlobs(repo string, repoBlobsBytes []byte) (*proto_go.RepoBlobs, error) {
repoBlobs := &proto_go.RepoBlobs{
Name: repo,
@ -1010,6 +1079,7 @@ func (bdw *BoltDB) DecrementRepoStars(repo string) error {
func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
buck := tx.Bucket([]byte(RepoMetaBuck))
repoLastUpdatedBuck := tx.Bucket([]byte(RepoBlobsBuck)).Bucket([]byte(RepoLastUpdatedBuck))
repoMeta.Name = repo
@ -1018,7 +1088,35 @@ func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
return err
}
return buck.Put([]byte(repo), repoMetaBlob)
err = buck.Put([]byte(repo), repoMetaBlob)
if err != nil {
return err
}
// The last update time is set to 0 in order to force an update in case of a next storage parsing
return setRepoLastUpdated(repo, time.Time{}, repoLastUpdatedBuck)
})
return err
}
func (bdw *BoltDB) DeleteRepoMeta(repo string) error {
err := bdw.DB.Update(func(tx *bbolt.Tx) error {
repoBuck := tx.Bucket([]byte(RepoMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))
err := repoBuck.Delete([]byte(repo))
if err != nil {
return err
}
err = repoBlobsBuck.Delete([]byte(repo))
if err != nil {
return err
}
return repoLastUpdatedBuck.Delete([]byte(repo))
})
return err
@ -1212,6 +1310,7 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go
repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck))
imageMetaBuck := tx.Bucket([]byte(ImageMetaBuck))
repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck))
repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck))
protoRepoMeta, err := getProtoRepoMeta(repo, repoMetaBuck)
if err != nil {
@ -1292,6 +1391,11 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go
return err
}
err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck)
if err != nil {
return err
}
protoRepoMeta, repoBlobs = common.RemoveImageFromRepoMeta(protoRepoMeta, repoBlobs, reference)
repoBlobsBytes, err = proto.Marshal(repoBlobs)
@ -1934,12 +2038,39 @@ func (bdw *BoltDB) ResetDB() error {
}
func resetBucket(transaction *bbolt.Tx, bucketName string) error {
err := transaction.DeleteBucket([]byte(bucketName))
bucket := transaction.Bucket([]byte(bucketName))
if bucket == nil {
return nil
}
// we need to create the sub buckets if they exits, we'll presume the sub-buckets are not nested more than 1 layer
subBuckets := [][]byte{}
err := bucket.ForEachBucket(func(bucketName []byte) error {
subBuckets = append(subBuckets, bucketName)
return nil
})
if err != nil {
return err
}
_, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
err = transaction.DeleteBucket([]byte(bucketName))
if err != nil {
return err
}
bucket, err = transaction.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}
for _, subBucket := range subBuckets {
_, err := bucket.CreateBucketIfNotExists(subBucket)
if err != nil {
return err
}
}
return err
}

View file

@ -63,58 +63,6 @@ func TestWrapperErrors(t *testing.T) {
ctx := userAc.DeriveContext(context.Background())
Convey("ResetDB", func() {
Convey("RepoMetaBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.RepoMetaBuck))
})
So(err, ShouldBeNil)
err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})
Convey("ImageMetaBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.ImageMetaBuck))
})
So(err, ShouldBeNil)
err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})
Convey("RepoBlobsBuck", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.RepoBlobsBuck))
})
So(err, ShouldBeNil)
err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})
Convey("UserAPIKeysBucket", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.UserAPIKeysBucket))
})
So(err, ShouldBeNil)
err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})
Convey("UserDataBucket", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
return tx.DeleteBucket([]byte(boltdb.UserDataBucket))
})
So(err, ShouldBeNil)
err = boltdbWrapper.ResetDB()
So(err, ShouldNotBeNil)
})
})
Convey("RemoveRepoReference", func() {
Convey("getProtoRepoMeta errors", func() {
err := setRepoMeta("repo", badProtoBlob, boltdbWrapper.DB)
@ -192,6 +140,21 @@ func TestWrapperErrors(t *testing.T) {
})
})
Convey("GetRepoLastUpdated", func() {
Convey("bad blob in db", func() {
err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error {
repoBlobsBuck := tx.Bucket([]byte(boltdb.RepoBlobsBuck))
lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(boltdb.RepoLastUpdatedBuck))
return lastUpdatedBuck.Put([]byte("repo"), []byte("bad-blob"))
})
So(err, ShouldBeNil)
lastUpdated := boltdbWrapper.GetRepoLastUpdated("repo")
So(lastUpdated, ShouldEqual, time.Time{})
})
})
Convey("UpdateStatsOnDownload", func() {
Convey("repo meta not found", func() {
err = boltdbWrapper.UpdateStatsOnDownload("repo", "ref")

View file

@ -4,8 +4,12 @@ package boltdb
const (
ImageMetaBuck = "ImageMeta"
RepoMetaBuck = "RepoMeta"
RepoBlobsBuck = "RepoBlobsMeta"
UserDataBucket = "UserData"
VersionBucket = "Version"
UserAPIKeysBucket = "UserAPIKeys"
)
const (
RepoBlobsBuck = "RepoBlobsMeta"
RepoLastUpdatedBuck = "RepoLastUpdated" // Sub-bucket
)

View file

@ -14,7 +14,7 @@ type DBParameters struct {
func GetBoltDriver(params DBParameters) (*bolt.DB, error) {
const perms = 0o600
boltDB, err := bolt.Open(path.Join(params.RootDir, "repo.db"), perms, &bolt.Options{Timeout: time.Second * 10})
boltDB, err := bolt.Open(path.Join(params.RootDir, "meta.db"), perms, &bolt.Options{Timeout: time.Second * 10})
if err != nil {
return nil, err
}

View file

@ -90,6 +90,66 @@ func New(client *dynamodb.Client, params DBDriverParameters, log log.Logger,
return &dynamoWrapper, nil
}
func (dwr *DynamoDB) GetAllRepoNames() ([]string, error) {
ctx := context.Background()
attributeIterator := NewBaseDynamoAttributesIterator(dwr.Client, dwr.RepoMetaTablename, "TableKey", 0, dwr.Log)
repoNames := []string{}
repoNameAttribute, err := attributeIterator.First(ctx)
for ; repoNameAttribute != nil; repoNameAttribute, err = attributeIterator.Next(ctx) {
if err != nil {
return []string{}, err
}
var repoName string
err := attributevalue.Unmarshal(repoNameAttribute, &repoName)
if err != nil {
continue
}
repoNames = append(repoNames, repoName)
}
return repoNames, nil
}
func (dwr *DynamoDB) GetRepoLastUpdated(repo string) time.Time {
resp, err := dwr.Client.GetItem(context.Background(), &dynamodb.GetItemInput{
TableName: aws.String(dwr.RepoBlobsTablename),
Key: map[string]types.AttributeValue{
"TableKey": &types.AttributeValueMemberS{Value: repo},
},
ProjectionExpression: aws.String("RepoLastUpdated"),
})
if err != nil {
return time.Time{}
}
protoRepoLastUpdated := &timestamppb.Timestamp{}
repoLastUpdatedBlob := []byte{}
if resp.Item != nil {
err = attributevalue.Unmarshal(resp.Item["RepoLastUpdated"], &repoLastUpdatedBlob)
if err != nil {
return time.Time{}
}
if len(repoLastUpdatedBlob) > 0 {
err := proto.Unmarshal(repoLastUpdatedBlob, protoRepoLastUpdated)
if err != nil {
return time.Time{}
}
}
}
lastUpdated := *mConvert.GetTime(protoRepoLastUpdated)
return lastUpdated
}
func (dwr *DynamoDB) ImageTrustStore() mTypes.ImageTrustStore {
return dwr.imgTrustStore
}
@ -117,7 +177,7 @@ func (dwr *DynamoDB) SetProtoImageMeta(digest godigest.Digest, protoImageMeta *p
":ImageMeta": mdAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: digest.String(),
},
},
@ -136,7 +196,7 @@ func (dwr *DynamoDB) GetProtoImageMeta(ctx context.Context, digest godigest.Dige
resp, err := dwr.Client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(dwr.ImageMetaTablename),
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{Value: digest.String()},
"TableKey": &types.AttributeValueMemberS{Value: digest.String()},
},
})
if err != nil {
@ -185,7 +245,7 @@ func (dwr *DynamoDB) setProtoRepoMeta(repo string, repoMeta *proto_go.RepoMeta)
":RepoMeta": repoAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
@ -200,7 +260,7 @@ func (dwr *DynamoDB) getProtoRepoMeta(ctx context.Context, repo string) (*proto_
resp, err := dwr.Client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(dwr.RepoMetaTablename),
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{Value: repo},
"TableKey": &types.AttributeValueMemberS{Value: repo},
},
})
if err != nil {
@ -354,11 +414,43 @@ func (dwr *DynamoDB) SetRepoReference(ctx context.Context, repo string, referenc
return dwr.setProtoRepoMeta(repo, repoMeta) //nolint: contextcheck
}
func (dwr *DynamoDB) updateRepoLastUpdated(ctx context.Context, repo string, time time.Time) error {
protoTime := timestamppb.New(time)
protoTimeBlob, err := proto.Marshal(protoTime)
if err != nil {
return err
}
mdAttributeValue, err := attributevalue.Marshal(protoTimeBlob)
if err != nil {
return err
}
_, err = dwr.Client.UpdateItem(ctx, &dynamodb.UpdateItemInput{
ExpressionAttributeNames: map[string]string{
"#RLU": "RepoLastUpdated",
},
ExpressionAttributeValues: map[string]types.AttributeValue{
":RepoLastUpdated": mdAttributeValue,
},
Key: map[string]types.AttributeValue{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
TableName: aws.String(dwr.RepoBlobsTablename),
UpdateExpression: aws.String("SET #RLU = :RepoLastUpdated"),
})
return err
}
func (dwr *DynamoDB) getProtoRepoBlobs(ctx context.Context, repo string) (*proto_go.RepoBlobs, error) {
resp, err := dwr.Client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(dwr.RepoBlobsTablename),
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{Value: repo},
"TableKey": &types.AttributeValueMemberS{Value: repo},
},
})
if err != nil {
@ -393,6 +485,18 @@ func (dwr *DynamoDB) getProtoRepoBlobs(ctx context.Context, repo string) (*proto
}
func (dwr *DynamoDB) setRepoBlobsInfo(repo string, repoBlobs *proto_go.RepoBlobs) error {
protoTime := timestamppb.Now()
protoTimeBlob, err := proto.Marshal(protoTime)
if err != nil {
return err
}
timeAttributeValue, err := attributevalue.Marshal(protoTimeBlob)
if err != nil {
return err
}
bytes, err := proto.Marshal(repoBlobs)
if err != nil {
return err
@ -406,17 +510,19 @@ func (dwr *DynamoDB) setRepoBlobsInfo(repo string, repoBlobs *proto_go.RepoBlobs
_, err = dwr.Client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
ExpressionAttributeNames: map[string]string{
"#RBI": "RepoBlobsInfo",
"#RLU": "RepoLastUpdated",
},
ExpressionAttributeValues: map[string]types.AttributeValue{
":RepoBlobsInfo": mdAttributeValue,
":RepoBlobsInfo": mdAttributeValue,
":RepoLastUpdated": timeAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
TableName: aws.String(dwr.RepoBlobsTablename),
UpdateExpression: aws.String("SET #RBI = :RepoBlobsInfo"),
UpdateExpression: aws.String("SET #RBI = :RepoBlobsInfo, #RLU = :RepoLastUpdated"),
})
return err
@ -932,9 +1038,43 @@ func (dwr *DynamoDB) DecrementRepoStars(repo string) error {
func (dwr *DynamoDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error {
protoRepoMeta := mConvert.GetProtoRepoMeta(repoMeta)
err := dwr.updateRepoLastUpdated(context.Background(), repo, time.Time{})
if err != nil {
return err
}
return dwr.setProtoRepoMeta(repo, protoRepoMeta)
}
func (dwr *DynamoDB) DeleteRepoMeta(repo string) error {
_, err := dwr.Client.TransactWriteItems(context.Background(), &dynamodb.TransactWriteItemsInput{
TransactItems: []types.TransactWriteItem{
{
Delete: &types.Delete{
Key: map[string]types.AttributeValue{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
TableName: aws.String(dwr.RepoMetaTablename),
},
},
{
Delete: &types.Delete{
Key: map[string]types.AttributeValue{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
TableName: aws.String(dwr.RepoBlobsTablename),
},
},
},
})
return err
}
func (dwr *DynamoDB) GetReferrersInfo(repo string, referredDigest godigest.Digest, artifactTypes []string,
) ([]mTypes.ReferrerInfo, error) {
repoMeta, err := dwr.GetRepoMeta(context.Background(), repo)
@ -1216,6 +1356,8 @@ func (dwr *DynamoDB) FilterImageMeta(ctx context.Context, digests []string,
func (dwr *DynamoDB) RemoveRepoReference(repo, reference string, manifestDigest godigest.Digest,
) error {
ctx := context.Background()
protoRepoMeta, err := dwr.getProtoRepoMeta(context.Background(), repo)
if err != nil {
if errors.Is(err, zerr.ErrRepoMetaNotFound) {
@ -1295,7 +1437,7 @@ func (dwr *DynamoDB) RemoveRepoReference(repo, reference string, manifestDigest
delete(protoRepoMeta.Referrers, manifestDigest.String())
}
repoBlobsInfo, err := dwr.getProtoRepoBlobs(context.Background(), repo)
repoBlobsInfo, err := dwr.getProtoRepoBlobs(ctx, repo)
if err != nil {
return err
}
@ -1448,7 +1590,7 @@ func (dwr *DynamoDB) ToggleStarRepo(ctx context.Context, repo string) (
":UserData": userAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: userid,
},
},
@ -1466,7 +1608,7 @@ func (dwr *DynamoDB) ToggleStarRepo(ctx context.Context, repo string) (
":RepoMeta": repoAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
@ -1642,7 +1784,7 @@ func (dwr DynamoDB) AddUserAPIKey(ctx context.Context, hashedKey string, apiKeyD
":UserData": userAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: userid,
},
},
@ -1660,7 +1802,7 @@ func (dwr DynamoDB) AddUserAPIKey(ctx context.Context, hashedKey string, apiKeyD
":Identity": &types.AttributeValueMemberS{Value: userid},
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: hashedKey,
},
},
@ -1687,7 +1829,7 @@ func (dwr DynamoDB) DeleteUserAPIKey(ctx context.Context, keyID string) error {
_, err = dwr.Client.DeleteItem(ctx, &dynamodb.DeleteItemInput{
TableName: aws.String(dwr.APIKeyTablename),
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{Value: hash},
"TableKey": &types.AttributeValueMemberS{Value: hash},
},
})
if err != nil {
@ -1709,7 +1851,7 @@ func (dwr DynamoDB) GetUserAPIKeyInfo(hashedKey string) (string, error) {
resp, err := dwr.Client.GetItem(context.Background(), &dynamodb.GetItemInput{
TableName: aws.String(dwr.APIKeyTablename),
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{Value: hashedKey},
"TableKey": &types.AttributeValueMemberS{Value: hashedKey},
},
})
if err != nil {
@ -1745,7 +1887,7 @@ func (dwr DynamoDB) GetUserData(ctx context.Context) (mTypes.UserData, error) {
resp, err := dwr.Client.GetItem(ctx, &dynamodb.GetItemInput{
TableName: aws.String(dwr.UserDataTablename),
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{Value: userid},
"TableKey": &types.AttributeValueMemberS{Value: userid},
},
})
if err != nil {
@ -1789,7 +1931,7 @@ func (dwr DynamoDB) SetUserData(ctx context.Context, userData mTypes.UserData) e
":UserData": userAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: userid,
},
},
@ -1815,7 +1957,7 @@ func (dwr DynamoDB) DeleteUserData(ctx context.Context) error {
_, err = dwr.Client.DeleteItem(ctx, &dynamodb.DeleteItemInput{
TableName: aws.String(dwr.UserDataTablename),
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{Value: userid},
"TableKey": &types.AttributeValueMemberS{Value: userid},
},
})
@ -1847,7 +1989,7 @@ func getBatchImageKeys(digests []string) []map[string]types.AttributeValue {
for _, digest := range digests {
result = append(result, map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: digest,
},
})
@ -1928,13 +2070,13 @@ func (dwr *DynamoDB) createTable(tableName string) error {
TableName: aws.String(tableName),
AttributeDefinitions: []types.AttributeDefinition{
{
AttributeName: aws.String("Key"),
AttributeName: aws.String("TableKey"),
AttributeType: types.ScalarAttributeTypeS,
},
},
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("Key"),
AttributeName: aws.String("TableKey"),
KeyType: types.KeyTypeHash,
},
},
@ -1985,13 +2127,13 @@ func (dwr *DynamoDB) createVersionTable() error {
TableName: aws.String(dwr.VersionTablename),
AttributeDefinitions: []types.AttributeDefinition{
{
AttributeName: aws.String("Key"),
AttributeName: aws.String("TableKey"),
AttributeType: types.ScalarAttributeTypeS,
},
},
KeySchema: []types.KeySchemaElement{
{
AttributeName: aws.String("Key"),
AttributeName: aws.String("TableKey"),
KeyType: types.KeyTypeHash,
},
},
@ -2024,7 +2166,7 @@ func (dwr *DynamoDB) createVersionTable() error {
":Version": mdAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: version.DBVersionKey,
},
},
@ -2044,7 +2186,7 @@ func (dwr *DynamoDB) getDBVersion() (string, error) {
resp, err := dwr.Client.GetItem(context.TODO(), &dynamodb.GetItemInput{
TableName: aws.String(dwr.VersionTablename),
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{Value: version.DBVersionKey},
"TableKey": &types.AttributeValueMemberS{Value: version.DBVersionKey},
},
})
if err != nil {

View file

@ -4,6 +4,7 @@ import (
"context"
"os"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
@ -879,6 +880,22 @@ func TestWrapperErrors(t *testing.T) {
So(err, ShouldNotBeNil)
})
Convey("GetRepoLastUpdated", func() {
Convey("bad table", func() {
dynamoWrapper.RepoBlobsTablename = "bad-table"
lastUpdated := dynamoWrapper.GetRepoLastUpdated("repo")
So(lastUpdated, ShouldEqual, time.Time{})
})
Convey("unmarshal error", func() {
err := setRepoLastUpdated("repo", []byte("bad-blob"), dynamoWrapper)
So(err, ShouldBeNil)
lastUpdated := dynamoWrapper.GetRepoLastUpdated("repo")
So(lastUpdated, ShouldEqual, time.Time{})
})
})
Convey("DeleteUserAPIKey returns nil", func() {
userAc := reqCtx.NewUserAccessControl()
userAc.SetUsername("email")
@ -1090,7 +1107,7 @@ func setRepoMeta(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB) er
":RepoMeta": userAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
@ -1101,6 +1118,31 @@ func setRepoMeta(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB) er
return err
}
func setRepoLastUpdated(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB) error { //nolint: unparam
lastUpdatedAttributeValue, err := attributevalue.Marshal(blob)
if err != nil {
return err
}
_, err = dynamoWrapper.Client.UpdateItem(context.Background(), &dynamodb.UpdateItemInput{
ExpressionAttributeNames: map[string]string{
"#RLU": "RepoLastUpdated",
},
ExpressionAttributeValues: map[string]types.AttributeValue{
":RepoLastUpdated": lastUpdatedAttributeValue,
},
Key: map[string]types.AttributeValue{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
TableName: aws.String(dynamoWrapper.RepoBlobsTablename),
UpdateExpression: aws.String("SET #RLU = :RepoLastUpdated"),
})
return err
}
func setRepoBlobInfo(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB) error {
userAttributeValue, err := attributevalue.Marshal(blob)
if err != nil {
@ -1115,7 +1157,7 @@ func setRepoBlobInfo(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB
":RepoBlobsInfo": userAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: repo,
},
},
@ -1140,7 +1182,7 @@ func setImageMeta(digest godigest.Digest, blob []byte, dynamoWrapper *mdynamodb.
":ImageMeta": userAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: digest.String(),
},
},
@ -1165,7 +1207,7 @@ func setBadUserData(client *dynamodb.Client, userDataTablename, userID string) e
":UserData": userAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: userID,
},
},
@ -1190,7 +1232,7 @@ func setVersion(client *dynamodb.Client, versionTablename string, version string
":Version": mdAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: "DBVersion",
},
},

View file

@ -50,8 +50,9 @@ func NewBaseDynamoAttributesIterator(client *dynamodb.Client, table, attribute s
func (dii *BaseAttributesIterator) First(ctx context.Context) (types.AttributeValue, error) {
scanOutput, err := dii.Client.Scan(ctx, &dynamodb.ScanInput{
TableName: aws.String(dii.Table),
Limit: dii.readLimit,
TableName: aws.String(dii.Table),
Limit: dii.readLimit,
ProjectionExpression: aws.String(dii.Attribute),
})
if err != nil {
return &types.AttributeValueMemberBOOL{}, err

View file

@ -48,7 +48,7 @@ func getManifestDigest(md mTypes.ManifestMeta) string { return md.Digest.String(
func TestBoltDB(t *testing.T) {
Convey("BoltDB creation", t, func() {
boltDBParams := boltdb.DBParameters{RootDir: t.TempDir()}
repoDBPath := path.Join(boltDBParams.RootDir, "repo.db")
repoDBPath := path.Join(boltDBParams.RootDir, "meta.db")
boltDriver, err := boltdb.GetBoltDriver(boltDBParams)
So(err, ShouldBeNil)
@ -85,7 +85,7 @@ func TestBoltDB(t *testing.T) {
boltdbWrapper.SetImageTrustStore(imgTrustStore)
defer func() {
os.Remove(path.Join(boltDBParams.RootDir, "repo.db"))
os.Remove(path.Join(boltDBParams.RootDir, "meta.db"))
os.RemoveAll(path.Join(boltDBParams.RootDir, "_cosign"))
os.RemoveAll(path.Join(boltDBParams.RootDir, "_notation"))
}()
@ -2147,7 +2147,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
Convey("GetReferrersInfo", func() {
repo := "repo"
tag := "tag"
tag := "test-tag"
image := CreateRandomImage()
err := metaDB.SetRepoReference(ctx, repo, tag, image.AsImageMeta())
@ -2243,7 +2243,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
Convey("FilterRepos", func() {
repo := "repoFilter"
tag1 := "tag1"
tag2 := "tag2"
tag2 := "tag22"
image := CreateImageWith().DefaultLayers().PlatformConfig("image-platform", "image-os").Build()
err := metaDB.SetRepoReference(ctx, repo, tag1, image.AsImageMeta())
@ -2371,6 +2371,71 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func
So(repoMeta.IsStarred, ShouldBeTrue)
So(repoMeta.Tags, ShouldContainKey, "tag")
})
Convey("GetAllRepoNames", func() {
repo1 := "repo1"
repo2 := "repo2"
repo3 := "repo3"
imageMeta := CreateRandomImage().AsImageMeta()
err := metaDB.SetRepoReference(ctx, repo1, "tag", imageMeta)
So(err, ShouldBeNil)
err = metaDB.SetRepoReference(ctx, repo2, "tag", imageMeta)
So(err, ShouldBeNil)
err = metaDB.SetRepoReference(ctx, repo3, "tag", imageMeta)
So(err, ShouldBeNil)
repos, err := metaDB.GetAllRepoNames()
So(err, ShouldBeNil)
So(repos, ShouldContain, repo1)
So(repos, ShouldContain, repo2)
So(repos, ShouldContain, repo3)
err = metaDB.DeleteRepoMeta(repo1)
So(err, ShouldBeNil)
repos, err = metaDB.GetAllRepoNames()
So(err, ShouldBeNil)
So(repos, ShouldNotContain, repo1)
So(repos, ShouldContain, repo2)
So(repos, ShouldContain, repo3)
err = metaDB.DeleteRepoMeta(repo2)
So(err, ShouldBeNil)
repos, err = metaDB.GetAllRepoNames()
So(err, ShouldBeNil)
So(repos, ShouldNotContain, repo1)
So(repos, ShouldNotContain, repo2)
So(repos, ShouldContain, repo3)
err = metaDB.SetRepoReference(ctx, repo1, "tag", imageMeta)
So(err, ShouldBeNil)
repos, err = metaDB.GetAllRepoNames()
So(err, ShouldBeNil)
So(repos, ShouldContain, repo1)
So(repos, ShouldNotContain, repo2)
So(repos, ShouldContain, repo3)
err = metaDB.DeleteRepoMeta(repo1)
So(err, ShouldBeNil)
repos, err = metaDB.GetAllRepoNames()
So(err, ShouldBeNil)
So(repos, ShouldNotContain, repo1)
So(repos, ShouldNotContain, repo2)
So(repos, ShouldContain, repo3)
err = metaDB.DeleteRepoMeta(repo3)
So(err, ShouldBeNil)
repos, err = metaDB.GetAllRepoNames()
So(err, ShouldBeNil)
So(repos, ShouldNotContain, repo1)
So(repos, ShouldNotContain, repo2)
So(repos, ShouldNotContain, repo3)
})
})
}

View file

@ -14,8 +14,7 @@ import (
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta/convert"
mTypes "zotregistry.io/zot/pkg/meta/types"
"zotregistry.io/zot/pkg/storage"
storageTypes "zotregistry.io/zot/pkg/storage/types"
stypes "zotregistry.io/zot/pkg/storage/types"
)
const (
@ -25,37 +24,85 @@ const (
// ParseStorage will sync all repos found in the rootdirectory of the oci layout that zot was deployed on with the
// ParseStorage database.
func ParseStorage(metaDB mTypes.MetaDB, storeController storage.StoreController, log log.Logger) error {
func ParseStorage(metaDB mTypes.MetaDB, storeController stypes.StoreController, log log.Logger) error {
log.Info().Msg("Started parsing storage and updating MetaDB")
allRepos, err := getAllRepos(storeController)
allStorageRepos, err := getAllRepos(storeController, log)
if err != nil {
rootDir := storeController.DefaultStore.RootDir()
log.Error().Err(err).Str("rootDir", rootDir).
Msg("load-local-layout: failed to get all repo names present under rootDir")
return err
}
allMetaDBRepos, err := metaDB.GetAllRepoNames()
if err != nil {
log.Error().Err(err).
Msg("load-metadb-layout: failed to get all repo names present in metadb")
return err
}
for i, repo := range allRepos {
log.Info().Int("total", len(allRepos)).Int("progress", i).Str("current-repo", repo).
Msgf("parsing next repo '%s'", repo)
err := ParseRepo(repo, metaDB, storeController, log)
for _, repo := range getReposToBeDeleted(allStorageRepos, allMetaDBRepos) {
err := metaDB.DeleteRepoMeta(repo)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("load-local-layout: failed to sync repo")
log.Error().Err(err).Str("rootDir", storeController.GetImageStore(repo).RootDir()).
Str("repo", repo).Msg("load-metadb-layout: failed to get all repo names present in metadb")
return err
}
}
for i, repo := range allStorageRepos {
log.Info().Int("total", len(allStorageRepos)).Int("progress", i).Str("current-repo", repo).
Msgf("parsing next repo '%s'", repo)
imgStore := storeController.GetImageStore(repo)
_, _, storageLastUpdated, err := imgStore.StatIndex(repo)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to sync repo")
continue
}
metaLastUpdated := metaDB.GetRepoLastUpdated(repo)
if storageLastUpdated.Before(metaLastUpdated) {
continue
}
err = ParseRepo(repo, metaDB, storeController, log)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to sync repo")
continue
}
}
log.Info().Msg("Done parsing storage and updating MetaDB")
return nil
}
// getReposToBeDeleted will return all repos that are found in metaDB but not found in storage anymore.
func getReposToBeDeleted(allStorageRepos []string, allMetaDBRepos []string) []string {
toBeDeleted := []string{}
storageRepoNameSet := map[string]struct{}{}
for i := range allStorageRepos {
storageRepoNameSet[allStorageRepos[i]] = struct{}{}
}
for _, metaDBRepo := range allMetaDBRepos {
if _, found := storageRepoNameSet[metaDBRepo]; !found {
toBeDeleted = append(toBeDeleted, metaDBRepo)
}
}
return toBeDeleted
}
// ParseRepo reads the contents of a repo and syncs all images and signatures found.
func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController storage.StoreController, log log.Logger) error {
func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreController, log log.Logger) error {
imageStore := storeController.GetImageStore(repo)
var lockLatency time.Time
@ -120,16 +167,22 @@ func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController storage.StoreC
return nil
}
func getAllRepos(storeController storage.StoreController) ([]string, error) {
allRepos, err := storeController.DefaultStore.GetRepositories()
func getAllRepos(storeController stypes.StoreController, log log.Logger) ([]string, error) {
allRepos, err := storeController.GetDefaultImageStore().GetRepositories()
if err != nil {
log.Error().Err(err).Str("rootDir", storeController.GetDefaultImageStore().RootDir()).
Msg("load-local-layout: failed to get all repo names present under rootDir")
return nil, err
}
if storeController.SubStore != nil {
for _, store := range storeController.SubStore {
if storeController.GetImageSubStores() != nil {
for _, store := range storeController.GetImageSubStores() {
substoreRepos, err := store.GetRepositories()
if err != nil {
log.Error().Err(err).Str("rootDir", store.RootDir()).
Msg("load-local-layout: failed to get all repo names present under rootDir")
return nil, err
}
@ -141,7 +194,7 @@ func getAllRepos(storeController storage.StoreController) ([]string, error) {
}
func GetSignatureLayersInfo(repo, tag, manifestDigest, signatureType string, manifestBlob []byte,
imageStore storageTypes.ImageStore, log log.Logger,
imageStore stypes.ImageStore, log log.Logger,
) ([]mTypes.LayerInfo, error) {
switch signatureType {
case zcommon.CosignSignature:
@ -154,7 +207,7 @@ func GetSignatureLayersInfo(repo, tag, manifestDigest, signatureType string, man
}
func getCosignSignatureLayersInfo(
repo, tag, manifestDigest string, manifestBlob []byte, imageStore storageTypes.ImageStore, log log.Logger,
repo, tag, manifestDigest string, manifestBlob []byte, imageStore stypes.ImageStore, log log.Logger,
) ([]mTypes.LayerInfo, error) {
layers := []mTypes.LayerInfo{}
@ -197,7 +250,7 @@ func getCosignSignatureLayersInfo(
}
func getNotationSignatureLayersInfo(
repo, manifestDigest string, manifestBlob []byte, imageStore storageTypes.ImageStore, log log.Logger,
repo, manifestDigest string, manifestBlob []byte, imageStore stypes.ImageStore, log log.Logger,
) ([]mTypes.LayerInfo, error) {
layers := []mTypes.LayerInfo{}
@ -250,7 +303,7 @@ func getNotationSignatureLayersInfo(
// SetMetadataFromInput tries to set manifest metadata and update repo metadata by adding the current tag
// (in case the reference is a tag). The function expects image manifests and indexes (multi arch images).
func SetImageMetaFromInput(ctx context.Context, repo, reference, mediaType string, digest godigest.Digest, blob []byte,
imageStore storageTypes.ImageStore, metaDB mTypes.MetaDB, log log.Logger,
imageStore stypes.ImageStore, metaDB mTypes.MetaDB, log log.Logger,
) error {
var imageMeta mTypes.ImageMeta

View file

@ -7,6 +7,7 @@ import (
"io"
"os"
"path"
"path/filepath"
"testing"
"time"
@ -24,9 +25,11 @@ import (
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/local"
storageTypes "zotregistry.io/zot/pkg/storage/types"
tcommon "zotregistry.io/zot/pkg/test/common"
"zotregistry.io/zot/pkg/test/deprecated"
. "zotregistry.io/zot/pkg/test/image-utils"
"zotregistry.io/zot/pkg/test/mocks"
ociutils "zotregistry.io/zot/pkg/test/oci-utils"
"zotregistry.io/zot/pkg/test/signature"
tskip "zotregistry.io/zot/pkg/test/skip"
)
@ -50,7 +53,7 @@ func TestParseStorageErrors(t *testing.T) {
// sync repo fail
err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
So(err, ShouldBeNil)
Convey("getAllRepos errors", func() {
imageStore1 := mocks.MockedImageStore{
@ -73,6 +76,40 @@ func TestParseStorageErrors(t *testing.T) {
err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
})
Convey("metaDB.GetAllRepoNames errors", func() {
metaDB.GetAllRepoNamesFn = func() ([]string, error) { return nil, ErrTestError }
err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
})
Convey("metaDB.DeleteRepoMeta errors", func() {
imageStore1 := mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) { return []string{"repo1", "repo2"}, nil },
}
storeController := storage.StoreController{DefaultStore: imageStore1}
metaDB.GetAllRepoNamesFn = func() ([]string, error) { return []string{"deleted"}, nil }
metaDB.DeleteRepoMetaFn = func(repo string) error { return ErrTestError }
err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
})
Convey("StatIndex errors", func() {
imageStore1 := mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) { return []string{"repo1", "repo2"}, nil },
}
imageStore1.StatIndexFn = func(repo string) (bool, int64, time.Time, error) {
return false, 0, time.Time{}, ErrTestError
}
storeController := storage.StoreController{DefaultStore: imageStore1}
err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
So(err, ShouldBeNil)
})
})
Convey("Parse Repo", t, func() {
@ -250,16 +287,17 @@ func getIndexBlob(index ispec.Index) []byte {
func TestParseStorageWithBoltDB(t *testing.T) {
Convey("Boltdb", t, func() {
rootDir := t.TempDir()
log := log.NewLogger("debug", "/dev/null")
boltDB, err := boltdb.GetBoltDriver(boltdb.DBParameters{
RootDir: rootDir,
})
So(err, ShouldBeNil)
metaDB, err := boltdb.New(boltDB, log.NewLogger("debug", ""))
metaDB, err := boltdb.New(boltDB, log)
So(err, ShouldBeNil)
RunParseStorageTests(rootDir, metaDB)
RunParseStorageTests(rootDir, metaDB, log)
})
}
@ -268,6 +306,7 @@ func TestParseStorageDynamoWrapper(t *testing.T) {
Convey("Dynamodb", t, func() {
rootDir := t.TempDir()
log := log.NewLogger("debug", "/dev/null")
params := dynamodb.DBDriverParameters{
Endpoint: os.Getenv("DYNAMODBMOCK_ENDPOINT"),
@ -283,7 +322,7 @@ func TestParseStorageDynamoWrapper(t *testing.T) {
dynamoClient, err := dynamodb.GetDynamoClient(params)
So(err, ShouldBeNil)
dynamoWrapper, err := dynamodb.New(dynamoClient, params, log.NewLogger("debug", ""))
dynamoWrapper, err := dynamodb.New(dynamoClient, params, log)
So(err, ShouldBeNil)
err = dynamoWrapper.ResetTable(dynamoWrapper.RepoMetaTablename)
@ -295,29 +334,26 @@ func TestParseStorageDynamoWrapper(t *testing.T) {
err = dynamoWrapper.ResetTable(dynamoWrapper.ImageMetaTablename)
So(err, ShouldBeNil)
RunParseStorageTests(rootDir, dynamoWrapper)
RunParseStorageTests(rootDir, dynamoWrapper, log)
})
}
func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB, log log.Logger) {
ctx := context.Background()
Convey("Test with simple case", func() {
imageStore := local.NewImageStore(rootDir, false, false,
log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil)
log, monitoring.NewMetricsServer(false, log), nil, nil)
storeController := storage.StoreController{DefaultStore: imageStore}
manifests := []ispec.Manifest{}
for i := 0; i < 3; i++ {
config, layers, manifest, err := deprecated.GetRandomImageComponents(100) //nolint:staticcheck
So(err, ShouldBeNil)
image := CreateRandomImage() //nolint:staticcheck
manifests = append(manifests, manifest)
manifests = append(manifests, image.Manifest)
err = WriteImageToFileSystem(
Image{
Config: config,
Layers: layers,
Manifest: manifest,
}, repo, fmt.Sprintf("tag%d", i), storeController)
err := WriteImageToFileSystem(
image, repo, fmt.Sprintf("tag%d", i), storeController)
So(err, ShouldBeNil)
}
@ -364,18 +400,16 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
err = os.WriteFile(indexPath, buf, 0o600)
So(err, ShouldBeNil)
err = meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck
So(err, ShouldBeNil)
repos, err := metaDB.GetMultipleRepoMeta(context.Background(),
repos, err := metaDB.GetMultipleRepoMeta(ctx,
func(repoMeta mTypes.RepoMeta) bool { return true })
So(err, ShouldBeNil)
So(len(repos), ShouldEqual, 1)
So(len(repos[0].Tags), ShouldEqual, 2)
ctx := context.Background()
for tag, descriptor := range repos[0].Tags {
imageManifestData, err := metaDB.GetFullImageMeta(ctx, repo, tag)
So(err, ShouldBeNil)
@ -388,7 +422,7 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
Convey("Accept orphan signatures", func() {
imageStore := local.NewImageStore(rootDir, false, false,
log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil)
log, monitoring.NewMetricsServer(false, log), nil, nil)
storeController := storage.StoreController{DefaultStore: imageStore}
// add an image
@ -424,10 +458,10 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
}, repo, signatureTag, storeController)
So(err, ShouldBeNil)
err = meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck
So(err, ShouldBeNil)
repos, err := metaDB.GetMultipleRepoMeta(context.Background(),
repos, err := metaDB.GetMultipleRepoMeta(ctx,
func(repoMeta mTypes.RepoMeta) bool { return true })
So(err, ShouldBeNil)
@ -443,7 +477,7 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
Convey("Check statistics after load", func() {
imageStore := local.NewImageStore(rootDir, false, false,
log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil)
log, monitoring.NewMetricsServer(false, log), nil, nil)
storeController := storage.StoreController{DefaultStore: imageStore}
// add an image
@ -452,7 +486,7 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
err := WriteImageToFileSystem(image, repo, "tag", storeController)
So(err, ShouldBeNil)
err = metaDB.SetRepoReference(context.Background(), repo, "tag", image.AsImageMeta())
err = metaDB.SetRepoReference(ctx, repo, "tag", image.AsImageMeta())
So(err, ShouldBeNil)
err = metaDB.IncrementRepoStars(repo)
@ -464,17 +498,17 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
err = metaDB.UpdateStatsOnDownload(repo, "tag")
So(err, ShouldBeNil)
repoMeta, err := metaDB.GetRepoMeta(context.Background(), repo)
repoMeta, err := metaDB.GetRepoMeta(ctx, repo)
So(err, ShouldBeNil)
So(repoMeta.Statistics[image.DigestStr()].DownloadCount, ShouldEqual, 3)
So(repoMeta.StarCount, ShouldEqual, 1)
So(time.Now(), ShouldHappenAfter, repoMeta.Statistics[image.DigestStr()].LastPullTimestamp)
err = meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck
So(err, ShouldBeNil)
repoMeta, err = metaDB.GetRepoMeta(context.Background(), repo)
repoMeta, err = metaDB.GetRepoMeta(ctx, repo)
So(err, ShouldBeNil)
So(repoMeta.Statistics[image.DigestStr()].DownloadCount, ShouldEqual, 3)
@ -484,7 +518,7 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
// make sure pushTimestamp is always populated to not interfere with retention logic
Convey("Always update pushTimestamp if its value is 0(time.Time{})", func() {
imageStore := local.NewImageStore(rootDir, false, false,
log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil)
log, monitoring.NewMetricsServer(false, log), nil, nil)
storeController := storage.StoreController{DefaultStore: imageStore}
// add an image
@ -493,13 +527,13 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
err := WriteImageToFileSystem(image, repo, "tag", storeController)
So(err, ShouldBeNil)
err = metaDB.SetRepoReference(context.Background(), repo, "tag", image.AsImageMeta())
err = metaDB.SetRepoReference(ctx, repo, "tag", image.AsImageMeta())
So(err, ShouldBeNil)
err = metaDB.UpdateStatsOnDownload(repo, "tag")
So(err, ShouldBeNil)
repoMeta, err := metaDB.GetRepoMeta(context.Background(), repo)
repoMeta, err := metaDB.GetRepoMeta(ctx, repo)
So(err, ShouldBeNil)
So(repoMeta.Statistics[image.DigestStr()].DownloadCount, ShouldEqual, 1)
@ -516,16 +550,100 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) {
So(err, ShouldBeNil)
// metaDB should detect that pushTimestamp is 0 and update it.
err = meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", ""))
err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck
So(err, ShouldBeNil)
repoMeta, err = metaDB.GetRepoMeta(context.Background(), repo)
repoMeta, err = metaDB.GetRepoMeta(ctx, repo)
So(err, ShouldBeNil)
So(repoMeta.Statistics[image.DigestStr()].DownloadCount, ShouldEqual, 1)
So(repoMeta.DownloadCount, ShouldEqual, 1)
So(repoMeta.Statistics[image.DigestStr()].PushTimestamp, ShouldHappenAfter, oldPushTimestamp)
})
Convey("Parse 2 times and check correct update of the metaDB for modified and deleted repos", func() {
storeController := ociutils.GetDefaultStoreController(rootDir, log)
notModifiedRepo := "not-modified-repo"
modifiedAddImageRepo := "modified-add-image-repo"
modifiedRemoveImageRepo := "modified-remove-image-repo"
deletedRepo := "deleted-repo"
addedRepo := "added-repo"
tag := "tag"
tag2 := "tag2"
newTag := "newTag"
image := CreateRandomImage()
err := WriteImageToFileSystem(image, notModifiedRepo, tag, storeController)
So(err, ShouldBeNil)
err = WriteImageToFileSystem(image, modifiedAddImageRepo, tag, storeController)
So(err, ShouldBeNil)
err = WriteImageToFileSystem(image, modifiedRemoveImageRepo, tag, storeController)
So(err, ShouldBeNil)
err = WriteImageToFileSystem(image, modifiedRemoveImageRepo, tag2, storeController)
So(err, ShouldBeNil)
err = WriteImageToFileSystem(image, deletedRepo, tag, storeController)
So(err, ShouldBeNil)
err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck
So(err, ShouldBeNil)
repoMetaList, err := metaDB.SearchRepos(ctx, "")
So(err, ShouldBeNil)
So(len(repoMetaList), ShouldEqual, 4)
repoNames := tcommon.AccumulateField(repoMetaList, func(rm mTypes.RepoMeta) string { return rm.Name })
So(repoNames, ShouldContain, notModifiedRepo)
So(repoNames, ShouldContain, modifiedAddImageRepo)
So(repoNames, ShouldContain, modifiedRemoveImageRepo)
So(repoNames, ShouldContain, deletedRepo)
time.Sleep(time.Second)
// Update the storage
err = WriteImageToFileSystem(image, modifiedAddImageRepo, newTag, storeController)
So(err, ShouldBeNil)
err = storeController.GetDefaultImageStore().DeleteImageManifest(modifiedRemoveImageRepo, tag2, false)
So(err, ShouldBeNil)
err = os.RemoveAll(filepath.Join(rootDir, deletedRepo))
So(err, ShouldBeNil)
err = WriteImageToFileSystem(image, addedRepo, tag, storeController)
So(err, ShouldBeNil)
// Parse again
err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck
So(err, ShouldBeNil)
repoMetaList, err = metaDB.SearchRepos(ctx, "")
So(err, ShouldBeNil)
So(len(repoMetaList), ShouldEqual, 4)
repoNames = tcommon.AccumulateField(repoMetaList, func(rm mTypes.RepoMeta) string { return rm.Name })
So(repoNames, ShouldContain, notModifiedRepo)
So(repoNames, ShouldContain, modifiedAddImageRepo)
So(repoNames, ShouldContain, modifiedRemoveImageRepo)
So(repoNames, ShouldNotContain, deletedRepo)
So(repoNames, ShouldContain, addedRepo)
repoMeta, err := metaDB.GetRepoMeta(ctx, modifiedAddImageRepo)
So(err, ShouldBeNil)
So(repoMeta.Tags, ShouldContainKey, tag)
So(repoMeta.Tags, ShouldContainKey, newTag)
repoMeta, err = metaDB.GetRepoMeta(ctx, modifiedRemoveImageRepo)
So(err, ShouldBeNil)
So(repoMeta.Tags, ShouldContainKey, tag)
So(repoMeta.Tags, ShouldNotContainKey, tag2)
})
}
func TestGetSignatureLayersInfo(t *testing.T) {

View file

@ -112,6 +112,9 @@ type MetaDB interface { //nolint:interfacebloat
// SetRepoMeta returns RepoMetadata of a repo from the database
SetRepoMeta(repo string, repoMeta RepoMeta) error
// DeleteRepoMeta
DeleteRepoMeta(repo string) error
// GetReferrersInfo returns a list of for all referrers of the given digest that match one of the
// artifact types.
GetReferrersInfo(repo string, referredDigest godigest.Digest, artifactTypes []string) ([]ReferrerInfo, error)
@ -136,6 +139,10 @@ type MetaDB interface { //nolint:interfacebloat
// specific metadata such as star count, downloads other statistics
ResetRepoReferences(repo string) error
GetRepoLastUpdated(repo string) time.Time
GetAllRepoNames() ([]string, error)
// ResetDB will delete all data in the DB
ResetDB() error

View file

@ -34,7 +34,7 @@ func TestVersioningBoltDB(t *testing.T) {
log := log.NewLogger("debug", "")
boltdbWrapper, err := boltdb.New(boltDriver, log)
defer os.Remove(path.Join(boltDBParams.RootDir, "repo.db"))
defer os.Remove(path.Join(boltDBParams.RootDir, "meta.db"))
So(boltdbWrapper, ShouldNotBeNil)
So(err, ShouldBeNil)
@ -200,7 +200,7 @@ func setDynamoDBVersion(client *dynamodb.Client, versTable, vers string) error {
":Version": mdAttributeValue,
},
Key: map[string]types.AttributeValue{
"Key": &types.AttributeValueMemberS{
"TableKey": &types.AttributeValueMemberS{
Value: version.DBVersionKey,
},
},

View file

@ -51,8 +51,8 @@ var testCases = []struct {
}
func TestGarbageCollectAndRetention(t *testing.T) {
log := zlog.NewLogger("info", "")
audit := zlog.NewAuditLogger("debug", "")
log := zlog.NewLogger("info", "/dev/null")
audit := zlog.NewAuditLogger("debug", "/dev/null")
metrics := monitoring.NewMetricsServer(false, log)

View file

@ -1505,6 +1505,25 @@ func (is *ImageStore) GetIndexContent(repo string) ([]byte, error) {
return buf, nil
}
func (is *ImageStore) StatIndex(repo string) (bool, int64, time.Time, error) {
repoIndexPath := path.Join(is.rootDir, repo, "index.json")
fileInfo, err := is.storeDriver.Stat(repoIndexPath)
if err != nil {
if errors.As(err, &driver.PathNotFoundError{}) {
is.log.Error().Err(err).Str("indexFile", repoIndexPath).Msg("index.json doesn't exist")
return false, 0, time.Time{}, zerr.ErrRepoNotFound
}
is.log.Error().Err(err).Str("indexFile", repoIndexPath).Msg("failed to read index.json")
return false, 0, time.Time{}, err
}
return true, fileInfo.Size(), fileInfo.ModTime(), nil
}
func (is *ImageStore) PutIndexContent(repo string, index ispec.Index) error {
dir := path.Join(is.rootDir, repo)

View file

@ -2994,6 +2994,24 @@ func TestPullRange(t *testing.T) {
})
}
func TestStatIndex(t *testing.T) {
Convey("NewImageStore", t, func() {
dir := t.TempDir()
log := zlog.Logger{Logger: zerolog.New(os.Stdout)}
metrics := monitoring.NewMetricsServer(false, log)
imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil)
err := WriteImageToFileSystem(CreateRandomImage(), "repo", "tag",
storage.StoreController{DefaultStore: imgStore})
So(err, ShouldBeNil)
Convey("StatIndex PathNotFoundError", func() {
_, _, _, err := imgStore.StatIndex("not-found")
So(err, ShouldNotBeNil)
})
})
}
func TestStorageDriverErr(t *testing.T) {
dir := t.TempDir()

View file

@ -54,6 +54,7 @@ type ImageStore interface { //nolint:interfacebloat
CleanupRepo(repo string, blobs []godigest.Digest, removeRepo bool) (int, error)
GetIndexContent(repo string) ([]byte, error)
PutIndexContent(repo string, index ispec.Index) error
StatIndex(repo string) (bool, int64, time.Time, error)
GetBlobContent(repo string, digest godigest.Digest) ([]byte, error)
GetReferrers(repo string, digest godigest.Digest, artifactTypes []string) (ispec.Index, error)
GetOrasReferrers(repo string, digest godigest.Digest, artifactType string) ([]artifactspec.Descriptor, error)

View file

@ -212,7 +212,7 @@ func GenerateRandomName() (string, int64) {
return string(randomBytes), seed
}
func AccumulateField[R any, T any](list []T, accFunc func(T) R) []R {
func AccumulateField[T any, R any](list []T, accFunc func(T) R) []R {
result := make([]R, 0, len(list))
for i := range list {

View file

@ -56,6 +56,15 @@ type MockedImageStore struct {
CleanupRepoFn func(repo string, blobs []godigest.Digest, removeRepo bool) (int, error)
PutIndexContentFn func(repo string, index ispec.Index) error
PopulateStorageMetricsFn func(interval time.Duration, sch *scheduler.Scheduler)
StatIndexFn func(repo string) (bool, int64, time.Time, error)
}
func (is MockedImageStore) StatIndex(repo string) (bool, int64, time.Time, error) {
if is.StatIndexFn != nil {
return is.StatIndexFn(repo)
}
return true, 0, time.Time{}, nil
}
func (is MockedImageStore) Lock(t *time.Time) {

View file

@ -2,6 +2,7 @@ package mocks
import (
"context"
"time"
godigest "github.com/opencontainers/go-digest"
@ -9,6 +10,10 @@ import (
)
type MetaDBMock struct {
DeleteRepoMetaFn func(repo string) error
GetRepoLastUpdatedFn func(repo string) time.Time
GetStarredReposFn func(ctx context.Context) ([]string, error)
GetBookmarkedReposFn func(ctx context.Context) ([]string, error)
@ -98,9 +103,35 @@ type MetaDBMock struct {
ResetRepoReferencesFn func(repo string) error
GetAllRepoNamesFn func() ([]string, error)
ResetDBFn func() error
}
func (sdm MetaDBMock) DeleteRepoMeta(repo string) error {
if sdm.DeleteRepoMetaFn != nil {
return sdm.DeleteRepoMetaFn(repo)
}
return nil
}
func (sdm MetaDBMock) GetAllRepoNames() ([]string, error) {
if sdm.GetAllRepoNamesFn != nil {
return sdm.GetAllRepoNamesFn()
}
return []string{}, nil
}
func (sdm MetaDBMock) GetRepoLastUpdated(repo string) time.Time {
if sdm.GetRepoLastUpdatedFn != nil {
return sdm.GetRepoLastUpdatedFn(repo)
}
return time.Time{}
}
func (sdm MetaDBMock) ResetDB() error {
if sdm.ResetDBFn != nil {
return sdm.ResetDBFn()