From 4fb1e756c4049cc5ab0014d96ae381e5f1172ad7 Mon Sep 17 00:00:00 2001 From: LaurentiuNiculae Date: Thu, 16 Nov 2023 20:39:27 +0200 Subject: [PATCH] feat(startup): update logic for metadb update on startup, skip unmodified repos (#2024) - MetaDB stores the time of the last update of a repo - During startup we check if the layout has been updated after the last recorded change in the db - If this is the case, the repo is parsed and updated in the DB otherwise it's skipped Signed-off-by: Laurentiu Niculae --- pkg/api/controller_test.go | 8 +- pkg/extensions/search/search_test.go | 2 +- pkg/meta/boltdb/boltdb.go | 139 ++++++++++++++++++- pkg/meta/boltdb/boltdb_test.go | 67 +++------- pkg/meta/boltdb/buckets.go | 6 +- pkg/meta/boltdb/parameters.go | 2 +- pkg/meta/dynamodb/dynamodb.go | 192 +++++++++++++++++++++++---- pkg/meta/dynamodb/dynamodb_test.go | 52 +++++++- pkg/meta/dynamodb/iterator.go | 5 +- pkg/meta/meta_test.go | 73 +++++++++- pkg/meta/parse.go | 97 +++++++++++--- pkg/meta/parse_test.go | 184 ++++++++++++++++++++----- pkg/meta/types/types.go | 7 + pkg/meta/version/version_test.go | 4 +- pkg/storage/gc/gc_test.go | 4 +- pkg/storage/imagestore/imagestore.go | 19 +++ pkg/storage/local/local_test.go | 18 +++ pkg/storage/types/types.go | 1 + pkg/test/common/utils.go | 2 +- pkg/test/mocks/image_store_mock.go | 9 ++ pkg/test/mocks/repo_db_mock.go | 31 +++++ 21 files changed, 763 insertions(+), 159 deletions(-) diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 9e245168..1e419e87 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -264,22 +264,22 @@ func TestCreateMetaDBDriver(t *testing.T) { const perms = 0o600 - boltDB, err := bbolt.Open(path.Join(dir, "repo.db"), perms, &bbolt.Options{Timeout: time.Second * 10}) + boltDB, err := bbolt.Open(path.Join(dir, "meta.db"), perms, &bbolt.Options{Timeout: time.Second * 10}) So(err, ShouldBeNil) err = boltDB.Close() So(err, ShouldBeNil) - err = os.Chmod(path.Join(dir, 
"repo.db"), 0o200) + err = os.Chmod(path.Join(dir, "meta.db"), 0o200) So(err, ShouldBeNil) _, err = meta.New(conf.Storage.StorageConfig, log) So(err, ShouldNotBeNil) - err = os.Chmod(path.Join(dir, "repo.db"), 0o600) + err = os.Chmod(path.Join(dir, "meta.db"), 0o600) So(err, ShouldBeNil) - defer os.Remove(path.Join(dir, "repo.db")) + defer os.Remove(path.Join(dir, "meta.db")) }) } diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index 9d3d9240..db83c282 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -53,7 +53,7 @@ import ( const ( graphqlQueryPrefix = constants.FullSearchPrefix - DBFileName = "repo.db" + DBFileName = "meta.db" ) var ( diff --git a/pkg/meta/boltdb/boltdb.go b/pkg/meta/boltdb/boltdb.go index a7a24298..b17d50a6 100644 --- a/pkg/meta/boltdb/boltdb.go +++ b/pkg/meta/boltdb/boltdb.go @@ -65,7 +65,12 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) { return err } - _, err = transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck)) + repoBlobsBuck, err := transaction.CreateBucketIfNotExists([]byte(RepoBlobsBuck)) + if err != nil { + return err + } + + _, err = repoBlobsBuck.CreateBucketIfNotExists([]byte(RepoLastUpdatedBuck)) if err != nil { return err } @@ -84,6 +89,53 @@ func New(boltDB *bbolt.DB, log log.Logger) (*BoltDB, error) { }, nil } +func (bdw *BoltDB) GetAllRepoNames() ([]string, error) { + repoNames := []string{} + + err := bdw.DB.View(func(tx *bbolt.Tx) error { + repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck)) + + return repoMetaBuck.ForEach(func(repo, _ []byte) error { + repoNames = append(repoNames, string(repo)) + + return nil + }) + }) + + return repoNames, err +} + +func (bdw *BoltDB) GetRepoLastUpdated(repo string) time.Time { + lastUpdated := time.Time{} + + err := bdw.DB.View(func(tx *bbolt.Tx) error { + repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck)) + + lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck)) + + 
lastUpdatedBlob := lastUpdatedBuck.Get([]byte(repo)) + if len(lastUpdatedBlob) == 0 { + return zerr.ErrRepoMetaNotFound + } + + protoTime := &timestamppb.Timestamp{} + + err := proto.Unmarshal(lastUpdatedBlob, protoTime) + if err != nil { + return err + } + + lastUpdated = *mConvert.GetTime(protoTime) + + return nil + }) + if err != nil { + return time.Time{} + } + + return lastUpdated +} + func (bdw *BoltDB) SetImageMeta(digest godigest.Digest, imageMeta mTypes.ImageMeta) error { err := bdw.DB.Update(func(tx *bbolt.Tx) error { buck := tx.Bucket([]byte(ImageMetaBuck)) @@ -132,6 +184,7 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference err = bdw.DB.Update(func(tx *bbolt.Tx) error { repoBuck := tx.Bucket([]byte(RepoMetaBuck)) repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck)) + repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck)) imageBuck := tx.Bucket([]byte(ImageMetaBuck)) // 1. Add image data to db if needed @@ -226,6 +279,11 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference protoRepoMeta, repoBlobs = common.AddImageMetaToRepoMeta(protoRepoMeta, repoBlobs, reference, imageMeta) + err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck) + if err != nil { + return err + } + err = setProtoRepoBlobs(repoBlobs, repoBlobsBuck) if err != nil { return err @@ -237,6 +295,17 @@ func (bdw *BoltDB) SetRepoReference(ctx context.Context, repo string, reference return err } +func setRepoLastUpdated(repo string, lastUpdated time.Time, repoLastUpdatedBuck *bbolt.Bucket) error { + protoTime := timestamppb.New(lastUpdated) + + protoTimeBlob, err := proto.Marshal(protoTime) + if err != nil { + return err + } + + return repoLastUpdatedBuck.Put([]byte(repo), protoTimeBlob) +} + func unmarshalProtoRepoBlobs(repo string, repoBlobsBytes []byte) (*proto_go.RepoBlobs, error) { repoBlobs := &proto_go.RepoBlobs{ Name: repo, @@ -1010,6 +1079,7 @@ func (bdw *BoltDB) DecrementRepoStars(repo string) error { func
(bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error { err := bdw.DB.Update(func(tx *bbolt.Tx) error { buck := tx.Bucket([]byte(RepoMetaBuck)) + repoLastUpdatedBuck := tx.Bucket([]byte(RepoBlobsBuck)).Bucket([]byte(RepoLastUpdatedBuck)) repoMeta.Name = repo @@ -1018,7 +1088,35 @@ func (bdw *BoltDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error { return err } - return buck.Put([]byte(repo), repoMetaBlob) + err = buck.Put([]byte(repo), repoMetaBlob) + if err != nil { + return err + } + + // The last update time is set to 0 in order to force an update in case of a next storage parsing + return setRepoLastUpdated(repo, time.Time{}, repoLastUpdatedBuck) + }) + + return err +} + +func (bdw *BoltDB) DeleteRepoMeta(repo string) error { + err := bdw.DB.Update(func(tx *bbolt.Tx) error { + repoBuck := tx.Bucket([]byte(RepoMetaBuck)) + repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck)) + repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck)) + + err := repoBuck.Delete([]byte(repo)) + if err != nil { + return err + } + + err = repoBlobsBuck.Delete([]byte(repo)) + if err != nil { + return err + } + + return repoLastUpdatedBuck.Delete([]byte(repo)) }) return err @@ -1212,6 +1310,7 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go repoMetaBuck := tx.Bucket([]byte(RepoMetaBuck)) imageMetaBuck := tx.Bucket([]byte(ImageMetaBuck)) repoBlobsBuck := tx.Bucket([]byte(RepoBlobsBuck)) + repoLastUpdatedBuck := repoBlobsBuck.Bucket([]byte(RepoLastUpdatedBuck)) protoRepoMeta, err := getProtoRepoMeta(repo, repoMetaBuck) if err != nil { @@ -1292,6 +1391,11 @@ func (bdw *BoltDB) RemoveRepoReference(repo, reference string, manifestDigest go return err } + err = setRepoLastUpdated(repo, time.Now(), repoLastUpdatedBuck) + if err != nil { + return err + } + protoRepoMeta, repoBlobs = common.RemoveImageFromRepoMeta(protoRepoMeta, repoBlobs, reference) repoBlobsBytes, err = proto.Marshal(repoBlobs) @@ -1934,12 +2038,39 @@ 
func (bdw *BoltDB) ResetDB() error { } func resetBucket(transaction *bbolt.Tx, bucketName string) error { - err := transaction.DeleteBucket([]byte(bucketName)) + bucket := transaction.Bucket([]byte(bucketName)) + if bucket == nil { + return nil + } + + // we need to recreate the sub buckets if they exist, we'll presume the sub-buckets are not nested more than 1 layer + subBuckets := [][]byte{} + + err := bucket.ForEachBucket(func(bucketName []byte) error { + subBuckets = append(subBuckets, bucketName) + + return nil + }) if err != nil { return err } - _, err = transaction.CreateBucketIfNotExists([]byte(bucketName)) + err = transaction.DeleteBucket([]byte(bucketName)) + if err != nil { + return err + } + + bucket, err = transaction.CreateBucketIfNotExists([]byte(bucketName)) + if err != nil { + return err + } + + for _, subBucket := range subBuckets { + _, err := bucket.CreateBucketIfNotExists(subBucket) + if err != nil { + return err + } + } return err } diff --git a/pkg/meta/boltdb/boltdb_test.go b/pkg/meta/boltdb/boltdb_test.go index bd993735..8d894d9e 100644 --- a/pkg/meta/boltdb/boltdb_test.go +++ b/pkg/meta/boltdb/boltdb_test.go @@ -63,58 +63,6 @@ func TestWrapperErrors(t *testing.T) { ctx := userAc.DeriveContext(context.Background()) - Convey("ResetDB", func() { - Convey("RepoMetaBuck", func() { - err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error { - return tx.DeleteBucket([]byte(boltdb.RepoMetaBuck)) - }) - So(err, ShouldBeNil) - - err = boltdbWrapper.ResetDB() - So(err, ShouldNotBeNil) - }) - - Convey("ImageMetaBuck", func() { - err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error { - return tx.DeleteBucket([]byte(boltdb.ImageMetaBuck)) - }) - So(err, ShouldBeNil) - - err = boltdbWrapper.ResetDB() - So(err, ShouldNotBeNil) - }) - - Convey("RepoBlobsBuck", func() { - err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error { - return tx.DeleteBucket([]byte(boltdb.RepoBlobsBuck)) - }) - So(err, ShouldBeNil) - - err = boltdbWrapper.ResetDB() - So(err,
ShouldNotBeNil) - }) - - Convey("UserAPIKeysBucket", func() { - err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error { - return tx.DeleteBucket([]byte(boltdb.UserAPIKeysBucket)) - }) - So(err, ShouldBeNil) - - err = boltdbWrapper.ResetDB() - So(err, ShouldNotBeNil) - }) - - Convey("UserDataBucket", func() { - err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error { - return tx.DeleteBucket([]byte(boltdb.UserDataBucket)) - }) - So(err, ShouldBeNil) - - err = boltdbWrapper.ResetDB() - So(err, ShouldNotBeNil) - }) - }) - Convey("RemoveRepoReference", func() { Convey("getProtoRepoMeta errors", func() { err := setRepoMeta("repo", badProtoBlob, boltdbWrapper.DB) @@ -192,6 +140,21 @@ func TestWrapperErrors(t *testing.T) { }) }) + Convey("GetRepoLastUpdated", func() { + Convey("bad blob in db", func() { + err := boltdbWrapper.DB.Update(func(tx *bbolt.Tx) error { + repoBlobsBuck := tx.Bucket([]byte(boltdb.RepoBlobsBuck)) + lastUpdatedBuck := repoBlobsBuck.Bucket([]byte(boltdb.RepoLastUpdatedBuck)) + + return lastUpdatedBuck.Put([]byte("repo"), []byte("bad-blob")) + }) + So(err, ShouldBeNil) + + lastUpdated := boltdbWrapper.GetRepoLastUpdated("repo") + So(lastUpdated, ShouldEqual, time.Time{}) + }) + }) + Convey("UpdateStatsOnDownload", func() { Convey("repo meta not found", func() { err = boltdbWrapper.UpdateStatsOnDownload("repo", "ref") diff --git a/pkg/meta/boltdb/buckets.go b/pkg/meta/boltdb/buckets.go index 90188974..89adefca 100644 --- a/pkg/meta/boltdb/buckets.go +++ b/pkg/meta/boltdb/buckets.go @@ -4,8 +4,12 @@ package boltdb const ( ImageMetaBuck = "ImageMeta" RepoMetaBuck = "RepoMeta" - RepoBlobsBuck = "RepoBlobsMeta" UserDataBucket = "UserData" VersionBucket = "Version" UserAPIKeysBucket = "UserAPIKeys" ) + +const ( + RepoBlobsBuck = "RepoBlobsMeta" + RepoLastUpdatedBuck = "RepoLastUpdated" // Sub-bucket +) diff --git a/pkg/meta/boltdb/parameters.go b/pkg/meta/boltdb/parameters.go index 31db7a49..dc3d5f2e 100644 --- a/pkg/meta/boltdb/parameters.go +++ 
b/pkg/meta/boltdb/parameters.go @@ -14,7 +14,7 @@ type DBParameters struct { func GetBoltDriver(params DBParameters) (*bolt.DB, error) { const perms = 0o600 - boltDB, err := bolt.Open(path.Join(params.RootDir, "repo.db"), perms, &bolt.Options{Timeout: time.Second * 10}) + boltDB, err := bolt.Open(path.Join(params.RootDir, "meta.db"), perms, &bolt.Options{Timeout: time.Second * 10}) if err != nil { return nil, err } diff --git a/pkg/meta/dynamodb/dynamodb.go b/pkg/meta/dynamodb/dynamodb.go index cfb84e0e..265397e1 100644 --- a/pkg/meta/dynamodb/dynamodb.go +++ b/pkg/meta/dynamodb/dynamodb.go @@ -90,6 +90,66 @@ func New(client *dynamodb.Client, params DBDriverParameters, log log.Logger, return &dynamoWrapper, nil } +func (dwr *DynamoDB) GetAllRepoNames() ([]string, error) { + ctx := context.Background() + attributeIterator := NewBaseDynamoAttributesIterator(dwr.Client, dwr.RepoMetaTablename, "TableKey", 0, dwr.Log) + + repoNames := []string{} + + repoNameAttribute, err := attributeIterator.First(ctx) + + for ; repoNameAttribute != nil; repoNameAttribute, err = attributeIterator.Next(ctx) { + if err != nil { + return []string{}, err + } + + var repoName string + + err := attributevalue.Unmarshal(repoNameAttribute, &repoName) + if err != nil { + continue + } + + repoNames = append(repoNames, repoName) + } + + return repoNames, nil +} + +func (dwr *DynamoDB) GetRepoLastUpdated(repo string) time.Time { + resp, err := dwr.Client.GetItem(context.Background(), &dynamodb.GetItemInput{ + TableName: aws.String(dwr.RepoBlobsTablename), + Key: map[string]types.AttributeValue{ + "TableKey": &types.AttributeValueMemberS{Value: repo}, + }, + ProjectionExpression: aws.String("RepoLastUpdated"), + }) + if err != nil { + return time.Time{} + } + + protoRepoLastUpdated := &timestamppb.Timestamp{} + repoLastUpdatedBlob := []byte{} + + if resp.Item != nil { + err = attributevalue.Unmarshal(resp.Item["RepoLastUpdated"], &repoLastUpdatedBlob) + if err != nil { + return time.Time{} + } + + if
len(repoLastUpdatedBlob) > 0 { + err := proto.Unmarshal(repoLastUpdatedBlob, protoRepoLastUpdated) + if err != nil { + return time.Time{} + } + } + } + + lastUpdated := *mConvert.GetTime(protoRepoLastUpdated) + + return lastUpdated +} + func (dwr *DynamoDB) ImageTrustStore() mTypes.ImageTrustStore { return dwr.imgTrustStore } @@ -117,7 +177,7 @@ func (dwr *DynamoDB) SetProtoImageMeta(digest godigest.Digest, protoImageMeta *p ":ImageMeta": mdAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: digest.String(), }, }, @@ -136,7 +196,7 @@ func (dwr *DynamoDB) GetProtoImageMeta(ctx context.Context, digest godigest.Dige resp, err := dwr.Client.GetItem(ctx, &dynamodb.GetItemInput{ TableName: aws.String(dwr.ImageMetaTablename), Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{Value: digest.String()}, + "TableKey": &types.AttributeValueMemberS{Value: digest.String()}, }, }) if err != nil { @@ -185,7 +245,7 @@ func (dwr *DynamoDB) setProtoRepoMeta(repo string, repoMeta *proto_go.RepoMeta) ":RepoMeta": repoAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: repo, }, }, @@ -200,7 +260,7 @@ func (dwr *DynamoDB) getProtoRepoMeta(ctx context.Context, repo string) (*proto_ resp, err := dwr.Client.GetItem(ctx, &dynamodb.GetItemInput{ TableName: aws.String(dwr.RepoMetaTablename), Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{Value: repo}, + "TableKey": &types.AttributeValueMemberS{Value: repo}, }, }) if err != nil { @@ -354,11 +414,43 @@ func (dwr *DynamoDB) SetRepoReference(ctx context.Context, repo string, referenc return dwr.setProtoRepoMeta(repo, repoMeta) //nolint: contextcheck } +func (dwr *DynamoDB) updateRepoLastUpdated(ctx context.Context, repo string, time time.Time) error { + protoTime := timestamppb.New(time) + + protoTimeBlob, 
err := proto.Marshal(protoTime) + if err != nil { + return err + } + + mdAttributeValue, err := attributevalue.Marshal(protoTimeBlob) + if err != nil { + return err + } + + _, err = dwr.Client.UpdateItem(ctx, &dynamodb.UpdateItemInput{ + ExpressionAttributeNames: map[string]string{ + "#RLU": "RepoLastUpdated", + }, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":RepoLastUpdated": mdAttributeValue, + }, + Key: map[string]types.AttributeValue{ + "TableKey": &types.AttributeValueMemberS{ + Value: repo, + }, + }, + TableName: aws.String(dwr.RepoBlobsTablename), + UpdateExpression: aws.String("SET #RLU = :RepoLastUpdated"), + }) + + return err +} + func (dwr *DynamoDB) getProtoRepoBlobs(ctx context.Context, repo string) (*proto_go.RepoBlobs, error) { resp, err := dwr.Client.GetItem(ctx, &dynamodb.GetItemInput{ TableName: aws.String(dwr.RepoBlobsTablename), Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{Value: repo}, + "TableKey": &types.AttributeValueMemberS{Value: repo}, }, }) if err != nil { @@ -393,6 +485,18 @@ func (dwr *DynamoDB) getProtoRepoBlobs(ctx context.Context, repo string) (*proto } func (dwr *DynamoDB) setRepoBlobsInfo(repo string, repoBlobs *proto_go.RepoBlobs) error { + protoTime := timestamppb.Now() + + protoTimeBlob, err := proto.Marshal(protoTime) + if err != nil { + return err + } + + timeAttributeValue, err := attributevalue.Marshal(protoTimeBlob) + if err != nil { + return err + } + bytes, err := proto.Marshal(repoBlobs) if err != nil { return err @@ -406,17 +510,19 @@ func (dwr *DynamoDB) setRepoBlobsInfo(repo string, repoBlobs *proto_go.RepoBlobs _, err = dwr.Client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{ ExpressionAttributeNames: map[string]string{ "#RBI": "RepoBlobsInfo", + "#RLU": "RepoLastUpdated", }, ExpressionAttributeValues: map[string]types.AttributeValue{ - ":RepoBlobsInfo": mdAttributeValue, + ":RepoBlobsInfo": mdAttributeValue, + ":RepoLastUpdated": timeAttributeValue, }, 
Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: repo, }, }, TableName: aws.String(dwr.RepoBlobsTablename), - UpdateExpression: aws.String("SET #RBI = :RepoBlobsInfo"), + UpdateExpression: aws.String("SET #RBI = :RepoBlobsInfo, #RLU = :RepoLastUpdated"), }) return err @@ -932,9 +1038,43 @@ func (dwr *DynamoDB) DecrementRepoStars(repo string) error { func (dwr *DynamoDB) SetRepoMeta(repo string, repoMeta mTypes.RepoMeta) error { protoRepoMeta := mConvert.GetProtoRepoMeta(repoMeta) + err := dwr.updateRepoLastUpdated(context.Background(), repo, time.Time{}) + if err != nil { + return err + } + return dwr.setProtoRepoMeta(repo, protoRepoMeta) } +func (dwr *DynamoDB) DeleteRepoMeta(repo string) error { + _, err := dwr.Client.TransactWriteItems(context.Background(), &dynamodb.TransactWriteItemsInput{ + TransactItems: []types.TransactWriteItem{ + { + Delete: &types.Delete{ + Key: map[string]types.AttributeValue{ + "TableKey": &types.AttributeValueMemberS{ + Value: repo, + }, + }, + TableName: aws.String(dwr.RepoMetaTablename), + }, + }, + { + Delete: &types.Delete{ + Key: map[string]types.AttributeValue{ + "TableKey": &types.AttributeValueMemberS{ + Value: repo, + }, + }, + TableName: aws.String(dwr.RepoBlobsTablename), + }, + }, + }, + }) + + return err +} + func (dwr *DynamoDB) GetReferrersInfo(repo string, referredDigest godigest.Digest, artifactTypes []string, ) ([]mTypes.ReferrerInfo, error) { repoMeta, err := dwr.GetRepoMeta(context.Background(), repo) @@ -1216,6 +1356,8 @@ func (dwr *DynamoDB) FilterImageMeta(ctx context.Context, digests []string, func (dwr *DynamoDB) RemoveRepoReference(repo, reference string, manifestDigest godigest.Digest, ) error { + ctx := context.Background() + protoRepoMeta, err := dwr.getProtoRepoMeta(context.Background(), repo) if err != nil { if errors.Is(err, zerr.ErrRepoMetaNotFound) { @@ -1295,7 +1437,7 @@ func (dwr *DynamoDB) RemoveRepoReference(repo, 
reference string, manifestDigest delete(protoRepoMeta.Referrers, manifestDigest.String()) } - repoBlobsInfo, err := dwr.getProtoRepoBlobs(context.Background(), repo) + repoBlobsInfo, err := dwr.getProtoRepoBlobs(ctx, repo) if err != nil { return err } @@ -1448,7 +1590,7 @@ func (dwr *DynamoDB) ToggleStarRepo(ctx context.Context, repo string) ( ":UserData": userAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: userid, }, }, @@ -1466,7 +1608,7 @@ func (dwr *DynamoDB) ToggleStarRepo(ctx context.Context, repo string) ( ":RepoMeta": repoAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: repo, }, }, @@ -1642,7 +1784,7 @@ func (dwr DynamoDB) AddUserAPIKey(ctx context.Context, hashedKey string, apiKeyD ":UserData": userAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: userid, }, }, @@ -1660,7 +1802,7 @@ func (dwr DynamoDB) AddUserAPIKey(ctx context.Context, hashedKey string, apiKeyD ":Identity": &types.AttributeValueMemberS{Value: userid}, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: hashedKey, }, }, @@ -1687,7 +1829,7 @@ func (dwr DynamoDB) DeleteUserAPIKey(ctx context.Context, keyID string) error { _, err = dwr.Client.DeleteItem(ctx, &dynamodb.DeleteItemInput{ TableName: aws.String(dwr.APIKeyTablename), Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{Value: hash}, + "TableKey": &types.AttributeValueMemberS{Value: hash}, }, }) if err != nil { @@ -1709,7 +1851,7 @@ func (dwr DynamoDB) GetUserAPIKeyInfo(hashedKey string) (string, error) { resp, err := dwr.Client.GetItem(context.Background(), &dynamodb.GetItemInput{ TableName: aws.String(dwr.APIKeyTablename), Key: 
map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{Value: hashedKey}, + "TableKey": &types.AttributeValueMemberS{Value: hashedKey}, }, }) if err != nil { @@ -1745,7 +1887,7 @@ func (dwr DynamoDB) GetUserData(ctx context.Context) (mTypes.UserData, error) { resp, err := dwr.Client.GetItem(ctx, &dynamodb.GetItemInput{ TableName: aws.String(dwr.UserDataTablename), Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{Value: userid}, + "TableKey": &types.AttributeValueMemberS{Value: userid}, }, }) if err != nil { @@ -1789,7 +1931,7 @@ func (dwr DynamoDB) SetUserData(ctx context.Context, userData mTypes.UserData) e ":UserData": userAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: userid, }, }, @@ -1815,7 +1957,7 @@ func (dwr DynamoDB) DeleteUserData(ctx context.Context) error { _, err = dwr.Client.DeleteItem(ctx, &dynamodb.DeleteItemInput{ TableName: aws.String(dwr.UserDataTablename), Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{Value: userid}, + "TableKey": &types.AttributeValueMemberS{Value: userid}, }, }) @@ -1847,7 +1989,7 @@ func getBatchImageKeys(digests []string) []map[string]types.AttributeValue { for _, digest := range digests { result = append(result, map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: digest, }, }) @@ -1928,13 +2070,13 @@ func (dwr *DynamoDB) createTable(tableName string) error { TableName: aws.String(tableName), AttributeDefinitions: []types.AttributeDefinition{ { - AttributeName: aws.String("Key"), + AttributeName: aws.String("TableKey"), AttributeType: types.ScalarAttributeTypeS, }, }, KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String("Key"), + AttributeName: aws.String("TableKey"), KeyType: types.KeyTypeHash, }, }, @@ -1985,13 +2127,13 @@ func (dwr *DynamoDB) createVersionTable() error 
{ TableName: aws.String(dwr.VersionTablename), AttributeDefinitions: []types.AttributeDefinition{ { - AttributeName: aws.String("Key"), + AttributeName: aws.String("TableKey"), AttributeType: types.ScalarAttributeTypeS, }, }, KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String("Key"), + AttributeName: aws.String("TableKey"), KeyType: types.KeyTypeHash, }, }, @@ -2024,7 +2166,7 @@ func (dwr *DynamoDB) createVersionTable() error { ":Version": mdAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: version.DBVersionKey, }, }, @@ -2044,7 +2186,7 @@ func (dwr *DynamoDB) getDBVersion() (string, error) { resp, err := dwr.Client.GetItem(context.TODO(), &dynamodb.GetItemInput{ TableName: aws.String(dwr.VersionTablename), Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{Value: version.DBVersionKey}, + "TableKey": &types.AttributeValueMemberS{Value: version.DBVersionKey}, }, }) if err != nil { diff --git a/pkg/meta/dynamodb/dynamodb_test.go b/pkg/meta/dynamodb/dynamodb_test.go index f1e716c9..79af8361 100644 --- a/pkg/meta/dynamodb/dynamodb_test.go +++ b/pkg/meta/dynamodb/dynamodb_test.go @@ -4,6 +4,7 @@ import ( "context" "os" "testing" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -879,6 +880,22 @@ func TestWrapperErrors(t *testing.T) { So(err, ShouldNotBeNil) }) + Convey("GetRepoLastUpdated", func() { + Convey("bad table", func() { + dynamoWrapper.RepoBlobsTablename = "bad-table" + + lastUpdated := dynamoWrapper.GetRepoLastUpdated("repo") + So(lastUpdated, ShouldEqual, time.Time{}) + }) + + Convey("unmarshal error", func() { + err := setRepoLastUpdated("repo", []byte("bad-blob"), dynamoWrapper) + So(err, ShouldBeNil) + lastUpdated := dynamoWrapper.GetRepoLastUpdated("repo") + So(lastUpdated, ShouldEqual, time.Time{}) + }) + }) + Convey("DeleteUserAPIKey returns nil", func() { userAc := 
reqCtx.NewUserAccessControl() userAc.SetUsername("email") @@ -1090,7 +1107,7 @@ func setRepoMeta(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB) er ":RepoMeta": userAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: repo, }, }, @@ -1101,6 +1118,31 @@ func setRepoMeta(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB) er return err } +func setRepoLastUpdated(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB) error { //nolint: unparam + lastUpdatedAttributeValue, err := attributevalue.Marshal(blob) + if err != nil { + return err + } + + _, err = dynamoWrapper.Client.UpdateItem(context.Background(), &dynamodb.UpdateItemInput{ + ExpressionAttributeNames: map[string]string{ + "#RLU": "RepoLastUpdated", + }, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":RepoLastUpdated": lastUpdatedAttributeValue, + }, + Key: map[string]types.AttributeValue{ + "TableKey": &types.AttributeValueMemberS{ + Value: repo, + }, + }, + TableName: aws.String(dynamoWrapper.RepoBlobsTablename), + UpdateExpression: aws.String("SET #RLU = :RepoLastUpdated"), + }) + + return err +} + func setRepoBlobInfo(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB) error { userAttributeValue, err := attributevalue.Marshal(blob) if err != nil { @@ -1115,7 +1157,7 @@ func setRepoBlobInfo(repo string, blob []byte, dynamoWrapper *mdynamodb.DynamoDB ":RepoBlobsInfo": userAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: repo, }, }, @@ -1140,7 +1182,7 @@ func setImageMeta(digest godigest.Digest, blob []byte, dynamoWrapper *mdynamodb. 
":ImageMeta": userAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: digest.String(), }, }, @@ -1165,7 +1207,7 @@ func setBadUserData(client *dynamodb.Client, userDataTablename, userID string) e ":UserData": userAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: userID, }, }, @@ -1190,7 +1232,7 @@ func setVersion(client *dynamodb.Client, versionTablename string, version string ":Version": mdAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: "DBVersion", }, }, diff --git a/pkg/meta/dynamodb/iterator.go b/pkg/meta/dynamodb/iterator.go index cf8c057b..e605d788 100644 --- a/pkg/meta/dynamodb/iterator.go +++ b/pkg/meta/dynamodb/iterator.go @@ -50,8 +50,9 @@ func NewBaseDynamoAttributesIterator(client *dynamodb.Client, table, attribute s func (dii *BaseAttributesIterator) First(ctx context.Context) (types.AttributeValue, error) { scanOutput, err := dii.Client.Scan(ctx, &dynamodb.ScanInput{ - TableName: aws.String(dii.Table), - Limit: dii.readLimit, + TableName: aws.String(dii.Table), + Limit: dii.readLimit, + ProjectionExpression: aws.String(dii.Attribute), }) if err != nil { return &types.AttributeValueMemberBOOL{}, err diff --git a/pkg/meta/meta_test.go b/pkg/meta/meta_test.go index 4c343274..de7b7ef0 100644 --- a/pkg/meta/meta_test.go +++ b/pkg/meta/meta_test.go @@ -48,7 +48,7 @@ func getManifestDigest(md mTypes.ManifestMeta) string { return md.Digest.String( func TestBoltDB(t *testing.T) { Convey("BoltDB creation", t, func() { boltDBParams := boltdb.DBParameters{RootDir: t.TempDir()} - repoDBPath := path.Join(boltDBParams.RootDir, "repo.db") + repoDBPath := path.Join(boltDBParams.RootDir, "meta.db") boltDriver, err := boltdb.GetBoltDriver(boltDBParams) So(err, ShouldBeNil) @@ -85,7 
+85,7 @@ func TestBoltDB(t *testing.T) { boltdbWrapper.SetImageTrustStore(imgTrustStore) defer func() { - os.Remove(path.Join(boltDBParams.RootDir, "repo.db")) + os.Remove(path.Join(boltDBParams.RootDir, "meta.db")) os.RemoveAll(path.Join(boltDBParams.RootDir, "_cosign")) os.RemoveAll(path.Join(boltDBParams.RootDir, "_notation")) }() @@ -2147,7 +2147,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func Convey("GetReferrersInfo", func() { repo := "repo" - tag := "tag" + tag := "test-tag" image := CreateRandomImage() err := metaDB.SetRepoReference(ctx, repo, tag, image.AsImageMeta()) @@ -2243,7 +2243,7 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func Convey("FilterRepos", func() { repo := "repoFilter" tag1 := "tag1" - tag2 := "tag2" + tag2 := "tag22" image := CreateImageWith().DefaultLayers().PlatformConfig("image-platform", "image-os").Build() err := metaDB.SetRepoReference(ctx, repo, tag1, image.AsImageMeta()) @@ -2371,6 +2371,71 @@ func RunMetaDBTests(t *testing.T, metaDB mTypes.MetaDB, preparationFuncs ...func So(repoMeta.IsStarred, ShouldBeTrue) So(repoMeta.Tags, ShouldContainKey, "tag") }) + + Convey("GetAllRepoNames", func() { + repo1 := "repo1" + repo2 := "repo2" + repo3 := "repo3" + imageMeta := CreateRandomImage().AsImageMeta() + + err := metaDB.SetRepoReference(ctx, repo1, "tag", imageMeta) + So(err, ShouldBeNil) + err = metaDB.SetRepoReference(ctx, repo2, "tag", imageMeta) + So(err, ShouldBeNil) + err = metaDB.SetRepoReference(ctx, repo3, "tag", imageMeta) + So(err, ShouldBeNil) + + repos, err := metaDB.GetAllRepoNames() + So(err, ShouldBeNil) + So(repos, ShouldContain, repo1) + So(repos, ShouldContain, repo2) + So(repos, ShouldContain, repo3) + + err = metaDB.DeleteRepoMeta(repo1) + So(err, ShouldBeNil) + + repos, err = metaDB.GetAllRepoNames() + So(err, ShouldBeNil) + So(repos, ShouldNotContain, repo1) + So(repos, ShouldContain, repo2) + So(repos, ShouldContain, repo3) + + err = 
metaDB.DeleteRepoMeta(repo2) + So(err, ShouldBeNil) + + repos, err = metaDB.GetAllRepoNames() + So(err, ShouldBeNil) + So(repos, ShouldNotContain, repo1) + So(repos, ShouldNotContain, repo2) + So(repos, ShouldContain, repo3) + + err = metaDB.SetRepoReference(ctx, repo1, "tag", imageMeta) + So(err, ShouldBeNil) + + repos, err = metaDB.GetAllRepoNames() + So(err, ShouldBeNil) + So(repos, ShouldContain, repo1) + So(repos, ShouldNotContain, repo2) + So(repos, ShouldContain, repo3) + + err = metaDB.DeleteRepoMeta(repo1) + So(err, ShouldBeNil) + + repos, err = metaDB.GetAllRepoNames() + So(err, ShouldBeNil) + So(repos, ShouldNotContain, repo1) + So(repos, ShouldNotContain, repo2) + So(repos, ShouldContain, repo3) + + err = metaDB.DeleteRepoMeta(repo3) + So(err, ShouldBeNil) + + repos, err = metaDB.GetAllRepoNames() + So(err, ShouldBeNil) + So(repos, ShouldNotContain, repo1) + So(repos, ShouldNotContain, repo2) + So(repos, ShouldNotContain, repo3) + }) }) } diff --git a/pkg/meta/parse.go b/pkg/meta/parse.go index f6979fbd..42462ba3 100644 --- a/pkg/meta/parse.go +++ b/pkg/meta/parse.go @@ -14,8 +14,7 @@ import ( "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/meta/convert" mTypes "zotregistry.io/zot/pkg/meta/types" - "zotregistry.io/zot/pkg/storage" - storageTypes "zotregistry.io/zot/pkg/storage/types" + stypes "zotregistry.io/zot/pkg/storage/types" ) const ( @@ -25,37 +24,85 @@ const ( // ParseStorage will sync all repos found in the rootdirectory of the oci layout that zot was deployed on with the // ParseStorage database. 
-func ParseStorage(metaDB mTypes.MetaDB, storeController storage.StoreController, log log.Logger) error { +func ParseStorage(metaDB mTypes.MetaDB, storeController stypes.StoreController, log log.Logger) error { log.Info().Msg("Started parsing storage and updating MetaDB") - allRepos, err := getAllRepos(storeController) + allStorageRepos, err := getAllRepos(storeController, log) if err != nil { - rootDir := storeController.DefaultStore.RootDir() - log.Error().Err(err).Str("rootDir", rootDir). - Msg("load-local-layout: failed to get all repo names present under rootDir") + return err + } + + allMetaDBRepos, err := metaDB.GetAllRepoNames() + if err != nil { + log.Error().Err(err). + Msg("load-metadb-layout: failed to get all repo names present in metadb") return err } - for i, repo := range allRepos { - log.Info().Int("total", len(allRepos)).Int("progress", i).Str("current-repo", repo). - Msgf("parsing next repo '%s'", repo) - - err := ParseRepo(repo, metaDB, storeController, log) + for _, repo := range getReposToBeDeleted(allStorageRepos, allMetaDBRepos) { + err := metaDB.DeleteRepoMeta(repo) if err != nil { - log.Error().Err(err).Str("repository", repo).Msg("load-local-layout: failed to sync repo") + log.Error().Err(err).Str("rootDir", storeController.GetImageStore(repo).RootDir()). + Str("repo", repo).Msg("load-metadb-layout: failed to get all repo names present in metadb") return err } } + for i, repo := range allStorageRepos { + log.Info().Int("total", len(allStorageRepos)).Int("progress", i).Str("current-repo", repo). 
+ Msgf("parsing next repo '%s'", repo) + + imgStore := storeController.GetImageStore(repo) + + _, _, storageLastUpdated, err := imgStore.StatIndex(repo) + if err != nil { + log.Error().Err(err).Str("repository", repo).Msg("failed to sync repo") + + continue + } + + metaLastUpdated := metaDB.GetRepoLastUpdated(repo) + + if storageLastUpdated.Before(metaLastUpdated) { + continue + } + + err = ParseRepo(repo, metaDB, storeController, log) + if err != nil { + log.Error().Err(err).Str("repository", repo).Msg("failed to sync repo") + + continue + } + } + log.Info().Msg("Done parsing storage and updating MetaDB") return nil } +// getReposToBeDeleted will return all repos that are found in metaDB but not found in storage anymore. +func getReposToBeDeleted(allStorageRepos []string, allMetaDBRepos []string) []string { + toBeDeleted := []string{} + + storageRepoNameSet := map[string]struct{}{} + + for i := range allStorageRepos { + storageRepoNameSet[allStorageRepos[i]] = struct{}{} + } + + for _, metaDBRepo := range allMetaDBRepos { + if _, found := storageRepoNameSet[metaDBRepo]; !found { + toBeDeleted = append(toBeDeleted, metaDBRepo) + } + } + + return toBeDeleted +} + // ParseRepo reads the contents of a repo and syncs all images and signatures found. 
-func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController storage.StoreController, log log.Logger) error { +func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreController, log log.Logger) error { imageStore := storeController.GetImageStore(repo) var lockLatency time.Time @@ -120,16 +167,22 @@ func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController storage.StoreC return nil } -func getAllRepos(storeController storage.StoreController) ([]string, error) { - allRepos, err := storeController.DefaultStore.GetRepositories() +func getAllRepos(storeController stypes.StoreController, log log.Logger) ([]string, error) { + allRepos, err := storeController.GetDefaultImageStore().GetRepositories() if err != nil { + log.Error().Err(err).Str("rootDir", storeController.GetDefaultImageStore().RootDir()). + Msg("load-local-layout: failed to get all repo names present under rootDir") + return nil, err } - if storeController.SubStore != nil { - for _, store := range storeController.SubStore { + if storeController.GetImageSubStores() != nil { + for _, store := range storeController.GetImageSubStores() { substoreRepos, err := store.GetRepositories() if err != nil { + log.Error().Err(err).Str("rootDir", store.RootDir()). 
+ Msg("load-local-layout: failed to get all repo names present under rootDir") + return nil, err } @@ -141,7 +194,7 @@ func getAllRepos(storeController storage.StoreController) ([]string, error) { } func GetSignatureLayersInfo(repo, tag, manifestDigest, signatureType string, manifestBlob []byte, - imageStore storageTypes.ImageStore, log log.Logger, + imageStore stypes.ImageStore, log log.Logger, ) ([]mTypes.LayerInfo, error) { switch signatureType { case zcommon.CosignSignature: @@ -154,7 +207,7 @@ func GetSignatureLayersInfo(repo, tag, manifestDigest, signatureType string, man } func getCosignSignatureLayersInfo( - repo, tag, manifestDigest string, manifestBlob []byte, imageStore storageTypes.ImageStore, log log.Logger, + repo, tag, manifestDigest string, manifestBlob []byte, imageStore stypes.ImageStore, log log.Logger, ) ([]mTypes.LayerInfo, error) { layers := []mTypes.LayerInfo{} @@ -197,7 +250,7 @@ func getCosignSignatureLayersInfo( } func getNotationSignatureLayersInfo( - repo, manifestDigest string, manifestBlob []byte, imageStore storageTypes.ImageStore, log log.Logger, + repo, manifestDigest string, manifestBlob []byte, imageStore stypes.ImageStore, log log.Logger, ) ([]mTypes.LayerInfo, error) { layers := []mTypes.LayerInfo{} @@ -250,7 +303,7 @@ func getNotationSignatureLayersInfo( // SetMetadataFromInput tries to set manifest metadata and update repo metadata by adding the current tag // (in case the reference is a tag). The function expects image manifests and indexes (multi arch images). 
func SetImageMetaFromInput(ctx context.Context, repo, reference, mediaType string, digest godigest.Digest, blob []byte, - imageStore storageTypes.ImageStore, metaDB mTypes.MetaDB, log log.Logger, + imageStore stypes.ImageStore, metaDB mTypes.MetaDB, log log.Logger, ) error { var imageMeta mTypes.ImageMeta diff --git a/pkg/meta/parse_test.go b/pkg/meta/parse_test.go index 286d838e..7c7f75bc 100644 --- a/pkg/meta/parse_test.go +++ b/pkg/meta/parse_test.go @@ -7,6 +7,7 @@ import ( "io" "os" "path" + "path/filepath" "testing" "time" @@ -24,9 +25,11 @@ import ( "zotregistry.io/zot/pkg/storage" "zotregistry.io/zot/pkg/storage/local" storageTypes "zotregistry.io/zot/pkg/storage/types" + tcommon "zotregistry.io/zot/pkg/test/common" "zotregistry.io/zot/pkg/test/deprecated" . "zotregistry.io/zot/pkg/test/image-utils" "zotregistry.io/zot/pkg/test/mocks" + ociutils "zotregistry.io/zot/pkg/test/oci-utils" "zotregistry.io/zot/pkg/test/signature" tskip "zotregistry.io/zot/pkg/test/skip" ) @@ -50,7 +53,7 @@ func TestParseStorageErrors(t *testing.T) { // sync repo fail err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) - So(err, ShouldNotBeNil) + So(err, ShouldBeNil) Convey("getAllRepos errors", func() { imageStore1 := mocks.MockedImageStore{ @@ -73,6 +76,40 @@ func TestParseStorageErrors(t *testing.T) { err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) So(err, ShouldNotBeNil) }) + + Convey("metaDB.GetAllRepoNames errors", func() { + metaDB.GetAllRepoNamesFn = func() ([]string, error) { return nil, ErrTestError } + + err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) + So(err, ShouldNotBeNil) + }) + + Convey("metaDB.DeleteRepoMeta errors", func() { + imageStore1 := mocks.MockedImageStore{ + GetRepositoriesFn: func() ([]string, error) { return []string{"repo1", "repo2"}, nil }, + } + storeController := storage.StoreController{DefaultStore: imageStore1} + + metaDB.GetAllRepoNamesFn = func() ([]string, 
error) { return []string{"deleted"}, nil } + metaDB.DeleteRepoMetaFn = func(repo string) error { return ErrTestError } + + err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) + So(err, ShouldNotBeNil) + }) + + Convey("StatIndex errors", func() { + imageStore1 := mocks.MockedImageStore{ + GetRepositoriesFn: func() ([]string, error) { return []string{"repo1", "repo2"}, nil }, + } + imageStore1.StatIndexFn = func(repo string) (bool, int64, time.Time, error) { + return false, 0, time.Time{}, ErrTestError + } + + storeController := storage.StoreController{DefaultStore: imageStore1} + + err := meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) + So(err, ShouldBeNil) + }) }) Convey("Parse Repo", t, func() { @@ -250,16 +287,17 @@ func getIndexBlob(index ispec.Index) []byte { func TestParseStorageWithBoltDB(t *testing.T) { Convey("Boltdb", t, func() { rootDir := t.TempDir() + log := log.NewLogger("debug", "/dev/null") boltDB, err := boltdb.GetBoltDriver(boltdb.DBParameters{ RootDir: rootDir, }) So(err, ShouldBeNil) - metaDB, err := boltdb.New(boltDB, log.NewLogger("debug", "")) + metaDB, err := boltdb.New(boltDB, log) So(err, ShouldBeNil) - RunParseStorageTests(rootDir, metaDB) + RunParseStorageTests(rootDir, metaDB, log) }) } @@ -268,6 +306,7 @@ func TestParseStorageDynamoWrapper(t *testing.T) { Convey("Dynamodb", t, func() { rootDir := t.TempDir() + log := log.NewLogger("debug", "/dev/null") params := dynamodb.DBDriverParameters{ Endpoint: os.Getenv("DYNAMODBMOCK_ENDPOINT"), @@ -283,7 +322,7 @@ func TestParseStorageDynamoWrapper(t *testing.T) { dynamoClient, err := dynamodb.GetDynamoClient(params) So(err, ShouldBeNil) - dynamoWrapper, err := dynamodb.New(dynamoClient, params, log.NewLogger("debug", "")) + dynamoWrapper, err := dynamodb.New(dynamoClient, params, log) So(err, ShouldBeNil) err = dynamoWrapper.ResetTable(dynamoWrapper.RepoMetaTablename) @@ -295,29 +334,26 @@ func TestParseStorageDynamoWrapper(t *testing.T) { err = 
dynamoWrapper.ResetTable(dynamoWrapper.ImageMetaTablename) So(err, ShouldBeNil) - RunParseStorageTests(rootDir, dynamoWrapper) + RunParseStorageTests(rootDir, dynamoWrapper, log) }) } -func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { +func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB, log log.Logger) { + ctx := context.Background() + Convey("Test with simple case", func() { imageStore := local.NewImageStore(rootDir, false, false, - log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil) + log, monitoring.NewMetricsServer(false, log), nil, nil) storeController := storage.StoreController{DefaultStore: imageStore} manifests := []ispec.Manifest{} for i := 0; i < 3; i++ { - config, layers, manifest, err := deprecated.GetRandomImageComponents(100) //nolint:staticcheck - So(err, ShouldBeNil) + image := CreateRandomImage() //nolint:staticcheck - manifests = append(manifests, manifest) + manifests = append(manifests, image.Manifest) - err = WriteImageToFileSystem( - Image{ - Config: config, - Layers: layers, - Manifest: manifest, - }, repo, fmt.Sprintf("tag%d", i), storeController) + err := WriteImageToFileSystem( + image, repo, fmt.Sprintf("tag%d", i), storeController) So(err, ShouldBeNil) } @@ -364,18 +400,16 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { err = os.WriteFile(indexPath, buf, 0o600) So(err, ShouldBeNil) - err = meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) + err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck So(err, ShouldBeNil) - repos, err := metaDB.GetMultipleRepoMeta(context.Background(), + repos, err := metaDB.GetMultipleRepoMeta(ctx, func(repoMeta mTypes.RepoMeta) bool { return true }) So(err, ShouldBeNil) So(len(repos), ShouldEqual, 1) So(len(repos[0].Tags), ShouldEqual, 2) - ctx := context.Background() - for tag, descriptor := range repos[0].Tags { imageManifestData, err := metaDB.GetFullImageMeta(ctx, 
repo, tag) So(err, ShouldBeNil) @@ -388,7 +422,7 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { Convey("Accept orphan signatures", func() { imageStore := local.NewImageStore(rootDir, false, false, - log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil) + log, monitoring.NewMetricsServer(false, log), nil, nil) storeController := storage.StoreController{DefaultStore: imageStore} // add an image @@ -424,10 +458,10 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { }, repo, signatureTag, storeController) So(err, ShouldBeNil) - err = meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) + err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck So(err, ShouldBeNil) - repos, err := metaDB.GetMultipleRepoMeta(context.Background(), + repos, err := metaDB.GetMultipleRepoMeta(ctx, func(repoMeta mTypes.RepoMeta) bool { return true }) So(err, ShouldBeNil) @@ -443,7 +477,7 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { Convey("Check statistics after load", func() { imageStore := local.NewImageStore(rootDir, false, false, - log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil) + log, monitoring.NewMetricsServer(false, log), nil, nil) storeController := storage.StoreController{DefaultStore: imageStore} // add an image @@ -452,7 +486,7 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { err := WriteImageToFileSystem(image, repo, "tag", storeController) So(err, ShouldBeNil) - err = metaDB.SetRepoReference(context.Background(), repo, "tag", image.AsImageMeta()) + err = metaDB.SetRepoReference(ctx, repo, "tag", image.AsImageMeta()) So(err, ShouldBeNil) err = metaDB.IncrementRepoStars(repo) @@ -464,17 +498,17 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { err = metaDB.UpdateStatsOnDownload(repo, "tag") So(err, ShouldBeNil) - repoMeta, err := 
metaDB.GetRepoMeta(context.Background(), repo) + repoMeta, err := metaDB.GetRepoMeta(ctx, repo) So(err, ShouldBeNil) So(repoMeta.Statistics[image.DigestStr()].DownloadCount, ShouldEqual, 3) So(repoMeta.StarCount, ShouldEqual, 1) So(time.Now(), ShouldHappenAfter, repoMeta.Statistics[image.DigestStr()].LastPullTimestamp) - err = meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) + err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck So(err, ShouldBeNil) - repoMeta, err = metaDB.GetRepoMeta(context.Background(), repo) + repoMeta, err = metaDB.GetRepoMeta(ctx, repo) So(err, ShouldBeNil) So(repoMeta.Statistics[image.DigestStr()].DownloadCount, ShouldEqual, 3) @@ -484,7 +518,7 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { // make sure pushTimestamp is always populated to not interfere with retention logic Convey("Always update pushTimestamp if its value is 0(time.Time{})", func() { imageStore := local.NewImageStore(rootDir, false, false, - log.NewLogger("debug", ""), monitoring.NewMetricsServer(false, log.NewLogger("debug", "")), nil, nil) + log, monitoring.NewMetricsServer(false, log), nil, nil) storeController := storage.StoreController{DefaultStore: imageStore} // add an image @@ -493,13 +527,13 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { err := WriteImageToFileSystem(image, repo, "tag", storeController) So(err, ShouldBeNil) - err = metaDB.SetRepoReference(context.Background(), repo, "tag", image.AsImageMeta()) + err = metaDB.SetRepoReference(ctx, repo, "tag", image.AsImageMeta()) So(err, ShouldBeNil) err = metaDB.UpdateStatsOnDownload(repo, "tag") So(err, ShouldBeNil) - repoMeta, err := metaDB.GetRepoMeta(context.Background(), repo) + repoMeta, err := metaDB.GetRepoMeta(ctx, repo) So(err, ShouldBeNil) So(repoMeta.Statistics[image.DigestStr()].DownloadCount, ShouldEqual, 1) @@ -516,16 +550,100 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB) { So(err, 
ShouldBeNil) // metaDB should detect that pushTimestamp is 0 and update it. - err = meta.ParseStorage(metaDB, storeController, log.NewLogger("debug", "")) + err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck So(err, ShouldBeNil) - repoMeta, err = metaDB.GetRepoMeta(context.Background(), repo) + repoMeta, err = metaDB.GetRepoMeta(ctx, repo) So(err, ShouldBeNil) So(repoMeta.Statistics[image.DigestStr()].DownloadCount, ShouldEqual, 1) So(repoMeta.DownloadCount, ShouldEqual, 1) So(repoMeta.Statistics[image.DigestStr()].PushTimestamp, ShouldHappenAfter, oldPushTimestamp) }) + + Convey("Parse 2 times and check correct update of the metaDB for modified and deleted repos", func() { + storeController := ociutils.GetDefaultStoreController(rootDir, log) + + notModifiedRepo := "not-modified-repo" + modifiedAddImageRepo := "modified-add-image-repo" + modifiedRemoveImageRepo := "modified-remove-image-repo" + deletedRepo := "deleted-repo" + addedRepo := "added-repo" + tag := "tag" + tag2 := "tag2" + newTag := "newTag" + + image := CreateRandomImage() + + err := WriteImageToFileSystem(image, notModifiedRepo, tag, storeController) + So(err, ShouldBeNil) + err = WriteImageToFileSystem(image, modifiedAddImageRepo, tag, storeController) + So(err, ShouldBeNil) + + err = WriteImageToFileSystem(image, modifiedRemoveImageRepo, tag, storeController) + So(err, ShouldBeNil) + err = WriteImageToFileSystem(image, modifiedRemoveImageRepo, tag2, storeController) + So(err, ShouldBeNil) + + err = WriteImageToFileSystem(image, deletedRepo, tag, storeController) + So(err, ShouldBeNil) + + err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck + So(err, ShouldBeNil) + + repoMetaList, err := metaDB.SearchRepos(ctx, "") + So(err, ShouldBeNil) + So(len(repoMetaList), ShouldEqual, 4) + + repoNames := tcommon.AccumulateField(repoMetaList, func(rm mTypes.RepoMeta) string { return rm.Name }) + So(repoNames, ShouldContain, notModifiedRepo) + So(repoNames, 
ShouldContain, modifiedAddImageRepo) + So(repoNames, ShouldContain, modifiedRemoveImageRepo) + So(repoNames, ShouldContain, deletedRepo) + + time.Sleep(time.Second) + + // Update the storage + + err = WriteImageToFileSystem(image, modifiedAddImageRepo, newTag, storeController) + So(err, ShouldBeNil) + + err = storeController.GetDefaultImageStore().DeleteImageManifest(modifiedRemoveImageRepo, tag2, false) + So(err, ShouldBeNil) + + err = os.RemoveAll(filepath.Join(rootDir, deletedRepo)) + So(err, ShouldBeNil) + + err = WriteImageToFileSystem(image, addedRepo, tag, storeController) + So(err, ShouldBeNil) + + // Parse again + err = meta.ParseStorage(metaDB, storeController, log) //nolint: contextcheck + So(err, ShouldBeNil) + + repoMetaList, err = metaDB.SearchRepos(ctx, "") + So(err, ShouldBeNil) + So(len(repoMetaList), ShouldEqual, 4) + + repoNames = tcommon.AccumulateField(repoMetaList, func(rm mTypes.RepoMeta) string { return rm.Name }) + So(repoNames, ShouldContain, notModifiedRepo) + So(repoNames, ShouldContain, modifiedAddImageRepo) + So(repoNames, ShouldContain, modifiedRemoveImageRepo) + So(repoNames, ShouldNotContain, deletedRepo) + So(repoNames, ShouldContain, addedRepo) + + repoMeta, err := metaDB.GetRepoMeta(ctx, modifiedAddImageRepo) + So(err, ShouldBeNil) + + So(repoMeta.Tags, ShouldContainKey, tag) + So(repoMeta.Tags, ShouldContainKey, newTag) + + repoMeta, err = metaDB.GetRepoMeta(ctx, modifiedRemoveImageRepo) + So(err, ShouldBeNil) + + So(repoMeta.Tags, ShouldContainKey, tag) + So(repoMeta.Tags, ShouldNotContainKey, tag2) + }) } func TestGetSignatureLayersInfo(t *testing.T) { diff --git a/pkg/meta/types/types.go b/pkg/meta/types/types.go index 0216b494..ea02cbdd 100644 --- a/pkg/meta/types/types.go +++ b/pkg/meta/types/types.go @@ -112,6 +112,9 @@ type MetaDB interface { //nolint:interfacebloat // SetRepoMeta returns RepoMetadata of a repo from the database SetRepoMeta(repo string, repoMeta RepoMeta) error + // DeleteRepoMeta + DeleteRepoMeta(repo 
string) error + // GetReferrersInfo returns a list of for all referrers of the given digest that match one of the // artifact types. GetReferrersInfo(repo string, referredDigest godigest.Digest, artifactTypes []string) ([]ReferrerInfo, error) @@ -136,6 +139,10 @@ type MetaDB interface { //nolint:interfacebloat // specific metadata such as star count, downloads other statistics ResetRepoReferences(repo string) error + GetRepoLastUpdated(repo string) time.Time + + GetAllRepoNames() ([]string, error) + // ResetDB will delete all data in the DB ResetDB() error diff --git a/pkg/meta/version/version_test.go b/pkg/meta/version/version_test.go index e56918d2..d131c34c 100644 --- a/pkg/meta/version/version_test.go +++ b/pkg/meta/version/version_test.go @@ -34,7 +34,7 @@ func TestVersioningBoltDB(t *testing.T) { log := log.NewLogger("debug", "") boltdbWrapper, err := boltdb.New(boltDriver, log) - defer os.Remove(path.Join(boltDBParams.RootDir, "repo.db")) + defer os.Remove(path.Join(boltDBParams.RootDir, "meta.db")) So(boltdbWrapper, ShouldNotBeNil) So(err, ShouldBeNil) @@ -200,7 +200,7 @@ func setDynamoDBVersion(client *dynamodb.Client, versTable, vers string) error { ":Version": mdAttributeValue, }, Key: map[string]types.AttributeValue{ - "Key": &types.AttributeValueMemberS{ + "TableKey": &types.AttributeValueMemberS{ Value: version.DBVersionKey, }, }, diff --git a/pkg/storage/gc/gc_test.go b/pkg/storage/gc/gc_test.go index dfa7aec0..0da142b9 100644 --- a/pkg/storage/gc/gc_test.go +++ b/pkg/storage/gc/gc_test.go @@ -51,8 +51,8 @@ var testCases = []struct { } func TestGarbageCollectAndRetention(t *testing.T) { - log := zlog.NewLogger("info", "") - audit := zlog.NewAuditLogger("debug", "") + log := zlog.NewLogger("info", "/dev/null") + audit := zlog.NewAuditLogger("debug", "/dev/null") metrics := monitoring.NewMetricsServer(false, log) diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index 57a845f7..7f5e75de 100644 --- 
a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -1505,6 +1505,25 @@ func (is *ImageStore) GetIndexContent(repo string) ([]byte, error) { return buf, nil } +func (is *ImageStore) StatIndex(repo string) (bool, int64, time.Time, error) { + repoIndexPath := path.Join(is.rootDir, repo, "index.json") + + fileInfo, err := is.storeDriver.Stat(repoIndexPath) + if err != nil { + if errors.As(err, &driver.PathNotFoundError{}) { + is.log.Error().Err(err).Str("indexFile", repoIndexPath).Msg("index.json doesn't exist") + + return false, 0, time.Time{}, zerr.ErrRepoNotFound + } + + is.log.Error().Err(err).Str("indexFile", repoIndexPath).Msg("failed to read index.json") + + return false, 0, time.Time{}, err + } + + return true, fileInfo.Size(), fileInfo.ModTime(), nil +} + func (is *ImageStore) PutIndexContent(repo string, index ispec.Index) error { dir := path.Join(is.rootDir, repo) diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 60452b45..0a4a71e1 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -2994,6 +2994,24 @@ func TestPullRange(t *testing.T) { }) } +func TestStatIndex(t *testing.T) { + Convey("NewImageStore", t, func() { + dir := t.TempDir() + log := zlog.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil) + + err := WriteImageToFileSystem(CreateRandomImage(), "repo", "tag", + storage.StoreController{DefaultStore: imgStore}) + So(err, ShouldBeNil) + + Convey("StatIndex PathNotFoundError", func() { + _, _, _, err := imgStore.StatIndex("not-found") + So(err, ShouldNotBeNil) + }) + }) +} + func TestStorageDriverErr(t *testing.T) { dir := t.TempDir() diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 03f76489..9338a1a5 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -54,6 +54,7 @@ type ImageStore 
interface { //nolint:interfacebloat CleanupRepo(repo string, blobs []godigest.Digest, removeRepo bool) (int, error) GetIndexContent(repo string) ([]byte, error) PutIndexContent(repo string, index ispec.Index) error + StatIndex(repo string) (bool, int64, time.Time, error) GetBlobContent(repo string, digest godigest.Digest) ([]byte, error) GetReferrers(repo string, digest godigest.Digest, artifactTypes []string) (ispec.Index, error) GetOrasReferrers(repo string, digest godigest.Digest, artifactType string) ([]artifactspec.Descriptor, error) diff --git a/pkg/test/common/utils.go b/pkg/test/common/utils.go index b33a0746..51e17b72 100644 --- a/pkg/test/common/utils.go +++ b/pkg/test/common/utils.go @@ -212,7 +212,7 @@ func GenerateRandomName() (string, int64) { return string(randomBytes), seed } -func AccumulateField[R any, T any](list []T, accFunc func(T) R) []R { +func AccumulateField[T any, R any](list []T, accFunc func(T) R) []R { result := make([]R, 0, len(list)) for i := range list { diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index 5835932f..b074e537 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -56,6 +56,15 @@ type MockedImageStore struct { CleanupRepoFn func(repo string, blobs []godigest.Digest, removeRepo bool) (int, error) PutIndexContentFn func(repo string, index ispec.Index) error PopulateStorageMetricsFn func(interval time.Duration, sch *scheduler.Scheduler) + StatIndexFn func(repo string) (bool, int64, time.Time, error) +} + +func (is MockedImageStore) StatIndex(repo string) (bool, int64, time.Time, error) { + if is.StatIndexFn != nil { + return is.StatIndexFn(repo) + } + + return true, 0, time.Time{}, nil } func (is MockedImageStore) Lock(t *time.Time) { diff --git a/pkg/test/mocks/repo_db_mock.go b/pkg/test/mocks/repo_db_mock.go index b9d43ff4..ce7d0c67 100644 --- a/pkg/test/mocks/repo_db_mock.go +++ b/pkg/test/mocks/repo_db_mock.go @@ -2,6 +2,7 @@ package mocks 
import ( "context" + "time" godigest "github.com/opencontainers/go-digest" @@ -9,6 +10,10 @@ import ( ) type MetaDBMock struct { + DeleteRepoMetaFn func(repo string) error + + GetRepoLastUpdatedFn func(repo string) time.Time + GetStarredReposFn func(ctx context.Context) ([]string, error) GetBookmarkedReposFn func(ctx context.Context) ([]string, error) @@ -98,9 +103,35 @@ type MetaDBMock struct { ResetRepoReferencesFn func(repo string) error + GetAllRepoNamesFn func() ([]string, error) + ResetDBFn func() error } +func (sdm MetaDBMock) DeleteRepoMeta(repo string) error { + if sdm.DeleteRepoMetaFn != nil { + return sdm.DeleteRepoMetaFn(repo) + } + + return nil +} + +func (sdm MetaDBMock) GetAllRepoNames() ([]string, error) { + if sdm.GetAllRepoNamesFn != nil { + return sdm.GetAllRepoNamesFn() + } + + return []string{}, nil +} + +func (sdm MetaDBMock) GetRepoLastUpdated(repo string) time.Time { + if sdm.GetRepoLastUpdatedFn != nil { + return sdm.GetRepoLastUpdatedFn(repo) + } + + return time.Time{} +} + func (sdm MetaDBMock) ResetDB() error { if sdm.ResetDBFn != nil { return sdm.ResetDBFn()