3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from cmframework.server.cmprocessor import CMProcessor
20 from cmframework.apis.cmerror import CMError
21 from cmframework.server.cmcsn import CMCSN
22 from cmframework.server import cmchangemonitor
class CMProcessorAutomaticActivationTest(unittest.TestCase):
    """Tests for CMProcessor with automatic activation disabled.

    Every test patches ``cmframework.utils.cmflagfile.os`` so that
    ``os.path.exists()`` returns True (presumably this is how the processor
    detects the "automatic activation disabled" flag file -- confirm against
    cmflagfile) and then asserts that no activation work is queued.
    """

    @staticmethod
    def backend_get_property(key):
        """Stand in for the backend's ``get_property``.

        Returns a JSON string holding CSN data for ``cloud.cmframework`` and
        the fixed document ``{"foo": "bar"}`` for every other key.
        """
        if key == 'cloud.cmframework':
            return '{"csn": {"global": 101, "nodes": {"node-a": 99, "node-b": 100, "node-c": 101}}}'

        return '{"foo": "bar"}'

    def _create_processor(self, mock_flagfile_os):
        """Build a CMProcessor wired to fresh mocks.

        ``mock_flagfile_os`` is the patched ``os`` module of cmflagfile; its
        ``path.exists`` is made to return True before the processor is
        constructed.

        Returns a ``(processor, backend, validator, activator)`` tuple so the
        caller can assert on the collaborators it cares about.
        """
        mock_flagfile_os.path.exists.return_value = True

        mock_backend = mock.MagicMock()
        mock_backend.get_property = CMProcessorAutomaticActivationTest.backend_get_property

        mock_validator = mock.MagicMock()
        mock_activator = mock.MagicMock()
        # The remaining collaborators are required by the constructor but are
        # never inspected by these tests.
        mock_changemonitor = mock.MagicMock()
        mock_activationstate_handler = mock.MagicMock()
        mock_snapshot_handler = mock.MagicMock()

        processor = CMProcessor(mock_backend, mock_validator, mock_activator,
                                mock_changemonitor, mock_activationstate_handler,
                                mock_snapshot_handler)

        return processor, mock_backend, mock_validator, mock_activator

    @mock.patch('cmframework.utils.cmflagfile.os')
    @mock.patch('cmframework.server.cmprocessor.logging')
    def test_set_property_automatic_activation_disabled(self, mock_logging, mock_flagfile_os):
        """set_property() validates and stores the value but queues no work."""
        processor, mock_backend, mock_validator, mock_activator = \
            self._create_processor(mock_flagfile_os)

        processor.set_property('foo', 'barbar')

        mock_validator.validate_set.assert_called_once_with({'foo': 'barbar'})
        # Must be assert_called_once_with: plain ``called_once_with`` is just
        # an auto-created child mock and never fails.
        mock_backend.set_properties.assert_called_once_with({'foo': 'barbar'})
        mock_activator.add_work.assert_not_called()

    @mock.patch('cmframework.utils.cmflagfile.os')
    @mock.patch('cmframework.server.cmprocessor.logging')
    def test_delete_property_automatic_activation_disabled(self, mock_logging, mock_flagfile_os):
        """delete_property() validates and deletes the key but queues no work."""
        processor, mock_backend, mock_validator, mock_activator = \
            self._create_processor(mock_flagfile_os)

        processor.delete_property('foo')

        mock_validator.validate_delete.assert_called_once_with(['foo'])
        mock_backend.delete_property.assert_called_once_with('foo')
        mock_activator.add_work.assert_not_called()

    # Mocks are injected bottom-up: the decorator closest to the ``def``
    # supplies the first argument after ``self``.
    @mock.patch('cmframework.utils.cmflagfile.os')
    @mock.patch('cmframework.server.cmprocessor.logging')
    @mock.patch('cmframework.server.cmprocessor.cmactivationwork.CMActivationWork')
    @mock.patch.object(CMProcessor, '_clear_reboot_requests')
    @mock.patch.object(CMCSN, 'sync_node_csn')
    def test_activate_node_automatic_activation_disabled(self,
                                                         mock_sync_node_csn,
                                                         mock_clear_reboot_requests,
                                                         mock_work,
                                                         mock_logging,
                                                         mock_flagfile_os):
        """activate_node() returns False and touches none of its collaborators."""
        mock_work.return_value.get_result.return_value = None
        mock_work.OPER_NODE = mock.MagicMock()

        processor, _, _, mock_activator = self._create_processor(mock_flagfile_os)

        self.assertEqual(processor.activate_node('node-b'), False)

        mock_clear_reboot_requests.assert_not_called()
        mock_work.assert_not_called()
        mock_activator.add_work.assert_not_called()
        mock_sync_node_csn.assert_not_called()
125 if __name__ == '__main__':