stream the transmission of blobs

This commit is contained in:
Wenkai Yin 2016-05-17 18:23:45 +08:00
parent 310460fa68
commit e4cb015d34

View File

@ -19,6 +19,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
@ -385,7 +386,7 @@ func (r *Repository) BlobExist(digest string) (bool, error) {
} }
// PullBlob ... // PullBlob ...
func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error) { func (r *Repository) PullBlob(digest string) (size int64, data io.ReadCloser, err error) {
req, err := http.NewRequest("GET", buildBlobURL(r.Endpoint.String(), r.Name, digest), nil) req, err := http.NewRequest("GET", buildBlobURL(r.Endpoint.String(), r.Name, digest), nil)
if err != nil { if err != nil {
return return
@ -401,19 +402,19 @@ func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error
return return
} }
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
if resp.StatusCode == http.StatusOK { if resp.StatusCode == http.StatusOK {
contengLength := resp.Header.Get(http.CanonicalHeaderKey("Content-Length")) contengLength := resp.Header.Get(http.CanonicalHeaderKey("Content-Length"))
size, err = strconv.ParseInt(contengLength, 10, 64) size, err = strconv.ParseInt(contengLength, 10, 64)
if err != nil { if err != nil {
return return
} }
data = b data = resp.Body
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return return
} }
@ -462,8 +463,8 @@ func (r *Repository) initiateBlobUpload(name string) (location, uploadUUID strin
return return
} }
func (r *Repository) monolithicBlobUpload(location, digest string, size int64, data []byte) error { func (r *Repository) monolithicBlobUpload(location, digest string, size int64, data io.Reader) error {
req, err := http.NewRequest("PUT", buildMonolithicBlobUploadURL(location, digest), bytes.NewReader(data)) req, err := http.NewRequest("PUT", buildMonolithicBlobUploadURL(location, digest), data)
if err != nil { if err != nil {
return err return err
} }
@ -496,17 +497,7 @@ func (r *Repository) monolithicBlobUpload(location, digest string, size int64, d
} }
// PushBlob ... // PushBlob ...
func (r *Repository) PushBlob(digest string, size int64, data []byte) error { func (r *Repository) PushBlob(digest string, size int64, data io.Reader) error {
exist, err := r.BlobExist(digest)
if err != nil {
return err
}
if exist {
log.Infof("blob already exists, skip pushing: %s %s", r.Name, digest)
return nil
}
location, _, err := r.initiateBlobUpload(r.Name) location, _, err := r.initiateBlobUpload(r.Name)
if err != nil { if err != nil {
return err return err