0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-13 22:50:38 -05:00
zot/pkg/storage/storage.go
Shivam Mishra 3a30290e08 Using "destRecord" as a path in DeleteBlob function instead of "dst".
dstRecord :- blob path stored in cache.
dst :- blob path that is trying to be uploaded.

Currently, if the actual blob on disk may have been removed by GC/delete, during syncing the cache dst is being passed to DeleteBlob function and retry section is being continuously called because DeleteBlob function never deletes dst path (doesn't exist in db), dstRecord should be passed into DeleteBlob function because dstRecord is actual blob path stored in db.

If dst and dstRecord path value is same then this issue will not be produced and DeleteBlob method will delete the blob info from cache but if both are different then DeleteBlob method will try to delete dst path which is not in cache.

Note:- boltdb delete method return nil even when value doesn't exist (https://godoc.org/github.com/boltdb/bolt#Bucket.Delete)
2020-08-12 10:06:20 -07:00

1064 lines
27 KiB
Go

package storage
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
"time"
"github.com/anuvu/zot/errors"
zlog "github.com/anuvu/zot/pkg/log"
apexlog "github.com/apex/log"
guuid "github.com/gofrs/uuid"
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/umoci"
"github.com/opencontainers/umoci/oci/casext"
"github.com/rs/zerolog"
)
const (
// BlobUploadDir defines the upload directory for blob uploads.
BlobUploadDir = ".uploads"
schemaVersion = 2
gcDelay = 1 * time.Hour
)
// BlobUpload models and upload request.
type BlobUpload struct {
StoreName string
ID string
}
// ImageStore provides the image storage operations.
type ImageStore struct {
rootDir string
lock *sync.RWMutex
blobUploads map[string]BlobUpload
cache *Cache
gc bool
dedupe bool
log zerolog.Logger
}
// NewImageStore returns a new image store backed by a file storage.
func NewImageStore(rootDir string, gc bool, dedupe bool, log zlog.Logger) *ImageStore {
if _, err := os.Stat(rootDir); os.IsNotExist(err) {
if err := os.MkdirAll(rootDir, 0700); err != nil {
log.Error().Err(err).Str("rootDir", rootDir).Msg("unable to create root dir")
return nil
}
}
is := &ImageStore{
rootDir: rootDir,
lock: &sync.RWMutex{},
blobUploads: make(map[string]BlobUpload),
gc: gc,
dedupe: dedupe,
log: log.With().Caller().Logger(),
}
if dedupe {
is.cache = NewCache(rootDir, "cache", log)
}
if gc {
// we use umoci GC to perform garbage-collection, but it uses its own logger
// - so capture those logs, could be useful
apexlog.SetLevel(apexlog.DebugLevel)
apexlog.SetHandler(apexlog.HandlerFunc(func(entry *apexlog.Entry) error {
e := log.Debug()
for k, v := range entry.Fields {
e = e.Interface(k, v)
}
e.Msg(entry.Message)
return nil
}))
}
return is
}
// RLock read-lock.
func (is *ImageStore) RLock() {
is.lock.RLock()
}
// RUnlock read-unlock.
func (is *ImageStore) RUnlock() {
is.lock.RUnlock()
}
// Lock write-lock.
func (is *ImageStore) Lock() {
is.lock.Lock()
}
// Unlock write-unlock.
func (is *ImageStore) Unlock() {
is.lock.Unlock()
}
// InitRepo creates an image repository under this store.
func (is *ImageStore) InitRepo(name string) error {
repoDir := path.Join(is.rootDir, name)
if fi, err := os.Stat(repoDir); err == nil && fi.IsDir() {
return nil
}
// create "blobs" subdir
ensureDir(path.Join(repoDir, "blobs"), is.log)
// create BlobUploadDir subdir
ensureDir(path.Join(repoDir, BlobUploadDir), is.log)
// "oci-layout" file - create if it doesn't exist
ilPath := path.Join(repoDir, ispec.ImageLayoutFile)
if _, err := os.Stat(ilPath); err != nil {
il := ispec.ImageLayout{Version: ispec.ImageLayoutVersion}
buf, err := json.Marshal(il)
if err != nil {
is.log.Panic().Err(err).Msg("unable to marshal JSON")
}
if err := ioutil.WriteFile(ilPath, buf, 0644); err != nil { //nolint: gosec
is.log.Error().Err(err).Str("file", ilPath).Msg("unable to write file")
return err
}
}
// "index.json" file - create if it doesn't exist
indexPath := path.Join(repoDir, "index.json")
if _, err := os.Stat(indexPath); err != nil {
index := ispec.Index{}
index.SchemaVersion = 2
buf, err := json.Marshal(index)
if err != nil {
is.log.Panic().Err(err).Msg("unable to marshal JSON")
}
if err := ioutil.WriteFile(indexPath, buf, 0644); err != nil { //nolint: gosec
is.log.Error().Err(err).Str("file", indexPath).Msg("unable to write file")
return err
}
}
return nil
}
// ValidateRepo validates that the repository layout is complaint with the OCI repo layout.
func (is *ImageStore) ValidateRepo(name string) (bool, error) {
// https://github.com/opencontainers/image-spec/blob/master/image-layout.md#content
// at least, expect at least 3 entries - ["blobs", "oci-layout", "index.json"]
// and an additional/optional BlobUploadDir in each image store
dir := path.Join(is.rootDir, name)
if !dirExists(dir) {
return false, errors.ErrRepoNotFound
}
files, err := ioutil.ReadDir(dir)
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("unable to read directory")
return false, errors.ErrRepoNotFound
}
// nolint:gomnd
if len(files) < 3 {
return false, errors.ErrRepoBadVersion
}
found := map[string]bool{
"blobs": false,
ispec.ImageLayoutFile: false,
"index.json": false,
}
for _, file := range files {
if file.Name() == "blobs" && !file.IsDir() {
return false, nil
}
found[file.Name()] = true
}
for k, v := range found {
if !v && k != BlobUploadDir {
return false, nil
}
}
buf, err := ioutil.ReadFile(path.Join(dir, ispec.ImageLayoutFile))
if err != nil {
return false, err
}
var il ispec.ImageLayout
if err := json.Unmarshal(buf, &il); err != nil {
return false, err
}
if il.Version != ispec.ImageLayoutVersion {
return false, errors.ErrRepoBadVersion
}
return true, nil
}
// GetRepositories returns a list of all the repositories under this store.
func (is *ImageStore) GetRepositories() ([]string, error) {
dir := is.rootDir
_, err := ioutil.ReadDir(dir)
if err != nil {
is.log.Error().Err(err).Msg("failure walking storage root-dir")
return nil, err
}
stores := make([]string, 0)
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
return nil
}
rel, err := filepath.Rel(is.rootDir, path)
if err != nil {
return nil
}
if ok, err := is.ValidateRepo(rel); !ok || err != nil {
return nil
}
//is.log.Debug().Str("dir", path).Str("name", info.Name()).Msg("found image store")
stores = append(stores, rel)
return nil
})
return stores, err
}
// GetImageTags returns a list of image tags available in the specified repository.
func (is *ImageStore) GetImageTags(repo string) ([]string, error) {
dir := path.Join(is.rootDir, repo)
if !dirExists(dir) {
return nil, errors.ErrRepoNotFound
}
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
return nil, errors.ErrRepoNotFound
}
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return nil, errors.ErrRepoNotFound
}
tags := make([]string, 0)
for _, manifest := range index.Manifests {
v, ok := manifest.Annotations[ispec.AnnotationRefName]
if ok {
tags = append(tags, v)
}
}
return tags, nil
}
// GetImageManifest returns the image manifest of an image in the specific repository.
func (is *ImageStore) GetImageManifest(repo string, reference string) ([]byte, string, string, error) {
dir := path.Join(is.rootDir, repo)
if !dirExists(dir) {
return nil, "", "", errors.ErrRepoNotFound
}
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
if os.IsNotExist(err) {
return nil, "", "", errors.ErrRepoNotFound
}
return nil, "", "", err
}
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return nil, "", "", err
}
found := false
var digest godigest.Digest
mediaType := ""
for _, m := range index.Manifests {
if reference == m.Digest.String() {
digest = m.Digest
mediaType = m.MediaType
found = true
break
}
v, ok := m.Annotations[ispec.AnnotationRefName]
if ok && v == reference {
digest = m.Digest
mediaType = m.MediaType
found = true
break
}
}
if !found {
return nil, "", "", errors.ErrManifestNotFound
}
p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded())
buf, err = ioutil.ReadFile(p)
if err != nil {
is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest")
if os.IsNotExist(err) {
return nil, "", "", errors.ErrManifestNotFound
}
return nil, "", "", err
}
var manifest ispec.Manifest
if err := json.Unmarshal(buf, &manifest); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return nil, "", "", err
}
return buf, digest.String(), mediaType, nil
}
// PutImageManifest adds an image manifest to the repository.
func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType string,
body []byte) (string, error) {
if err := is.InitRepo(repo); err != nil {
is.log.Debug().Err(err).Msg("init repo")
return "", err
}
if mediaType != ispec.MediaTypeImageManifest {
is.log.Debug().Interface("actual", mediaType).
Interface("expected", ispec.MediaTypeImageManifest).Msg("bad manifest media type")
return "", errors.ErrBadManifest
}
if len(body) == 0 {
is.log.Debug().Int("len", len(body)).Msg("invalid body length")
return "", errors.ErrBadManifest
}
var m ispec.Manifest
if err := json.Unmarshal(body, &m); err != nil {
is.log.Error().Err(err).Msg("unable to unmarshal JSON")
return "", errors.ErrBadManifest
}
if m.SchemaVersion != schemaVersion {
is.log.Error().Int("SchemaVersion", m.SchemaVersion).Msg("invalid manifest")
return "", errors.ErrBadManifest
}
for _, l := range m.Layers {
digest := l.Digest
blobPath := is.BlobPath(repo, digest)
is.log.Info().Str("blobPath", blobPath).Str("reference", reference).Msg("manifest layers")
if _, err := os.Stat(blobPath); err != nil {
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to find blob")
return digest.String(), errors.ErrBlobNotFound
}
}
mDigest := godigest.FromBytes(body)
refIsDigest := false
d, err := godigest.Parse(reference)
if err == nil {
if d.String() != mDigest.String() {
is.log.Error().Str("actual", mDigest.String()).Str("expected", d.String()).
Msg("manifest digest is not valid")
return "", errors.ErrBadManifest
}
refIsDigest = true
}
dir := path.Join(is.rootDir, repo)
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
return "", err
}
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return "", errors.ErrRepoBadVersion
}
updateIndex := true
// create a new descriptor
desc := ispec.Descriptor{MediaType: mediaType, Size: int64(len(body)), Digest: mDigest,
Platform: &ispec.Platform{Architecture: "amd64", OS: "linux"}}
if !refIsDigest {
desc.Annotations = map[string]string{ispec.AnnotationRefName: reference}
}
for i, m := range index.Manifests {
if reference == m.Digest.String() {
// nothing changed, so don't update
desc = m
updateIndex = false
break
}
v, ok := m.Annotations[ispec.AnnotationRefName]
if ok && v == reference {
if m.Digest.String() == mDigest.String() {
// nothing changed, so don't update
desc = m
updateIndex = false
break
}
// manifest contents have changed for the same tag,
// so update index.json descriptor
is.log.Info().
Int64("old size", desc.Size).
Int64("new size", int64(len(body))).
Str("old digest", desc.Digest.String()).
Str("new digest", mDigest.String()).
Msg("updating existing tag with new manifest contents")
desc = m
desc.Size = int64(len(body))
desc.Digest = mDigest
index.Manifests = append(index.Manifests[:i], index.Manifests[i+1:]...)
break
}
}
if !updateIndex {
return desc.Digest.String(), nil
}
// write manifest to "blobs"
dir = path.Join(is.rootDir, repo, "blobs", mDigest.Algorithm().String())
ensureDir(dir, is.log)
file := path.Join(dir, mDigest.Encoded())
if err := ioutil.WriteFile(file, body, 0600); err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to write")
return "", err
}
// now update "index.json"
index.Manifests = append(index.Manifests, desc)
dir = path.Join(is.rootDir, repo)
file = path.Join(dir, "index.json")
buf, err = json.Marshal(index)
if err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to marshal JSON")
return "", err
}
if err := ioutil.WriteFile(file, buf, 0644); err != nil { //nolint: gosec
is.log.Error().Err(err).Str("file", file).Msg("unable to write")
return "", err
}
if is.gc {
oci, err := umoci.OpenLayout(dir)
if err != nil {
return "", err
}
defer oci.Close()
if err := oci.GC(context.Background(), ifOlderThan(is, repo, gcDelay)); err != nil {
return "", err
}
}
return desc.Digest.String(), nil
}
// DeleteImageManifest deletes the image manifest from the repository.
func (is *ImageStore) DeleteImageManifest(repo string, reference string) error {
dir := path.Join(is.rootDir, repo)
if !dirExists(dir) {
return errors.ErrRepoNotFound
}
// as per spec "reference" can only be a digest and not a tag
digest, err := godigest.Parse(reference)
if err != nil {
is.log.Error().Err(err).Msg("invalid reference")
return errors.ErrBadManifest
}
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
return err
}
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return err
}
found := false
var m ispec.Descriptor
// we are deleting, so keep only those manifests that don't match
outIndex := index
outIndex.Manifests = []ispec.Descriptor{}
for _, m = range index.Manifests {
if reference == m.Digest.String() {
found = true
continue
}
outIndex.Manifests = append(outIndex.Manifests, m)
}
if !found {
return errors.ErrManifestNotFound
}
// now update "index.json"
dir = path.Join(is.rootDir, repo)
file := path.Join(dir, "index.json")
buf, err = json.Marshal(outIndex)
if err != nil {
return err
}
if err := ioutil.WriteFile(file, buf, 0644); err != nil { //nolint: gosec
return err
}
if is.gc {
oci, err := umoci.OpenLayout(dir)
if err != nil {
return err
}
defer oci.Close()
if err := oci.GC(context.Background(), ifOlderThan(is, repo, gcDelay)); err != nil {
return err
}
}
p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded())
_ = os.Remove(p)
return nil
}
// BlobUploadPath returns the upload path for a blob in this store.
func (is *ImageStore) BlobUploadPath(repo string, uuid string) string {
dir := path.Join(is.rootDir, repo)
blobUploadPath := path.Join(dir, BlobUploadDir, uuid)
return blobUploadPath
}
// NewBlobUpload returns the unique ID for an upload in progress.
func (is *ImageStore) NewBlobUpload(repo string) (string, error) {
if err := is.InitRepo(repo); err != nil {
return "", err
}
uuid, err := guuid.NewV4()
if err != nil {
return "", err
}
u := uuid.String()
blobUploadPath := is.BlobUploadPath(repo, u)
file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0600)
if err != nil {
return "", errors.ErrRepoNotFound
}
defer file.Close()
return u, nil
}
// GetBlobUpload returns the current size of a blob upload.
func (is *ImageStore) GetBlobUpload(repo string, uuid string) (int64, error) {
blobUploadPath := is.BlobUploadPath(repo, uuid)
fi, err := os.Stat(blobUploadPath)
if err != nil {
if os.IsNotExist(err) {
return -1, errors.ErrUploadNotFound
}
return -1, err
}
return fi.Size(), nil
}
// PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns
// the number of actual bytes to the blob.
func (is *ImageStore) PutBlobChunkStreamed(repo string, uuid string, body io.Reader) (int64, error) {
if err := is.InitRepo(repo); err != nil {
return -1, err
}
blobUploadPath := is.BlobUploadPath(repo, uuid)
_, err := os.Stat(blobUploadPath)
if err != nil {
return -1, errors.ErrUploadNotFound
}
file, err := os.OpenFile(
blobUploadPath,
os.O_WRONLY|os.O_CREATE,
0600,
)
if err != nil {
is.log.Fatal().Err(err).Msg("failed to open file")
}
defer file.Close()
if _, err := file.Seek(0, io.SeekEnd); err != nil {
is.log.Fatal().Err(err).Msg("failed to seek file")
}
n, err := io.Copy(file, body)
return n, err
}
// PutBlobChunk writes another chunk of data to the specified blob. It returns
// the number of actual bytes to the blob.
func (is *ImageStore) PutBlobChunk(repo string, uuid string, from int64, to int64,
body io.Reader) (int64, error) {
if err := is.InitRepo(repo); err != nil {
return -1, err
}
blobUploadPath := is.BlobUploadPath(repo, uuid)
fi, err := os.Stat(blobUploadPath)
if err != nil {
return -1, errors.ErrUploadNotFound
}
if from != fi.Size() {
is.log.Error().Int64("expected", from).Int64("actual", fi.Size()).
Msg("invalid range start for blob upload")
return -1, errors.ErrBadUploadRange
}
file, err := os.OpenFile(
blobUploadPath,
os.O_WRONLY|os.O_CREATE,
0600,
)
if err != nil {
is.log.Fatal().Err(err).Msg("failed to open file")
}
defer file.Close()
if _, err := file.Seek(from, io.SeekStart); err != nil {
is.log.Fatal().Err(err).Msg("failed to seek file")
}
n, err := io.Copy(file, body)
return n, err
}
// BlobUploadInfo returns the current blob size in bytes.
func (is *ImageStore) BlobUploadInfo(repo string, uuid string) (int64, error) {
blobUploadPath := is.BlobUploadPath(repo, uuid)
fi, err := os.Stat(blobUploadPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob")
return -1, err
}
size := fi.Size()
return size, nil
}
// FinishBlobUpload finalizes the blob upload and moves blob the repository.
func (is *ImageStore) FinishBlobUpload(repo string, uuid string, body io.Reader, digest string) error {
dstDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return errors.ErrBadBlobDigest
}
src := is.BlobUploadPath(repo, uuid)
_, err = os.Stat(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to stat blob")
return errors.ErrUploadNotFound
}
f, err := os.Open(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
return errors.ErrUploadNotFound
}
srcDigest, err := godigest.FromReader(f)
f.Close()
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
return errors.ErrBadBlobDigest
}
if srcDigest != dstDigest {
is.log.Error().Str("srcDigest", srcDigest.String()).
Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest")
return errors.ErrBadBlobDigest
}
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
ensureDir(dir, is.log)
dst := is.BlobPath(repo, dstDigest)
if is.dedupe && is.cache != nil {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
return err
}
} else {
if err := os.Rename(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to finish blob")
return err
}
}
return nil
}
// FullBlobUpload handles a full blob upload, and no partial session is created.
func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, digest string) (string, int64, error) {
if err := is.InitRepo(repo); err != nil {
return "", -1, err
}
dstDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return "", -1, errors.ErrBadBlobDigest
}
u, err := guuid.NewV4()
if err != nil {
return "", -1, err
}
uuid := u.String()
src := is.BlobUploadPath(repo, uuid)
f, err := os.Create(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
return "", -1, errors.ErrUploadNotFound
}
defer f.Close()
digester := sha256.New()
mw := io.MultiWriter(f, digester)
n, err := io.Copy(mw, body)
if err != nil {
return "", -1, err
}
srcDigest := godigest.NewDigestFromEncoded(godigest.SHA256, fmt.Sprintf("%x", digester.Sum(nil)))
if srcDigest != dstDigest {
is.log.Error().Str("srcDigest", srcDigest.String()).
Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest")
return "", -1, errors.ErrBadBlobDigest
}
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
ensureDir(dir, is.log)
dst := is.BlobPath(repo, dstDigest)
if is.dedupe && is.cache != nil {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")
return "", -1, err
}
} else {
if err := os.Rename(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to finish blob")
return "", -1, err
}
}
return uuid, n, nil
}
// nolint:interfacer
func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error {
retry:
is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: ENTER")
dstRecord, err := is.cache.GetBlob(dstDigest.String())
// nolint:goerr113
if err != nil && err != errors.ErrCacheMiss {
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to lookup blob record")
return err
}
if dstRecord == "" {
if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil {
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record")
return err
}
// move the blob from uploads to final dest
if err := os.Rename(src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob")
return err
}
is.log.Debug().Str("src", src).Str("dst", dst).Msg("dedupe: rename")
} else {
dstRecord = path.Join(is.rootDir, dstRecord)
dstRecordFi, err := os.Stat(dstRecord)
if err != nil {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
// the actual blob on disk may have been removed by GC, so sync the cache
if err := is.cache.DeleteBlob(dstDigest.String(), dstRecord); err != nil {
// nolint:lll
is.log.Error().Err(err).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: unable to delete blob record")
return err
}
goto retry
}
dstFi, err := os.Stat(dst)
if err != nil && !os.IsNotExist(err) {
is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat")
return err
}
if !os.SameFile(dstFi, dstRecordFi) {
if err := os.Link(dstRecord, dst); err != nil {
is.log.Error().Err(err).Str("blobPath", dst).Str("link", dstRecord).Msg("dedupe: unable to hard link")
return err
}
}
if err := os.Remove(src); err != nil {
is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob")
return err
}
is.log.Debug().Str("src", src).Msg("dedupe: remove")
}
return nil
}
// DeleteBlobUpload deletes an existing blob upload that is currently in progress.
func (is *ImageStore) DeleteBlobUpload(repo string, uuid string) error {
blobUploadPath := is.BlobUploadPath(repo, uuid)
if err := os.Remove(blobUploadPath); err != nil {
is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload")
return err
}
return nil
}
// BlobPath returns the repository path of a blob.
func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string {
return path.Join(is.rootDir, repo, "blobs", digest.Algorithm().String(), digest.Encoded())
}
// CheckBlob verifies a blob and returns true if the blob is correct.
func (is *ImageStore) CheckBlob(repo string, digest string,
mediaType string) (bool, int64, error) {
d, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return false, -1, errors.ErrBadBlobDigest
}
blobPath := is.BlobPath(repo, d)
blobInfo, err := os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return false, -1, errors.ErrBlobNotFound
}
return true, blobInfo.Size(), nil
}
// GetBlob returns a stream to read the blob.
// FIXME: we should probably parse the manifest and use (digest, mediaType) as a
// blob selector instead of directly downloading the blob.
func (is *ImageStore) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) {
d, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return nil, -1, errors.ErrBadBlobDigest
}
blobPath := is.BlobPath(repo, d)
blobInfo, err := os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return nil, -1, errors.ErrBlobNotFound
}
blobReader, err := os.Open(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
return nil, -1, err
}
return blobReader, blobInfo.Size(), nil
}
// DeleteBlob removes the blob from the repository.
func (is *ImageStore) DeleteBlob(repo string, digest string) error {
d, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return errors.ErrBlobNotFound
}
blobPath := is.BlobPath(repo, d)
_, err = os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return errors.ErrBlobNotFound
}
if is.cache != nil {
if err := is.cache.DeleteBlob(digest, blobPath); err != nil {
is.log.Error().Err(err).Str("digest", digest).Str("blobPath", blobPath).Msg("unable to remove blob path from cache")
return err
}
}
if err := os.Remove(blobPath); err != nil {
is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path")
return err
}
return nil
}
// garbage collection
// Scrub will clean up all unreferenced blobs.
// TODO.
func Scrub(dir string, fix bool) error {
return nil
}
// utility routines
func dirExists(d string) bool {
fi, err := os.Stat(d)
if err != nil && os.IsNotExist(err) {
return false
}
if !fi.IsDir() {
return false
}
return true
}
func ensureDir(dir string, log zerolog.Logger) {
if err := os.MkdirAll(dir, 0755); err != nil {
log.Panic().Err(err).Str("dir", dir).Msg("unable to create dir")
}
}
func ifOlderThan(is *ImageStore, repo string, delay time.Duration) casext.GCPolicy {
return func(ctx context.Context, digest godigest.Digest) (bool, error) {
blobPath := is.BlobPath(repo, digest)
fi, err := os.Stat(blobPath)
if err != nil {
return false, err
}
if fi.ModTime().Add(delay).After(time.Now()) {
return false, nil
}
is.log.Info().Str("digest", digest.String()).Str("blobPath", blobPath).Msg("perform GC on blob")
return true, nil
}
}