Implement MinIO as cloud storage for RESTAPI agent 22/1522/5
authorChen, Tingjie <tingjie.chen@intel.com>
Tue, 27 Aug 2019 07:38:45 +0000 (07:38 +0000)
committerChen, Tingjie <tingjie.chen@intel.com>
Tue, 17 Sep 2019 01:42:21 +0000 (01:42 +0000)
Current support with patchHandler, since MinIO support multipart upload
and resumable download, it is no need to add special implementation
with tus protocol for resumable upload on cloud storage.

Change-Id: Iab42487a5f19b83240551db89981cc59d8dfe4dd
Signed-off-by: Chen, Tingjie <tingjie.chen@intel.com>
cmd/bpa-restapi-agent/README.md
cmd/bpa-restapi-agent/api/api.go
cmd/bpa-restapi-agent/api/imagehandler.go
cmd/bpa-restapi-agent/internal/config/config.go
cmd/bpa-restapi-agent/internal/storage/minio.go [new file with mode: 0644]

index a74434c..27a7a51 100644 (file)
@@ -4,3 +4,31 @@ To run the server, follow these simple steps:
 ```
 go run main.go
 ```
+
+# Cloud Storage with MinIO
+
+Start MinIO server daemon with docker command before running REST API agent,
+default settings in config/config.go.
+AccessKeyID: ICN-ACCESSKEYID
+SecretAccessKey: ICN-SECRETACCESSKEY
+MinIO Port: 9000
+
+You can setup MinIO server my the following command with default credentials.
+```
+$ docker run -p 9000:9000 --name minio1 \
+  -e "MINIO_ACCESS_KEY=ICN-ACCESSKEYID" \
+  -e "MINIO_SECRET_KEY=ICN-SECRETACCESSKEY" \
+  -v /mnt/data:/data \
+  minio/minio server /data
+```
+Also there is a Kubernetes deployment for MinIO server in standalone mode.
+```
+$ cd deploy/kud-plugin-addons/minio
+$ ./install.sh
+```
+You can check the status by opening a browser: http://127.0.0.1:9000/
+
+MinIO Client implementation integrated in REST API agent and will automatically
+initialize in main.go, and create 3 buckets: binary, container, operatingsystem.
+The Upload image will "PUT" to corresponding buckets by HTTP PATCH request url.
+
index d70cd80..5533efb 100644 (file)
@@ -4,7 +4,10 @@
 package api
 
 import (
+       "log"
+
        image "bpa-restapi-agent/internal/app"
+       minio "bpa-restapi-agent/internal/storage"
 
        "github.com/gorilla/mux"
 )
@@ -16,11 +19,16 @@ func NewRouter(binaryClient image.ImageManager,
 
        router := mux.NewRouter()
 
+       minioInfo, err := minio.Initialize()
+       if err != nil {
+               log.Println("Error while initialize minio client: %s", err)
+       }
+
        //Setup the image uploaad api handler here
        if binaryClient == nil {
                binaryClient = image.NewBinaryImageClient()
        }
-       binaryHandler := imageHandler{client: binaryClient}
+       binaryHandler := imageHandler{client: binaryClient, minioI: minioInfo, storeName: "binary"}
        imgRouter := router.PathPrefix("/v1").Subrouter()
        imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/binary_images", binaryHandler.createHandler).Methods("POST")
        imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/binary_images/{imgname}", binaryHandler.getHandler).Methods("GET")
@@ -32,7 +40,7 @@ func NewRouter(binaryClient image.ImageManager,
        if containerClient == nil {
                containerClient = image.NewContainerImageClient()
        }
-       containerHandler := imageHandler{client: containerClient}
+       containerHandler := imageHandler{client: containerClient, minioI: minioInfo, storeName: "container"}
        imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/container_images", containerHandler.createHandler).Methods("POST")
        imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/container_images/{imgname}", containerHandler.getHandler).Methods("GET")
        imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/container_images/{imgname}", containerHandler.deleteHandler).Methods("DELETE")
@@ -43,7 +51,7 @@ func NewRouter(binaryClient image.ImageManager,
        if osClient == nil {
                osClient = image.NewOSImageClient()
        }
-       osHandler := imageHandler{client: osClient}
+       osHandler := imageHandler{client: osClient, minioI: minioInfo, storeName: "operatingsystem"}
        imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/os_images", osHandler.createHandler).Methods("POST")
        imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/os_images/{imgname}", osHandler.getHandler).Methods("GET")
        imgRouter.HandleFunc("/baremetalcluster/{owner}/{clustername}/os_images/{imgname}", osHandler.deleteHandler).Methods("DELETE")
index 0d7b787..d9b1844 100644 (file)
@@ -15,6 +15,7 @@ import (
        "strconv"
 
        image "bpa-restapi-agent/internal/app"
+       minioc "bpa-restapi-agent/internal/storage"
 
        "github.com/gorilla/mux"
 )
@@ -26,6 +27,8 @@ type imageHandler struct {
        // We will set this variable with a mock interface for testing
        client image.ImageManager
        dirPath string
+       minioI minioc.MinIOInfo
+       storeName string  // as minio client bucketname
 }
 
 // CreateHandler handles creation of the image entry in the database
@@ -176,6 +179,8 @@ func (h imageHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
                return
        }
 
+       h.minioI.DeleteImage(h.storeName, imageName)
+
        w.WriteHeader(http.StatusNoContent)
 }
 
@@ -262,6 +267,7 @@ func (h imageHandler) patchHandler(w http.ResponseWriter, r *http.Request) {
                e := "Upload already completed"
                w.WriteHeader(http.StatusUnprocessableEntity)
                w.Write([]byte(e))
+               log.Println("Upload already completed")
                return
        }
        off, err := strconv.Atoi(r.Header.Get("Upload-Offset"))
@@ -272,9 +278,10 @@ func (h imageHandler) patchHandler(w http.ResponseWriter, r *http.Request) {
        }
        log.Printf("Upload offset %d\n", off)
        if *file.ImageOffset != off {
-               e := fmt.Sprintf("Expected Offset %d got offset %d", *file.ImageOffset, off)
+               e := fmt.Sprintf("Expected Offset %d, actual offset %d", *file.ImageOffset, off)
                w.WriteHeader(http.StatusConflict)
                w.Write([]byte(e))
+               log.Printf("Expected Offset:%d doesn't match got offset:%d\n", *file.ImageOffset, off)
                return
        }
 
@@ -324,6 +331,16 @@ func (h imageHandler) patchHandler(w http.ResponseWriter, r *http.Request) {
                w.WriteHeader(http.StatusInternalServerError)
                return
        }
+
+       log.Printf("Start to Patch image, bucket: %s, image: %s, dirpath: %s, offset: %d, n: %d\n",
+               h.storeName, imageName, fp, *file.ImageOffset, n)
+       uploadbytes, err := h.minioI.PatchImage(h.storeName, imageName, fp, int64(*file.ImageOffset), int64(n))
+       if err != nil || uploadbytes == 0  {
+               log.Printf("MinIO upload with offset %d failed: %s", *file.ImageOffset, err)
+               w.WriteHeader(http.StatusInternalServerError)
+               return
+    }
+
        log.Println("number of bytes written ", n)
        no := *file.ImageOffset + n
        file.ImageOffset = &no
index e5d4f48..e4911d2 100644 (file)
@@ -11,6 +11,10 @@ type Configuration struct {
   DatabaseAddress string  `json:  "database-address"`
   DatabaseType    string  `json:  "database-type"`
   ServicePort     string  `json:  "service-port"`
+  MinIOAddress    string  `json:  "minio-address"`
+  MinIOPort       string  `json:  "minio-port"`
+  AccessKeyID     string  `json:  "access-key-id"`
+  SecretAccessKey string  `json:  "secret-access-key"`
 }
 
 var gConfig *Configuration
@@ -39,6 +43,10 @@ func defaultConfiguration() *Configuration {
     DatabaseAddress:    "127.0.0.1",
     DatabaseType:       "mongo",
     ServicePort:        "9015",
+    MinIOAddress:       "127.0.0.1",
+    MinIOPort:          "9000",
+    AccessKeyID:        "ICN-ACCESSKEYID",
+    SecretAccessKey:    "ICN-SECRETACCESSKEY",
   }
 }
 
diff --git a/cmd/bpa-restapi-agent/internal/storage/minio.go b/cmd/bpa-restapi-agent/internal/storage/minio.go
new file mode 100644 (file)
index 0000000..3eed689
--- /dev/null
@@ -0,0 +1,183 @@
+package storage
+
+import (
+    "github.com/minio/minio-go/v6"
+    "bpa-restapi-agent/internal/config"
+
+    "log"
+    "os"
+)
+
+type MinIOInfo struct {
+       minioC         *minio.Client            `json:"minio client"`
+}
+
+// Initialize the MinIO server, create buckets
+func Initialize() (MinIOInfo, error) {
+       endpoint := config.GetConfiguration().MinIOAddress + ":" + config.GetConfiguration().MinIOPort
+    accessKeyID := config.GetConfiguration().AccessKeyID
+    secretAccessKey := config.GetConfiguration().SecretAccessKey
+    useSSL := false
+
+    minioInfo := MinIOInfo{}
+    // Initialize minio client object.
+    minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, useSSL)
+    if err != nil {
+        log.Fatalln(err)
+        return minioInfo, err
+    }
+
+    // Make a new bucket.
+    bucketNames := []string{"binary", "container", "operatingsystem"}
+    location := "us-west-1"
+
+    for _, bucketName := range bucketNames {
+        err := minioClient.MakeBucket(bucketName, location)
+        if err != nil {
+            // Check to see if we already own this bucket (which happens if you run this twice)
+            exists, errBucketExists := minioClient.BucketExists(bucketName)
+            if errBucketExists == nil && exists {
+                log.Printf("We already own %s\n", bucketName)
+            } else {
+                log.Fatalln(err)
+                return minioInfo, err
+            }
+        } else {
+            log.Printf("Successfully created %s\n", bucketName)
+        }
+    }
+
+    minioInfo.minioC = minioClient
+    return minioInfo, nil
+}
+
+func (m MinIOInfo) PutImage(bucketName string, objName string, localPath string) (int64, error) {
+
+    //contentType := "multipart/form-data"
+    contentType := "application/octet-stream"
+
+    // Upload the zip file with FPutObject
+    n, err := m.minioC.FPutObject(bucketName, objName, localPath, minio.PutObjectOptions{ContentType:contentType})
+    if err != nil {
+               log.Fatalln(err)
+               return n, err
+    }
+
+    fileInfo, _ := os.Stat(localPath)
+    fileSize := fileInfo.Size()
+
+    if n != int64(fileSize) {
+        log.Printf("FPutObject failed %s of size %d\n", objName, n)
+        return n, err
+    }
+
+       log.Printf("Successfully uploaded %s of size %d\n", objName, n)
+       return n, nil
+}
+
+func (m MinIOInfo) PatchImage(bucketName string, objName string, localPath string, offset int64, objSize int64) (int64, error) {
+
+    var n = int64(0)
+
+    tempFile, err := os.Open(localPath)
+    if err != nil {
+        log.Fatalln(err)
+        return n, err
+    }
+
+    defer tempFile.Close()
+
+    if _, err := tempFile.Seek(offset, 0); err != nil {
+        log.Printf("PatchImage seek %s failed: %s", tempFile.Name(), err)
+        return n, err
+    }
+
+    objInfo, err := m.minioC.StatObject(bucketName, objName, minio.StatObjectOptions{})
+    var objHealthy = true
+    if err != nil {
+        objHealthy = false
+    } else if objInfo.Size != offset || objInfo.Size == 0 {
+        objHealthy = false
+    }
+
+    var objNameTemp = objName
+    if objHealthy {
+        objNameTemp = objName + ".tmp"
+    }
+
+    contentType := "application/octet-stream"
+    n, err = m.minioC.PutObject(bucketName, objNameTemp, tempFile, objSize, minio.PutObjectOptions{ContentType:contentType})
+    if err != nil {
+        log.Fatalln(err)
+        return n, err
+    }
+
+    if n != objSize {
+        log.Printf("PatchImage PutObject %s failed with bytes: %d", tempFile.Name(), n)
+        return n, err
+    }
+
+    if objHealthy {
+        src1 := minio.NewSourceInfo(bucketName, objName, nil)
+        src2 := minio.NewSourceInfo(bucketName, objNameTemp, nil)
+        srcs := []minio.SourceInfo{src1, src2}
+
+        dst, err := minio.NewDestinationInfo(bucketName, objName, nil, nil)
+        if err != nil {
+            log.Printf("NewDestinationInfo failed", err)
+            return n, err
+        }
+
+        // There is issue, the last src should be the smallest obj size
+        err = m.minioC.ComposeObject(dst, srcs)
+        if err != nil {
+            log.Printf("ComposeObject failed", err)
+            return n, err
+        }
+    }
+
+    log.Printf("Successfully PatchImage %s of size %d\n", objName, n)
+    return n, nil
+}
+
+func (m MinIOInfo) DeleteImage(bucketName string, objName string) (error) {
+
+    err := m.minioC.RemoveObject(bucketName, objName)
+    if err != nil {
+        log.Printf("MinIO Remove object %s failed\n", bucketName)
+        return err
+    }
+
+    return nil
+}
+
+func (m MinIOInfo) CleanupImages(bucketName string) (error) {
+    // create a done channel to control 'ListObjectsV2' go routine.
+    doneCh := make(chan struct{})
+    defer close(doneCh)
+
+    for objCh := range m.minioC.ListObjectsV2(bucketName, "", true, doneCh) {
+        if objCh.Err != nil {
+            return objCh.Err
+        }
+        if objCh.Key != "" {
+            err := m.minioC.RemoveObject(bucketName, objCh.Key)
+            if err != nil {
+                return err
+            }
+        }
+    }
+    for objPartInfo := range m.minioC.ListIncompleteUploads(bucketName, "", true, doneCh) {
+        if objPartInfo.Err != nil {
+            return objPartInfo.Err
+        }
+        if objPartInfo.Key != "" {
+            err := m.minioC.RemoveIncompleteUpload(bucketName, objPartInfo.Key)
+            if err != nil {
+                return err
+            }
+        }
+    }
+
+    return nil
+}