# # Copyright 2020 Huawei Technologies Co., Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import config from flask_sslify import SSLify from flask import Flask, request, jsonify, Response from flask_cors import CORS from influxdb import InfluxDBClient import json import requests import os import cv2 import os.path from os import path import base64 import time import sys app = Flask(__name__) CORS(app) sslify = SSLify(app) app.config['JSON_AS_ASCII'] = False app.config['UPLOAD_PATH'] = '/usr/app/images_result/' app.config['VIDEO_PATH'] = '/usr/app/test/resources/' app.config['supports_credentials'] = True app.config['CORS_SUPPORTS_CREDENTIALS'] = True app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 ALLOWED_EXTENSIONS = set(['png', 'jpg', 'jpeg']) ALLOWED_VIDEO_EXTENSIONS = {'mp4'} count = 0 listOfMsgs = [] listOfCameras = [] listOfVideos = [] class inventory_info: """ Store the data and manage multiple input video feeds """ def __init__(self, status="Needs Filling", time=0): self.type = "Shelf_INV1" self.labels = "Bottles" self.status = status self.currentCount = 1 self.maxCount = 5 self.time = time def setstatus(self, status): self.status = status def getstatus(self): return self.status def setcurrentCount(self, count): self.currentCount = count def getcurrentCount(self): return self.currentCount def setmaxCount(self, count): self.maxCount = count def getmaxCount(self): return self.maxCount def setlabel(self, labels): self.labels = labels def getlabel(self): return self.labels def settime(self, time): self.labels = time def gettime(self): return self.time # temporary copied capture_frame file to this due to docker issue for module # import class VideoCamera(object): """ opneCV to capture frame from a camera """ def __init__(self, url): self.video = cv2.VideoCapture(url) def delete(self): self.video.release() def get_frame(self): """ get a frame from camera url """ success, image = self.video.read() return success, image class VideoFile(object): """ opneCV to capture frame from a video stream """ def __init__(self, video_name): self.video = cv2.VideoCapture(app.config['VIDEO_PATH'] + video_name) def delete(self): self.video.release() def get_frame(self): """ get a frane from stream """ success, image = self.video.read() return success, image def shelf_inventory(video_capture, camera_info, true=None): """ shelf_inventory """ global count global mock_func labels = "Bottles" count_val = 'ObjCount' process_this_frame = 0 i = 0 url = config.detection_url + "detect" url_get = config.detection_url + "image" while True: success, frame = video_capture.get_frame() if not success: print('read frame from file is failed') break i = i+1 if i < 10: continue i = 0 if process_this_frame == 0: imencoded = cv2.imencode(".jpg", frame)[1] file = {'file': ( 'image.jpg', imencoded.tostring(), 'image/jpeg', {'Expires': '0'})} res = requests.post(url, files=file) data = json.loads(res.text) # get image response = requests.get(url_get) file = open(app.config['UPLOAD_PATH'] + "sample_image.jpg", "wb") file.write(response.content) file.close() inven_info = inventory_info() current_count = data[count_val] if (current_count >= 3): status = "Mostly Filled" elif (current_count == 2): status = "Partially Filled" else: status = "Needs Filling" inven_info.setlabel(labels) inven_info.setstatus(status) inven_info.setcurrentCount(current_count) time_sec = time.time() local_time = time.ctime(time_sec) inven_info.time = local_time store_info_db(inven_info) time.sleep(0.30) def db_drop_table(inven_info): """ cleanup measrurment before new trigger :param inven_info: inven_info object :return: None """ global db_client db_client.drop_measurement(inven_info.type) def store_info_db(inven_info): """ Send "shelf" data to InfluxDB :param inven_info: Inventry object :return: None """ global db_client json_body = [ { "measurement": inven_info.type, "tags": { "object": "bottles", }, "fields": { "time": inven_info.time, "status": inven_info.status, "currentCount": inven_info.currentCount, "maxCount": inven_info.maxCount, } }] db_client.write_points(json_body) def retrive_info_db(): """ Send "shelf" data to InfluxDB :param inven_info: Inventry object :return: None """ global db_client # get data last n data points from DB result = db_client.query('select * from Shelf_INV1 order by desc limit ' '1;') # Get points and iterate over each record points = result.get_points(tags={"object": "bottles"}) # clear the msg list # listOfMsgs.clear() del listOfMsgs[:] # iterate points and fill the records and insert to list for point in points: print("status: %s,Time: %s" % (point['status'], point['time'])) newdict = {"shelfName": 'Shelf_INV1', "ObjType": "bottles", "status": point['status'], "currentCount": point['currentCount'], "maxCount": point['maxCount'], "time": point['time']} listOfMsgs.insert(0, newdict) def create_database(): """ Connect to InfluxDB and create the database :return: None """ global db_client proxy = {"http": "http://{}:{}".format(config.IPADDRESS, config.PORT)} db_client = InfluxDBClient(host=config.IPADDRESS, port=config.PORT, proxies=proxy, database=config.DATABASE_NAME) db_client.create_database(config.DATABASE_NAME) @app.route('/v1/inventry/table', methods=['GET']) def inventry_table(): """ return inventry table :return: inventry table """ retrive_info_db() table = {"InventryData": listOfMsgs} return jsonify(table) @app.route('/v1/inventry/image', methods=['GET']) def detected_image(): """ detect images with imposed :return: result image """ detected_image = app.config['UPLOAD_PATH'] + 'sample_image.jpg' print('file exits:', str(path.exists(detected_image))) status = str(path.exists(detected_image)) if status == 'True': # as base64 string with open(detected_image, "rb") as img_file: jpeg_bin = base64.b64encode(img_file.read()) response = {'image': jpeg_bin} return jsonify(response) else: response = {'image': 'null'} print('file not exist') return jsonify(response) def allowed_videofile(filename): """ File types to upload:mp4 param: filename: """ return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_VIDEO_EXTENSIONS @app.route('/v1/monitor/video', methods=['POST']) def upload_video(): app.logger.info("Received message from ClientIP [" + request.remote_addr + "] Operation [" + request.method + "]" + " Resource [" + request.url + "]") print("videpath:" + app.config['VIDEO_PATH']) if 'file' in request.files: files = request.files.getlist("file") for file in files: if allowed_videofile(file.filename): file.save(os.path.join(app.config['VIDEO_PATH'], file.filename)) print('file path is:', app.config['VIDEO_PATH'] + file.filename) else: raise IOError('video format error') msg = {"responce": "failure"} return jsonify(msg) msg = {"responce": "success"} return jsonify(msg) def hash_func(camera_info): hash_string = camera_info["cameraNumber"] + \ camera_info["cameraLocation"] + \ camera_info["rtspUrl"] # readable_hash = hashlib.sha256(str(hash_string).encode( # 'utf-8')).hexdigest() readable_hash = hash(hash_string) if readable_hash < 0: readable_hash += sys.maxsize print(readable_hash) return readable_hash @app.route('/v1/monitor/cameras', methods=['POST']) def add_camera(): camera_detail = request.json app.logger.info("Received message from ClientIP [" + request.remote_addr + "] Operation [" + request.method + "]" + " Resource [" + request.url + "]") camera_id = hash_func(camera_detail) camera_id = str(camera_id) for camera_info in listOfCameras: if camera_id == camera_info["cameraID"]: msg = {"responce": "failure"} return jsonify(msg) break camera_info = {"cameraID": camera_id, "cameraNumber": camera_detail["cameraNumber"], "rtspUrl": camera_detail["rtspUrl"], "cameraLocation": camera_detail["cameraLocation"]} listOfCameras.append(camera_info) msg = {"responce": "success"} return jsonify(msg) @app.route('/v1/monitor/cameras/', methods=['GET']) def get_camera(cameraID): """ register camera with location """ app.logger.info("Received message from ClientIP [" + request.remote_addr + "] Operation [" + request.method + "]" + " Resource [" + request.url + "]") valid_id = 0 for camera_info in listOfCameras: # cameraID = int(cameraID) if cameraID == camera_info["cameraID"]: valid_id = 1 break if valid_id == 0: app.logger.info("camera ID is not valid") msg = {"responce": "failure"} return jsonify(msg) if "mp4" in camera_info["rtspUrl"]: video_file = VideoFile(camera_info["rtspUrl"]) video_dict = {camera_info["cameraNumber"]: video_file} listOfVideos.append(video_dict) # return Response(shelf_inventory(video_file, camera_info[ # "cameraNumber"]), # mimetype='multipart/x-mixed-replace; boundary=frame') shelf_inventory(video_file, camera_info["cameraNumber"]) app.logger.info("get_camera: Added json") msg = {"responce": "success"} return jsonify(msg) else: video_file = VideoCamera(camera_info["rtspUrl"]) video_dict = {camera_info["cameraNumber"]: video_file} listOfVideos.append(video_dict) return Response(shelf_inventory(video_file, camera_info["cameraNumber"]), mimetype='multipart/x-mixed-replace; boundary=frame') @app.route('/v1/monitor/cameras/', methods=['DELETE']) def delete_camera(camera_name): app.logger.info("Received message from ClientIP [" + request.remote_addr + "] Operation [" + request.method + "]" + " Resource [" + request.url + "]") for video1 in listOfVideos: if camera_name in video1: video_obj = video1[camera_name] video_obj.delete() for camera in listOfCameras: if camera_name == camera["cameraNumber"]: listOfCameras.remove(camera) return Response("success") @app.route('/v1/monitor/cameras') def query_cameras(): app.logger.info("Received message from ClientIP [" + request.remote_addr + "] Operation [" + request.method + "]" + " Resource [" + request.url + "]") camera_info = {"roboCamera": listOfCameras} return jsonify(camera_info) @app.route('/', methods=['GET']) def hello_world(): app.logger.info("Received message from ClientIP [" + request.remote_addr + "] Operation [" + request.method + "]" + " Resource [" + request.url + "]") return Response("Hello MEC Developer") def start_server(handler): app.logger.addHandler(handler) create_database() if config.ssl_enabled: context = (config.ssl_certfilepath, config.ssl_keyfilepath) app.run(host=config.server_address, debug=True, ssl_context=context, threaded=True, port=config.server_port) else: app.run(host=config.server_address, debug=True, threaded=True, port=config.server_port)