Remove BPA operator
[icn.git] / cmd / bpa-restapi-agent / internal / db / mongo.go
1 package db
2
3 import (
4         "golang.org/x/net/context"
5         "log"
6
7         "bpa-restapi-agent/internal/config"
8
9         pkgerrors "github.com/pkg/errors"
10         "go.mongodb.org/mongo-driver/bson"
11         "go.mongodb.org/mongo-driver/bson/primitive"
12         "go.mongodb.org/mongo-driver/mongo"
13         "go.mongodb.org/mongo-driver/mongo/options"
14 )
15
16 // MongoCollection defines the a subset of MongoDB operations
17 // Note: This interface is defined mainly for mock testing
18 type MongoCollection interface {
19         InsertOne(ctx context.Context, document interface{},
20                 opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)
21         FindOne(ctx context.Context, filter interface{},
22                 opts ...*options.FindOneOptions) *mongo.SingleResult
23         FindOneAndUpdate(ctx context.Context, filter interface{},
24                 update interface{}, opts ...*options.FindOneAndUpdateOptions) *mongo.SingleResult
25         DeleteOne(ctx context.Context, filter interface{},
26                 opts ...*options.DeleteOptions) (*mongo.DeleteResult, error)
27         Find(ctx context.Context, filter interface{},
28                 opts ...*options.FindOptions) (*mongo.Cursor, error)
29 }
30
31 // MongoStore is an implementation of the db.Store interface
32 type MongoStore struct {
33         db *mongo.Database
34 }
35
36 // This exists only for allowing us to mock the collection object
37 // for testing purposes
38 var getCollection = func(coll string, m *MongoStore) MongoCollection {
39         return m.db.Collection(coll)
40 }
41
42 // This exists only for allowing us to mock the DecodeBytes function
43 // Mainly because we cannot construct a SingleResult struct from our
44 // tests. All fields in that struct are private.
45 var decodeBytes = func(sr *mongo.SingleResult) (bson.Raw, error) {
46         return sr.DecodeBytes()
47 }
48
49 // These exists only for allowing us to mock the cursor.Next function
50 // Mainly because we cannot construct a mongo.Cursor struct from our
51 // tests. All fields in that struct are private and there is no public
52 // constructor method.
53 var cursorNext = func(ctx context.Context, cursor *mongo.Cursor) bool {
54         return cursor.Next(ctx)
55 }
56 var cursorClose = func(ctx context.Context, cursor *mongo.Cursor) error {
57         return cursor.Close(ctx)
58 }
59
60 // NewMongoStore initializes a Mongo Database with the name provided
61 // If a database with that name exists, it will be returned
62 func NewMongoStore(name string, store *mongo.Database) (Store, error) {
63         if store == nil {
64                 ip := "mongodb://" + config.GetConfiguration().DatabaseAddress + ":27017"
65                 clientOptions := options.Client()
66                 clientOptions.ApplyURI(ip)
67                 mongoClient, err := mongo.NewClient(clientOptions)
68                 if err != nil {
69                         return nil, err
70                 }
71
72                 err = mongoClient.Connect(context.Background())
73                 if err != nil {
74                         return nil, err
75                 }
76                 store = mongoClient.Database(name)
77         }
78
79         return &MongoStore{
80                 db: store,
81         }, nil
82 }
83
84 // HealthCheck verifies if the database is up and running
85 func (m *MongoStore) HealthCheck() error {
86
87         _, err := decodeBytes(m.db.RunCommand(context.Background(), bson.D{{"serverStatus", 1}}))
88         if err != nil {
89                 return pkgerrors.Wrap(err, "Error getting server status")
90         }
91
92         return nil
93 }
94
95 // validateParams checks to see if any parameters are empty
96 func (m *MongoStore) validateParams(args ...interface{}) bool {
97         for _, v := range args {
98                 val, ok := v.(string)
99                 if ok {
100                         if val == "" {
101                                 return false
102                         }
103                 } else {
104                         if v == nil {
105                                 return false
106                         }
107                 }
108         }
109
110         return true
111 }
112
113 // Create is used to create a DB entry
114 func (m *MongoStore) Create(coll string, key Key, tag string, data interface{}) error {
115         if data == nil || !m.validateParams(coll, key, tag) {
116                 return pkgerrors.New("No Data to store")
117         }
118
119         c := getCollection(coll, m)
120         ctx := context.Background()
121
122         //Insert the data and then add the objectID to the masterTable
123         res, err := c.InsertOne(ctx, bson.D{
124                 {tag, data},
125         })
126         if err != nil {
127                 return pkgerrors.Errorf("Error inserting into database: %s", err.Error())
128         }
129
130         //Add objectID of created data to masterKey document
131         //Create masterkey document if it does not exist
132         filter := bson.D{{"key", key}}
133
134         _, err = decodeBytes(
135                 c.FindOneAndUpdate(
136                         ctx,
137                         filter,
138                         bson.D{
139                                 {"$set", bson.D{
140                                         {tag, res.InsertedID},
141                                 }},
142                         },
143                         options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)))
144
145         if err != nil {
146                 return pkgerrors.Errorf("Error updating master table: %s", err.Error())
147         }
148
149         return nil
150 }
151
152 // Update is used to update a DB entry
153 func (m *MongoStore) Update(coll string, key Key, tag string, data interface{}) error {
154         if data == nil || !m.validateParams(coll, key, tag) {
155                 return pkgerrors.New("No Data to update")
156         }
157
158         c := getCollection(coll, m)
159         ctx := context.Background()
160
161         //Get the masterkey document based on given key
162         filter := bson.D{{"key", key}}
163         keydata, err := decodeBytes(c.FindOne(context.Background(), filter))
164         if err != nil {
165                 return pkgerrors.Errorf("Error finding master table: %s", err.Error())
166         }
167
168         //Read the tag objectID from document
169         tagoid, ok := keydata.Lookup(tag).ObjectIDOK()
170         if !ok {
171                 return pkgerrors.Errorf("Error finding objectID for tag %s", tag)
172         }
173
174         //Update the document with new data
175         filter = bson.D{{"_id", tagoid}}
176
177         _, err = decodeBytes(
178                 c.FindOneAndUpdate(
179                         ctx,
180                         filter,
181                         bson.D{
182                                 {"$set", bson.D{
183                                         {tag, data},
184                                 }},
185                         },
186                         options.FindOneAndUpdate().SetReturnDocument(options.After)))
187
188         if err != nil {
189                 return pkgerrors.Errorf("Error updating record: %s", err.Error())
190         }
191
192         return nil
193 }
194
195 // Unmarshal implements an unmarshaler for bson data that
196 // is produced from the mongo database
197 func (m *MongoStore) Unmarshal(inp []byte, out interface{}) error {
198         err := bson.Unmarshal(inp, out)
199         if err != nil {
200                 return pkgerrors.Wrap(err, "Unmarshaling bson")
201         }
202         return nil
203 }
204
205 // Read method returns the data stored for this key and for this particular tag
206 func (m *MongoStore) Read(coll string, key Key, tag string) ([]byte, error) {
207         if !m.validateParams(coll, key, tag) {
208                 return nil, pkgerrors.New("Mandatory fields are missing")
209         }
210
211         c := getCollection(coll, m)
212         ctx := context.Background()
213
214         //Get the masterkey document based on given key
215         filter := bson.D{{"key", key}}
216         keydata, err := decodeBytes(c.FindOne(context.Background(), filter))
217         if err != nil {
218                 return nil, pkgerrors.Errorf("Error finding master table: %s", err.Error())
219         }
220
221         //Read the tag objectID from document
222         tagoid, ok := keydata.Lookup(tag).ObjectIDOK()
223         if !ok {
224                 return nil, pkgerrors.Errorf("Error finding objectID for tag %s", tag)
225         }
226
227         //Use tag objectID to read the data from store
228         filter = bson.D{{"_id", tagoid}}
229         tagdata, err := decodeBytes(c.FindOne(ctx, filter))
230         if err != nil {
231                 return nil, pkgerrors.Errorf("Error reading found object: %s", err.Error())
232         }
233
234         //Return the data as a byte array
235         //Convert string data to byte array using the built-in functions
236         switch tagdata.Lookup(tag).Type {
237         case bson.TypeString:
238                 return []byte(tagdata.Lookup(tag).StringValue()), nil
239         default:
240                 return tagdata.Lookup(tag).Value, nil
241         }
242 }
243
244 // Helper function that deletes an object by its ID
245 func (m *MongoStore) deleteObjectByID(coll string, objID primitive.ObjectID) error {
246
247         c := getCollection(coll, m)
248         ctx := context.Background()
249
250         _, err := c.DeleteOne(ctx, bson.D{{"_id", objID}})
251         if err != nil {
252                 return pkgerrors.Errorf("Error Deleting from database: %s", err.Error())
253         }
254
255         log.Printf("Deleted Obj with ID %s", objID.String())
256         return nil
257 }
258
259 // Delete method removes a document from the Database that matches key
260 // TODO: delete all referenced docs if tag is empty string
261 func (m *MongoStore) Delete(coll string, key Key, tag string) error {
262         if !m.validateParams(coll, key, tag) {
263                 return pkgerrors.New("Mandatory fields are missing")
264         }
265
266         c := getCollection(coll, m)
267         ctx := context.Background()
268
269         //Get the masterkey document based on given key
270         filter := bson.D{{"key", key}}
271         //Remove the tag ID entry from masterkey table
272         update := bson.D{
273                 {
274                         "$unset", bson.D{
275                                 {tag, ""},
276                         },
277                 },
278         }
279         keydata, err := decodeBytes(c.FindOneAndUpdate(ctx, filter, update,
280                 options.FindOneAndUpdate().SetReturnDocument(options.Before)))
281         if err != nil {
282                 //No document was found. Return nil.
283                 if err == mongo.ErrNoDocuments {
284                         return nil
285                 }
286                 //Return any other error that was found.
287                 return pkgerrors.Errorf("Error decoding master table after update: %s",
288                         err.Error())
289         }
290
291         //Read the tag objectID from document
292         elems, err := keydata.Elements()
293         if err != nil {
294                 return pkgerrors.Errorf("Error reading elements from database: %s", err.Error())
295         }
296
297         tagoid, ok := keydata.Lookup(tag).ObjectIDOK()
298         if !ok {
299                 return pkgerrors.Errorf("Error finding objectID for tag %s", tag)
300         }
301
302         //Use tag objectID to read the data from store
303         err = m.deleteObjectByID(coll, tagoid)
304         if err != nil {
305                 return pkgerrors.Errorf("Error deleting from database: %s", err.Error())
306         }
307
308         //Delete master table if no more tags left
309         //_id, key and tag should be elements in before doc
310         //if master table needs to be removed too
311         if len(elems) == 3 {
312                 keyid, ok := keydata.Lookup("_id").ObjectIDOK()
313                 if !ok {
314                         return pkgerrors.Errorf("Error finding objectID for key %s", key)
315                 }
316                 err = m.deleteObjectByID(coll, keyid)
317                 if err != nil {
318                         return pkgerrors.Errorf("Error deleting master table from database: %s", err.Error())
319                 }
320         }
321
322         return nil
323 }
324
325 // ReadAll is used to get all documents in db of a particular tag
326 func (m *MongoStore) ReadAll(coll, tag string) (map[string][]byte, error) {
327         if !m.validateParams(coll, tag) {
328                 return nil, pkgerrors.New("Missing collection or tag name")
329         }
330
331         c := getCollection(coll, m)
332         ctx := context.Background()
333
334         //Get all master tables in this collection
335         filter := bson.D{
336                 {"key", bson.D{
337                         {"$exists", true},
338                 }},
339         }
340         cursor, err := c.Find(ctx, filter)
341         if err != nil {
342                 return nil, pkgerrors.Errorf("Error reading from database: %s", err.Error())
343         }
344         defer cursorClose(ctx, cursor)
345
346         //Iterate over all the master tables
347         result := make(map[string][]byte)
348         for cursorNext(ctx, cursor) {
349                 d := cursor.Current
350
351                 //Read key of each master table
352                 key, ok := d.Lookup("key").DocumentOK()
353                 if !ok {
354                         //Throw error if key is not found
355                         pkgerrors.New("Unable to read key from mastertable")
356                 }
357
358                 //Get objectID of tag document
359                 tid, ok := d.Lookup(tag).ObjectIDOK()
360                 if !ok {
361                         log.Printf("Did not find tag: %s", tag)
362                         continue
363                 }
364
365                 //Find tag document and unmarshal it into []byte
366                 tagData, err := decodeBytes(c.FindOne(ctx, bson.D{{"_id", tid}}))
367                 if err != nil {
368                         log.Printf("Unable to decode tag data %s", err.Error())
369                         continue
370                 }
371                 result[key.String()] = tagData.Lookup(tag).Value
372         }
373
374         if len(result) == 0 {
375                 return result, pkgerrors.Errorf("Did not find any objects with tag: %s", tag)
376         }
377
378         return result, nil
379 }