Initial commit
[ta/config-manager.git] / cmframework / test / cmprocessor_automatic_activation_test.py
1 # Copyright 2019 Nokia
2
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import unittest
16 import mock
17 import json
18
19 from cmframework.server.cmprocessor import CMProcessor
20 from cmframework.apis.cmerror import CMError
21 from cmframework.server.cmcsn import CMCSN
22 from cmframework.server import cmchangemonitor
23
24
25 class CMProcessorAutomaticActivationTest(unittest.TestCase):
26     @staticmethod
27     def backend_get_property(key):
28         if key == 'cloud.cmframework':
29             return '{"csn": {"global": 101, "nodes": {"node-a": 99, "node-b": 100, "node-c": 101}}}'
30         elif key == 'foo':
31             return '{"foo": "bar"}'
32
33     @mock.patch('cmframework.utils.cmflagfile.os')
34     @mock.patch('cmframework.server.cmprocessor.logging')
35     def test_set_property_automatic_activation_disabled(self, mock_logging, mock_flagfile_os):
36         mock_backend = mock.MagicMock()
37         mock_backend.get_property = CMProcessorAutomaticActivationTest.backend_get_property
38
39         mock_validator = mock.MagicMock()
40         mock_activator = mock.MagicMock()
41         mock_changemonitor = mock.MagicMock()
42         mock_activationstate_handler = mock.MagicMock()
43         mock_snapshot_handler = mock.MagicMock()
44
45         mock_flagfile_os.path = mock.MagicMock()
46         mock_flagfile_os.path.exists = mock.MagicMock()
47         mock_flagfile_os.path.exists.return_value = True
48
49         processor = CMProcessor(mock_backend, mock_validator, mock_activator,
50                                 mock_changemonitor, mock_activationstate_handler,
51                                 mock_snapshot_handler)
52
53         processor.set_property('foo', 'barbar')
54
55         mock_validator.validate_set.assert_called_once_with({'foo': 'barbar'})
56         mock_backend.set_properties.called_once_with({'foo': 'barbar'})
57         mock_activator.add_work.assert_not_called()
58
59     @mock.patch('cmframework.utils.cmflagfile.os')
60     @mock.patch('cmframework.server.cmprocessor.logging')
61     def test_delete_property_automatic_activation_disabled(self, mock_logging, mock_flagfile_os):
62         mock_backend = mock.MagicMock()
63         mock_backend.get_property = CMProcessorAutomaticActivationTest.backend_get_property
64
65         mock_validator = mock.MagicMock()
66         mock_activator = mock.MagicMock()
67         mock_changemonitor = mock.MagicMock()
68         mock_activationstate_handler = mock.MagicMock()
69         mock_snapshot_handler = mock.MagicMock()
70
71         mock_flagfile_os.path = mock.MagicMock()
72         mock_flagfile_os.path.exists = mock.MagicMock()
73         mock_flagfile_os.path.exists.return_value = True
74
75         processor = CMProcessor(mock_backend, mock_validator, mock_activator,
76                                 mock_changemonitor, mock_activationstate_handler,
77                                 mock_snapshot_handler)
78
79         processor.delete_property('foo')
80
81         mock_validator.validate_delete.assert_called_once_with(['foo'])
82         mock_backend.delete_property.assert_called_once_with('foo')
83         mock_activator.add_work.assert_not_called()
84
85     @mock.patch('cmframework.utils.cmflagfile.os')
86     @mock.patch('cmframework.server.cmprocessor.logging')
87     @mock.patch('cmframework.server.cmprocessor.cmactivationwork.CMActivationWork')
88     @mock.patch.object(CMProcessor, '_clear_reboot_requests')
89     @mock.patch.object(CMCSN, 'sync_node_csn')
90     def test_activate_node_automatic_activation_disabled(self,
91                                                          mock_sync_node_csn,
92                                                          mock_clear_reboot_requests,
93                                                          mock_work,
94                                                          mock_logging,
95                                                          mock_flagfile_os):
96         mock_backend = mock.MagicMock()
97         mock_backend.get_property = CMProcessorAutomaticActivationTest.backend_get_property
98
99         mock_validator = mock.MagicMock()
100         mock_activator = mock.MagicMock()
101         mock_changemonitor = mock.MagicMock()
102         mock_activationstate_handler = mock.MagicMock()
103         mock_snapshot_handler = mock.MagicMock()
104
105         mock_work.return_value.get_result = mock.MagicMock()
106         mock_work.return_value.get_result.return_value = None
107
108         mock_flagfile_os.path = mock.MagicMock()
109         mock_flagfile_os.path.exists = mock.MagicMock()
110         mock_flagfile_os.path.exists.return_value = True
111
112         mock_work.OPER_NODE = mock.MagicMock()
113
114         processor = CMProcessor(mock_backend, mock_validator, mock_activator,
115                                 mock_changemonitor, mock_activationstate_handler,
116                                 mock_snapshot_handler)
117
118         self.assertEqual(processor.activate_node('node-b'), False)
119
120         mock_clear_reboot_requests.assert_not_called()
121         mock_work.assert_not_called()
122         mock_activator.add_work.assert_not_called()
123         mock_sync_node_csn.assert_not_called()
124
125 if __name__ == '__main__':
126     unittest.main()