From ce88c65155a2387621da0cbec6f2d6238c7d53a5 Mon Sep 17 00:00:00 2001 From: "Chen, Tingjie" Date: Tue, 27 Aug 2019 07:38:45 +0000 Subject: [PATCH] Implement MinIO as cloud storage for RESTAPI agent Current support with patchHandler, since MinIO support multipart upload and resumable download, it is no need to add special implementation with tus protocol for resumable upload on cloud storage. Signed-off-by: Chen, Tingjie Change-Id: I0e928d3401168ea946b5d6790312842e0a54784d --- cmd/bpa-restapi-agent/README.md | 28 ++++ cmd/bpa-restapi-agent/api/api.go | 14 +- cmd/bpa-restapi-agent/api/imagehandler.go | 19 ++- cmd/bpa-restapi-agent/internal/config/config.go | 8 ++ cmd/bpa-restapi-agent/internal/storage/minio.go | 183 ++++++++++++++++++++++++ 5 files changed, 248 insertions(+), 4 deletions(-) create mode 100644 cmd/bpa-restapi-agent/internal/storage/minio.go diff --git a/cmd/bpa-restapi-agent/README.md b/cmd/bpa-restapi-agent/README.md index a74434c..27a7a51 100644 --- a/cmd/bpa-restapi-agent/README.md +++ b/cmd/bpa-restapi-agent/README.md @@ -4,3 +4,31 @@ To run the server, follow these simple steps: ``` go run main.go ``` + +# Cloud Storage with MinIO + +Start MinIO server daemon with docker command before running REST API agent, +default settings in config/config.go. +AccessKeyID: ICN-ACCESSKEYID +SecretAccessKey: ICN-SECRETACCESSKEY +MinIO Port: 9000 + +You can setup MinIO server my the following command with default credentials. +``` +$ docker run -p 9000:9000 --name minio1 \ + -e "MINIO_ACCESS_KEY=ICN-ACCESSKEYID" \ + -e "MINIO_SECRET_KEY=ICN-SECRETACCESSKEY" \ + -v /mnt/data:/data \ + minio/minio server /data +``` +Also there is a Kubernetes deployment for MinIO server in standalone mode. +``` +$ cd deploy/kud-plugin-addons/minio +$ ./install.sh +``` +You can check the status by opening a browser: http://127.0.0.1:9000/ + +MinIO Client implementation integrated in REST API agent and will automatically +initialize in main.go, and create 3 buckets: binary, container, operatingsystem. +The Upload image will "PUT" to corresponding buckets by HTTP PATCH request url. + diff --git a/cmd/bpa-restapi-agent/api/api.go b/cmd/bpa-restapi-agent/api/api.go index d70cd80..5533efb 100644 --- a/cmd/bpa-restapi-agent/api/api.go +++ b/cmd/bpa-restapi-agent/api/api.go @@ -4,7 +4,10 @@ package api import ( + "log" + image "bpa-restapi-agent/internal/app" + minio "bpa-restapi-agent/internal/storage" "github.com/gorilla/mux" ) @@ -16,11 +19,16 @@ func NewRouter(binaryClient image.ImageManager, router := mux.NewRouter() + minioInfo, err := minio.Initialize() + if err != nil { + log.Println("Error while initialize minio client: %s", err) + } + //Setup the image uploaad api handler here if binaryClient == nil { binaryClient = image.NewBinaryImageClient() } - binaryHandler := imageHandler{client: binaryClient} + binaryHandler := imageHandler{client: binaryClient, minioI: minioInfo, storeName: "binary"} imgRouter := router.PathPrefix("/v1").Subrouter() imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/binary_images", binaryHandler.createHandler).Methods("POST") imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/binary_images/{imgname}", binaryHandler.getHandler).Methods("GET") @@ -32,7 +40,7 @@ func NewRouter(binaryClient image.ImageManager, if containerClient == nil { containerClient = image.NewContainerImageClient() } - containerHandler := imageHandler{client: containerClient} + containerHandler := imageHandler{client: containerClient, minioI: minioInfo, storeName: "container"} imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/container_images", containerHandler.createHandler).Methods("POST") imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/container_images/{imgname}", containerHandler.getHandler).Methods("GET") imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/container_images/{imgname}", containerHandler.deleteHandler).Methods("DELETE") @@ -43,7 +51,7 @@ func NewRouter(binaryClient image.ImageManager, if osClient == nil { osClient = image.NewOSImageClient() } - osHandler := imageHandler{client: osClient} + osHandler := imageHandler{client: osClient, minioI: minioInfo, storeName: "operatingsystem"} imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/os_images", osHandler.createHandler).Methods("POST") imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/os_images/{imgname}", osHandler.getHandler).Methods("GET") imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/os_images/{imgname}", osHandler.deleteHandler).Methods("DELETE") diff --git a/cmd/bpa-restapi-agent/api/imagehandler.go b/cmd/bpa-restapi-agent/api/imagehandler.go index 0d7b787..d9b1844 100644 --- a/cmd/bpa-restapi-agent/api/imagehandler.go +++ b/cmd/bpa-restapi-agent/api/imagehandler.go @@ -15,6 +15,7 @@ import ( "strconv" image "bpa-restapi-agent/internal/app" + minioc "bpa-restapi-agent/internal/storage" "github.com/gorilla/mux" ) @@ -26,6 +27,8 @@ type imageHandler struct { // We will set this variable with a mock interface for testing client image.ImageManager dirPath string + minioI minioc.MinIOInfo + storeName string // as minio client bucketname } // CreateHandler handles creation of the image entry in the database @@ -176,6 +179,8 @@ func (h imageHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { return } + h.minioI.DeleteImage(h.storeName, imageName) + w.WriteHeader(http.StatusNoContent) } @@ -262,6 +267,7 @@ func (h imageHandler) patchHandler(w http.ResponseWriter, r *http.Request) { e := "Upload already completed" w.WriteHeader(http.StatusUnprocessableEntity) w.Write([]byte(e)) + log.Println("Upload already completed") return } off, err := strconv.Atoi(r.Header.Get("Upload-Offset")) @@ -272,9 +278,10 @@ func (h imageHandler) patchHandler(w http.ResponseWriter, r *http.Request) { } log.Printf("Upload offset %d\n", off) if *file.ImageOffset != off { - e := fmt.Sprintf("Expected Offset %d got offset %d", *file.ImageOffset, off) + e := fmt.Sprintf("Expected Offset %d, actual offset %d", *file.ImageOffset, off) w.WriteHeader(http.StatusConflict) w.Write([]byte(e)) + log.Printf("Expected Offset:%d doesn't match got offset:%d\n", *file.ImageOffset, off) return } @@ -324,6 +331,16 @@ func (h imageHandler) patchHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } + + log.Printf("Start to Patch image, bucket: %s, image: %s, dirpath: %s, offset: %d, n: %d\n", + h.storeName, imageName, fp, *file.ImageOffset, n) + uploadbytes, err := h.minioI.PatchImage(h.storeName, imageName, fp, int64(*file.ImageOffset), int64(n)) + if err != nil || uploadbytes == 0 { + log.Printf("MinIO upload with offset %d failed: %s", *file.ImageOffset, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + log.Println("number of bytes written ", n) no := *file.ImageOffset + n file.ImageOffset = &no diff --git a/cmd/bpa-restapi-agent/internal/config/config.go b/cmd/bpa-restapi-agent/internal/config/config.go index e5d4f48..e4911d2 100644 --- a/cmd/bpa-restapi-agent/internal/config/config.go +++ b/cmd/bpa-restapi-agent/internal/config/config.go @@ -11,6 +11,10 @@ type Configuration struct { DatabaseAddress string `json: "database-address"` DatabaseType string `json: "database-type"` ServicePort string `json: "service-port"` + MinIOAddress string `json: "minio-address"` + MinIOPort string `json: "minio-port"` + AccessKeyID string `json: "access-key-id"` + SecretAccessKey string `json: "secret-access-key"` } var gConfig *Configuration @@ -39,6 +43,10 @@ func defaultConfiguration() *Configuration { DatabaseAddress: "127.0.0.1", DatabaseType: "mongo", ServicePort: "9015", + MinIOAddress: "127.0.0.1", + MinIOPort: "9000", + AccessKeyID: "ICN-ACCESSKEYID", + SecretAccessKey: "ICN-SECRETACCESSKEY", } } diff --git a/cmd/bpa-restapi-agent/internal/storage/minio.go b/cmd/bpa-restapi-agent/internal/storage/minio.go new file mode 100644 index 0000000..3eed689 --- /dev/null +++ b/cmd/bpa-restapi-agent/internal/storage/minio.go @@ -0,0 +1,183 @@ +package storage + +import ( + "github.com/minio/minio-go/v6" + "bpa-restapi-agent/internal/config" + + "log" + "os" +) + +type MinIOInfo struct { + minioC *minio.Client `json:"minio client"` +} + +// Initialize the MinIO server, create buckets +func Initialize() (MinIOInfo, error) { + endpoint := config.GetConfiguration().MinIOAddress + ":" + config.GetConfiguration().MinIOPort + accessKeyID := config.GetConfiguration().AccessKeyID + secretAccessKey := config.GetConfiguration().SecretAccessKey + useSSL := false + + minioInfo := MinIOInfo{} + // Initialize minio client object. + minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, useSSL) + if err != nil { + log.Fatalln(err) + return minioInfo, err + } + + // Make a new bucket. + bucketNames := []string{"binary", "container", "operatingsystem"} + location := "us-west-1" + + for _, bucketName := range bucketNames { + err := minioClient.MakeBucket(bucketName, location) + if err != nil { + // Check to see if we already own this bucket (which happens if you run this twice) + exists, errBucketExists := minioClient.BucketExists(bucketName) + if errBucketExists == nil && exists { + log.Printf("We already own %s\n", bucketName) + } else { + log.Fatalln(err) + return minioInfo, err + } + } else { + log.Printf("Successfully created %s\n", bucketName) + } + } + + minioInfo.minioC = minioClient + return minioInfo, nil +} + +func (m MinIOInfo) PutImage(bucketName string, objName string, localPath string) (int64, error) { + + //contentType := "multipart/form-data" + contentType := "application/octet-stream" + + // Upload the zip file with FPutObject + n, err := m.minioC.FPutObject(bucketName, objName, localPath, minio.PutObjectOptions{ContentType:contentType}) + if err != nil { + log.Fatalln(err) + return n, err + } + + fileInfo, _ := os.Stat(localPath) + fileSize := fileInfo.Size() + + if n != int64(fileSize) { + log.Printf("FPutObject failed %s of size %d\n", objName, n) + return n, err + } + + log.Printf("Successfully uploaded %s of size %d\n", objName, n) + return n, nil +} + +func (m MinIOInfo) PatchImage(bucketName string, objName string, localPath string, offset int64, objSize int64) (int64, error) { + + var n = int64(0) + + tempFile, err := os.Open(localPath) + if err != nil { + log.Fatalln(err) + return n, err + } + + defer tempFile.Close() + + if _, err := tempFile.Seek(offset, 0); err != nil { + log.Printf("PatchImage seek %s failed: %s", tempFile.Name(), err) + return n, err + } + + objInfo, err := m.minioC.StatObject(bucketName, objName, minio.StatObjectOptions{}) + var objHealthy = true + if err != nil { + objHealthy = false + } else if objInfo.Size != offset || objInfo.Size == 0 { + objHealthy = false + } + + var objNameTemp = objName + if objHealthy { + objNameTemp = objName + ".tmp" + } + + contentType := "application/octet-stream" + n, err = m.minioC.PutObject(bucketName, objNameTemp, tempFile, objSize, minio.PutObjectOptions{ContentType:contentType}) + if err != nil { + log.Fatalln(err) + return n, err + } + + if n != objSize { + log.Printf("PatchImage PutObject %s failed with bytes: %d", tempFile.Name(), n) + return n, err + } + + if objHealthy { + src1 := minio.NewSourceInfo(bucketName, objName, nil) + src2 := minio.NewSourceInfo(bucketName, objNameTemp, nil) + srcs := []minio.SourceInfo{src1, src2} + + dst, err := minio.NewDestinationInfo(bucketName, objName, nil, nil) + if err != nil { + log.Printf("NewDestinationInfo failed", err) + return n, err + } + + // There is issue, the last src should be the smallest obj size + err = m.minioC.ComposeObject(dst, srcs) + if err != nil { + log.Printf("ComposeObject failed", err) + return n, err + } + } + + log.Printf("Successfully PatchImage %s of size %d\n", objName, n) + return n, nil +} + +func (m MinIOInfo) DeleteImage(bucketName string, objName string) (error) { + + err := m.minioC.RemoveObject(bucketName, objName) + if err != nil { + log.Printf("MinIO Remove object %s failed\n", bucketName) + return err + } + + return nil +} + +func (m MinIOInfo) CleanupImages(bucketName string) (error) { + // create a done channel to control 'ListObjectsV2' go routine. + doneCh := make(chan struct{}) + defer close(doneCh) + + for objCh := range m.minioC.ListObjectsV2(bucketName, "", true, doneCh) { + if objCh.Err != nil { + return objCh.Err + } + if objCh.Key != "" { + err := m.minioC.RemoveObject(bucketName, objCh.Key) + if err != nil { + return err + } + } + } + for objPartInfo := range m.minioC.ListIncompleteUploads(bucketName, "", true, doneCh) { + if objPartInfo.Err != nil { + return objPartInfo.Err + } + if objPartInfo.Key != "" { + err := m.minioC.RemoveIncompleteUpload(bucketName, objPartInfo.Key) + if err != nil { + return err + } + } + } + + return nil +} -- 2.16.6