0
Fork 0
mirror of https://github.com/project-zot/zot.git synced 2025-01-20 22:52:51 -05:00
zot/pkg/storage/storage.go
Ramkumar Chinchani 2fd87b6a86 pkg/api: use a rwlock when accessing storage
The original patch used a mutex, however, the workload patterns are
likely to be read-heavy, so use a rwlock instead.
2020-03-20 10:58:21 -07:00

894 lines
21 KiB
Go

package storage
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
"github.com/anuvu/zot/errors"
zlog "github.com/anuvu/zot/pkg/log"
guuid "github.com/gofrs/uuid"
"github.com/openSUSE/umoci"
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/rs/zerolog"
)
const (
// BlobUploadDir defines the upload directory for blob uploads.
BlobUploadDir = ".uploads"
)
// BlobUpload models and upload request.
type BlobUpload struct {
StoreName string
ID string
}
// ImageStore provides the image storage operations.
type ImageStore struct {
rootDir string
lock *sync.RWMutex
blobUploads map[string]BlobUpload
log zerolog.Logger
}
// NewImageStore returns a new image store backed by a file storage.
func NewImageStore(rootDir string, log zlog.Logger) *ImageStore {
is := &ImageStore{rootDir: rootDir,
lock: &sync.RWMutex{},
blobUploads: make(map[string]BlobUpload),
log: log.With().Caller().Logger(),
}
if _, err := os.Stat(rootDir); os.IsNotExist(err) {
_ = os.MkdirAll(rootDir, 0700)
}
return is
}
// InitRepo creates an image repository under this store.
func (is *ImageStore) InitRepo(name string) error {
repoDir := path.Join(is.rootDir, name)
if fi, err := os.Stat(repoDir); err == nil && fi.IsDir() {
return nil
}
// create repo dir
ensureDir(repoDir)
// create "blobs" subdir
dir := path.Join(repoDir, "blobs")
ensureDir(dir)
// create BlobUploadDir subdir
dir = path.Join(repoDir, BlobUploadDir)
ensureDir(dir)
// "oci-layout" file - create if it doesn't exist
ilPath := path.Join(repoDir, ispec.ImageLayoutFile)
if _, err := os.Stat(ilPath); err != nil {
il := ispec.ImageLayout{Version: ispec.ImageLayoutVersion}
buf, err := json.Marshal(il)
if err != nil {
is.log.Panic().Err(err).Msg("unable to marshal JSON")
}
if err := ioutil.WriteFile(ilPath, buf, 0644); err != nil {
is.log.Panic().Err(err).Str("file", ilPath).Msg("unable to write file")
}
}
// "index.json" file - create if it doesn't exist
indexPath := path.Join(repoDir, "index.json")
if _, err := os.Stat(indexPath); err != nil {
index := ispec.Index{}
index.SchemaVersion = 2
buf, err := json.Marshal(index)
if err != nil {
is.log.Panic().Err(err).Msg("unable to marshal JSON")
}
if err := ioutil.WriteFile(indexPath, buf, 0644); err != nil {
is.log.Panic().Err(err).Str("file", indexPath).Msg("unable to write file")
}
}
return nil
}
// ValidateRepo validates that the repository layout is complaint with the OCI repo layout.
func (is *ImageStore) ValidateRepo(name string) (bool, error) {
// https://github.com/opencontainers/image-spec/blob/master/image-layout.md#content
// at least, expect exactly 4 entries - ["blobs", "oci-layout", "index.json"] and BlobUploadDir
// in each image store
dir := path.Join(is.rootDir, name)
if !dirExists(dir) {
return false, errors.ErrRepoNotFound
}
files, err := ioutil.ReadDir(dir)
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("unable to read directory")
return false, errors.ErrRepoNotFound
}
if len(files) != 4 {
return false, nil
}
found := map[string]bool{
"blobs": false,
ispec.ImageLayoutFile: false,
"index.json": false,
BlobUploadDir: false,
}
for _, file := range files {
if file.Name() == "blobs" && !file.IsDir() {
return false, nil
}
found[file.Name()] = true
}
for k, v := range found {
if !v && k != BlobUploadDir {
return false, nil
}
}
buf, err := ioutil.ReadFile(path.Join(dir, ispec.ImageLayoutFile))
if err != nil {
return false, err
}
var il ispec.ImageLayout
if err := json.Unmarshal(buf, &il); err != nil {
return false, err
}
if il.Version != ispec.ImageLayoutVersion {
return false, errors.ErrRepoBadVersion
}
return true, nil
}
// GetRepositories returns a list of all the repositories under this store.
func (is *ImageStore) GetRepositories() ([]string, error) {
dir := is.rootDir
_, err := ioutil.ReadDir(dir)
if err != nil {
is.log.Error().Err(err).Msg("failure walking storage root-dir")
return nil, err
}
stores := make([]string, 0)
err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
return nil
}
rel, err := filepath.Rel(is.rootDir, path)
if err != nil {
return nil
}
if ok, err := is.ValidateRepo(rel); !ok || err != nil {
return nil
}
is.log.Debug().Str("dir", path).Str("name", info.Name()).Msg("found image store")
stores = append(stores, rel)
return nil
})
return stores, err
}
// GetImageTags returns a list of image tags available in the specified repository.
func (is *ImageStore) GetImageTags(repo string) ([]string, error) {
dir := path.Join(is.rootDir, repo)
if !dirExists(dir) {
return nil, errors.ErrRepoNotFound
}
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
return nil, errors.ErrRepoNotFound
}
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return nil, errors.ErrRepoNotFound
}
tags := make([]string, 0)
for _, manifest := range index.Manifests {
v, ok := manifest.Annotations[ispec.AnnotationRefName]
if ok {
tags = append(tags, v)
}
}
return tags, nil
}
// GetImageManifest returns the image manifest of an image in the specific repository.
func (is *ImageStore) GetImageManifest(repo string, reference string) ([]byte, string, string, error) {
dir := path.Join(is.rootDir, repo)
if !dirExists(dir) {
return nil, "", "", errors.ErrRepoNotFound
}
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
if os.IsNotExist(err) {
return nil, "", "", errors.ErrRepoNotFound
}
return nil, "", "", err
}
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return nil, "", "", err
}
found := false
var digest godigest.Digest
mediaType := ""
for _, m := range index.Manifests {
if reference == m.Digest.String() {
digest = m.Digest
mediaType = m.MediaType
found = true
break
}
v, ok := m.Annotations[ispec.AnnotationRefName]
if ok && v == reference {
digest = m.Digest
mediaType = m.MediaType
found = true
break
}
}
if !found {
return nil, "", "", errors.ErrManifestNotFound
}
p := path.Join(dir, "blobs")
p = path.Join(p, digest.Algorithm().String())
p = path.Join(p, digest.Encoded())
buf, err = ioutil.ReadFile(p)
if err != nil {
is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest")
if os.IsNotExist(err) {
return nil, "", "", errors.ErrManifestNotFound
}
return nil, "", "", err
}
var manifest ispec.Manifest
if err := json.Unmarshal(buf, &manifest); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return nil, "", "", err
}
return buf, digest.String(), mediaType, nil
}
// PutImageManifest adds an image manifest to the repository.
func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType string,
body []byte) (string, error) {
if err := is.InitRepo(repo); err != nil {
return "", err
}
if mediaType != ispec.MediaTypeImageManifest {
return "", errors.ErrBadManifest
}
if len(body) == 0 {
return "", errors.ErrBadManifest
}
var m ispec.Manifest
if err := json.Unmarshal(body, &m); err != nil {
return "", errors.ErrBadManifest
}
if m.SchemaVersion != 2 {
is.log.Error().Int("SchemaVersion", m.SchemaVersion).Msg("invalid manifest")
return "", errors.ErrBadManifest
}
for _, l := range m.Layers {
digest := l.Digest
blobPath := is.BlobPath(repo, digest)
if _, err := os.Stat(blobPath); err != nil {
return digest.String(), errors.ErrBlobNotFound
}
}
mDigest := godigest.FromBytes(body)
refIsDigest := false
d, err := godigest.Parse(reference)
if err == nil {
if d.String() != mDigest.String() {
is.log.Error().Str("actual", mDigest.String()).Str("expected", d.String()).
Msg("manifest digest is not valid")
return "", errors.ErrBadManifest
}
refIsDigest = true
}
dir := path.Join(is.rootDir, repo)
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
return "", err
}
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return "", errors.ErrRepoBadVersion
}
updateIndex := true
// create a new descriptor
desc := ispec.Descriptor{MediaType: mediaType, Size: int64(len(body)), Digest: mDigest,
Platform: &ispec.Platform{Architecture: "amd64", OS: "linux"}}
if !refIsDigest {
desc.Annotations = map[string]string{ispec.AnnotationRefName: reference}
}
for i, m := range index.Manifests {
if reference == m.Digest.String() {
// nothing changed, so don't update
desc = m
updateIndex = false
break
}
v, ok := m.Annotations[ispec.AnnotationRefName]
if ok && v == reference {
if m.Digest.String() == mDigest.String() {
// nothing changed, so don't update
desc = m
updateIndex = false
break
}
// manifest contents have changed for the same tag
desc = m
desc.Digest = mDigest
index.Manifests = append(index.Manifests[:i], index.Manifests[i+1:]...)
break
}
}
if !updateIndex {
return desc.Digest.String(), nil
}
// write manifest to "blobs"
dir = path.Join(is.rootDir, repo)
dir = path.Join(dir, "blobs")
dir = path.Join(dir, mDigest.Algorithm().String())
_ = os.MkdirAll(dir, 0755)
file := path.Join(dir, mDigest.Encoded())
if err := ioutil.WriteFile(file, body, 0644); err != nil {
return "", err
}
// now update "index.json"
index.Manifests = append(index.Manifests, desc)
dir = path.Join(is.rootDir, repo)
file = path.Join(dir, "index.json")
buf, err = json.Marshal(index)
if err != nil {
return "", err
}
if err := ioutil.WriteFile(file, buf, 0644); err != nil {
return "", err
}
oci, err := umoci.OpenLayout(dir)
if err != nil {
return "", err
}
defer oci.Close()
if err := oci.GC(context.Background()); err != nil {
return "", err
}
return desc.Digest.String(), nil
}
// DeleteImageManifest deletes the image manifest from the repository.
func (is *ImageStore) DeleteImageManifest(repo string, reference string) error {
dir := path.Join(is.rootDir, repo)
if !dirExists(dir) {
return errors.ErrRepoNotFound
}
_, err := godigest.Parse(reference)
if err != nil {
is.log.Error().Err(err).Msg("invalid reference")
return errors.ErrManifestNotFound
}
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
return err
}
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
return err
}
found := false
var digest godigest.Digest
var i int
var m ispec.Descriptor
for i, m = range index.Manifests {
if reference == m.Digest.String() {
digest = m.Digest
found = true
break
}
}
if !found {
return errors.ErrManifestNotFound
}
// remove the manifest entry, not preserving order
index.Manifests[i] = index.Manifests[len(index.Manifests)-1]
index.Manifests = index.Manifests[:len(index.Manifests)-1]
// now update "index.json"
dir = path.Join(is.rootDir, repo)
file := path.Join(dir, "index.json")
buf, err = json.Marshal(index)
if err != nil {
return err
}
if err := ioutil.WriteFile(file, buf, 0644); err != nil {
return err
}
oci, err := umoci.OpenLayout(dir)
if err != nil {
return err
}
defer oci.Close()
if err := oci.GC(context.Background()); err != nil {
return err
}
p := path.Join(dir, "blobs")
p = path.Join(p, digest.Algorithm().String())
p = path.Join(p, digest.Encoded())
_ = os.Remove(p)
return nil
}
// BlobUploadPath returns the upload path for a blob in this store.
func (is *ImageStore) BlobUploadPath(repo string, uuid string) string {
dir := path.Join(is.rootDir, repo)
blobUploadPath := path.Join(dir, BlobUploadDir)
blobUploadPath = path.Join(blobUploadPath, uuid)
return blobUploadPath
}
// NewBlobUpload returns the unique ID for an upload in progress.
func (is *ImageStore) NewBlobUpload(repo string) (string, error) {
if err := is.InitRepo(repo); err != nil {
return "", err
}
uuid, err := guuid.NewV4()
if err != nil {
return "", err
}
u := uuid.String()
blobUploadPath := is.BlobUploadPath(repo, u)
file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0600)
if err != nil {
return "", errors.ErrRepoNotFound
}
defer file.Close()
return u, nil
}
// GetBlobUpload returns the current size of a blob upload.
func (is *ImageStore) GetBlobUpload(repo string, uuid string) (int64, error) {
blobUploadPath := is.BlobUploadPath(repo, uuid)
fi, err := os.Stat(blobUploadPath)
if err != nil {
if os.IsNotExist(err) {
return -1, errors.ErrUploadNotFound
}
return -1, err
}
return fi.Size(), nil
}
// PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns
// the number of actual bytes to the blob.
func (is *ImageStore) PutBlobChunkStreamed(repo string, uuid string, body io.Reader) (int64, error) {
if err := is.InitRepo(repo); err != nil {
return -1, err
}
blobUploadPath := is.BlobUploadPath(repo, uuid)
_, err := os.Stat(blobUploadPath)
if err != nil {
return -1, errors.ErrUploadNotFound
}
file, err := os.OpenFile(
blobUploadPath,
os.O_WRONLY|os.O_CREATE,
0600,
)
if err != nil {
is.log.Fatal().Err(err).Msg("failed to open file")
}
defer file.Close()
if _, err := file.Seek(0, io.SeekEnd); err != nil {
is.log.Fatal().Err(err).Msg("failed to seek file")
}
n, err := io.Copy(file, body)
return n, err
}
// PutBlobChunk writes another chunk of data to the specified blob. It returns
// the number of actual bytes to the blob.
func (is *ImageStore) PutBlobChunk(repo string, uuid string, from int64, to int64,
body io.Reader) (int64, error) {
if err := is.InitRepo(repo); err != nil {
return -1, err
}
blobUploadPath := is.BlobUploadPath(repo, uuid)
fi, err := os.Stat(blobUploadPath)
if err != nil {
return -1, errors.ErrUploadNotFound
}
if from != fi.Size() {
is.log.Error().Int64("expected", from).Int64("actual", fi.Size()).
Msg("invalid range start for blob upload")
return -1, errors.ErrBadUploadRange
}
file, err := os.OpenFile(
blobUploadPath,
os.O_WRONLY|os.O_CREATE,
0600,
)
if err != nil {
is.log.Fatal().Err(err).Msg("failed to open file")
}
defer file.Close()
if _, err := file.Seek(from, io.SeekStart); err != nil {
is.log.Fatal().Err(err).Msg("failed to seek file")
}
n, err := io.Copy(file, body)
return n, err
}
// BlobUploadInfo returns the current blob size in bytes.
func (is *ImageStore) BlobUploadInfo(repo string, uuid string) (int64, error) {
blobUploadPath := is.BlobUploadPath(repo, uuid)
fi, err := os.Stat(blobUploadPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob")
return -1, err
}
size := fi.Size()
return size, nil
}
// FinishBlobUpload finalizes the blob upload and moves blob the repository.
func (is *ImageStore) FinishBlobUpload(repo string, uuid string, body io.Reader, digest string) error {
dstDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return errors.ErrBadBlobDigest
}
src := is.BlobUploadPath(repo, uuid)
_, err = os.Stat(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to stat blob")
return errors.ErrUploadNotFound
}
f, err := os.Open(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
return errors.ErrUploadNotFound
}
srcDigest, err := godigest.FromReader(f)
f.Close()
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
return errors.ErrBadBlobDigest
}
if srcDigest != dstDigest {
is.log.Error().Str("srcDigest", srcDigest.String()).
Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest")
return errors.ErrBadBlobDigest
}
dir := path.Join(is.rootDir, repo)
dir = path.Join(dir, "blobs")
dir = path.Join(dir, dstDigest.Algorithm().String())
_ = os.MkdirAll(dir, 0755)
dst := is.BlobPath(repo, dstDigest)
// move the blob from uploads to final dest
_ = os.Rename(src, dst)
return err
}
// FullBlobUpload handles a full blob upload, and no partial session is created
func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, digest string) (string, int64, error) {
if err := is.InitRepo(repo); err != nil {
return "", -1, err
}
dstDigest, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return "", -1, errors.ErrBadBlobDigest
}
u, err := guuid.NewV4()
if err != nil {
return "", -1, err
}
uuid := u.String()
src := is.BlobUploadPath(repo, uuid)
f, err := os.Create(src)
if err != nil {
is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob")
return "", -1, errors.ErrUploadNotFound
}
defer f.Close()
digester := sha256.New()
mw := io.MultiWriter(f, digester)
n, err := io.Copy(mw, body)
if err != nil {
return "", -1, err
}
srcDigest := godigest.NewDigestFromEncoded(godigest.SHA256, fmt.Sprintf("%x", digester.Sum(nil)))
if srcDigest != dstDigest {
is.log.Error().Str("srcDigest", srcDigest.String()).
Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest")
return "", -1, errors.ErrBadBlobDigest
}
dir := path.Join(is.rootDir, repo)
dir = path.Join(dir, "blobs")
dir = path.Join(dir, dstDigest.Algorithm().String())
_ = os.MkdirAll(dir, 0755)
dst := is.BlobPath(repo, dstDigest)
// move the blob from uploads to final dest
_ = os.Rename(src, dst)
return uuid, n, err
}
// DeleteBlobUpload deletes an existing blob upload that is currently in progress.
func (is *ImageStore) DeleteBlobUpload(repo string, uuid string) error {
blobUploadPath := is.BlobUploadPath(repo, uuid)
_ = os.Remove(blobUploadPath)
return nil
}
// BlobPath returns the repository path of a blob.
func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string {
dir := path.Join(is.rootDir, repo)
blobPath := path.Join(dir, "blobs")
blobPath = path.Join(blobPath, digest.Algorithm().String())
blobPath = path.Join(blobPath, digest.Encoded())
return blobPath
}
// CheckBlob verifies a blob and returns true if the blob is correct.
func (is *ImageStore) CheckBlob(repo string, digest string,
mediaType string) (bool, int64, error) {
d, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return false, -1, errors.ErrBadBlobDigest
}
blobPath := is.BlobPath(repo, d)
blobInfo, err := os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return false, -1, errors.ErrBlobNotFound
}
return true, blobInfo.Size(), nil
}
// GetBlob returns a stream to read the blob.
// FIXME: we should probably parse the manifest and use (digest, mediaType) as a
// blob selector instead of directly downloading the blob
func (is *ImageStore) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) {
d, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return nil, -1, errors.ErrBadBlobDigest
}
blobPath := is.BlobPath(repo, d)
blobInfo, err := os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return nil, -1, errors.ErrBlobNotFound
}
blobReader, err := os.Open(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob")
return nil, -1, err
}
return blobReader, blobInfo.Size(), nil
}
// DeleteBlob removes the blob from the repository.
func (is *ImageStore) DeleteBlob(repo string, digest string) error {
d, err := godigest.Parse(digest)
if err != nil {
is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest")
return errors.ErrBlobNotFound
}
blobPath := is.BlobPath(repo, d)
_, err = os.Stat(blobPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
return errors.ErrBlobNotFound
}
_ = os.Remove(blobPath)
return nil
}
// garbage collection
// Scrub will clean up all unreferenced blobs.
// TODO
func Scrub(dir string, fix bool) error {
return nil
}
// utility routines
func dirExists(d string) bool {
fi, err := os.Stat(d)
if err != nil && os.IsNotExist(err) {
return false
}
if !fi.IsDir() {
return false
}
return true
}
func ensureDir(dir string) {
if err := os.MkdirAll(dir, 0755); err != nil {
panic(err)
}
}