Add cloudtaf framework
[ta/cloudtaf.git] / libraries / openstackcli / test_openstack.py
1 # Copyright 2019 Nokia
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 # pylint: disable=redefined-outer-name
16 import os
17 import json
18 from collections import namedtuple
19 import pytest
20 from . openstackcli import (
21     OpenStack,
22     OpenStackCliError)
23
24
class MockResult(namedtuple('MockResult', ['status', 'stdout', 'stderr'])):
    """Fake command result with ``status``, ``stdout`` and ``stderr`` fields.

    The string form lists every field as ``name: repr(value)``, separated
    by commas, in field declaration order.
    """

    def __str__(self):
        rendered = []
        # _fields and tuple iteration share the declaration order, so the
        # pairs match what _asdict() would yield.
        for field, value in zip(self._fields, self):
            rendered.append('{n}: {v!r}'.format(n=field, v=value))
        return ', '.join(rendered)
29
30
def get_output():
    """Return the text of ``service_show_neutron_expected_output.txt``.

    The file is looked up next to this module via ``_get_content``.
    """
    expected_output_file = 'service_show_neutron_expected_output.txt'
    return _get_content(expected_output_file)
33
34
def _get_content(fname):
    """Read and return the whole content of *fname*.

    *fname* is resolved relative to the directory containing this module.
    """
    module_dir = os.path.dirname(__file__)
    path = os.path.join(module_dir, fname)
    with open(path) as source:
        content = source.read()
    return content
38
39
@pytest.mark.parametrize(
    'return_value',
    [
        MockResult(status=0, stdout=get_output(), stderr=''),
        MockResult(status='0', stdout=get_output(), stderr=''),
    ])
def test_openstack_run(cliwrapper, return_value):
    """Verify ``run`` returns the JSON-decoded stdout of the remote command.

    Parametrized with an integer and a string zero status; the expected
    command postfix is ``' -f json'``.
    """
    execute = cliwrapper.remotesession.execute_command_in_target
    cliwrapper.set_exec_func_and_return_value(
        execute, return_value=return_value)
    cliwrapper.set_expected_cmd_postfix(' -f json')

    actual = cliwrapper.run_with_verify(
        cliwrapper.cli.run, 'service show neutron')

    # The mocked stdout is JSON text, so its decoded form is the expectation.
    assert json.loads(return_value.stdout) == actual
53
54
def test_run_ignore_output(cliwrapper):
    """Run ``run_ignore_output`` through the wrapper's verification.

    The mocked remote result has status 0 and empty stdout and stderr.
    """
    execute = cliwrapper.remotesession.execute_command_in_target
    cliwrapper.set_exec_func_and_return_value(
        execute,
        return_value=MockResult(status=0, stdout='', stderr=''))

    cliwrapper.run_with_verify(cliwrapper.cli.run_ignore_output, 'cmd')
62
63
def test_run_raw(cliwrapper):
    """Verify ``run_raw`` returns the mocked stdout unchanged."""
    mocked_result = MockResult(status=0, stdout='output', stderr='')
    execute = cliwrapper.remotesession.execute_command_in_target
    cliwrapper.set_exec_func_and_return_value(
        execute, return_value=mocked_result)

    actual = cliwrapper.run_with_verify(cliwrapper.cli.run_raw, 'cmd')

    assert actual == mocked_result.stdout
72
73
def test_openstack_run_nohup(cliwrapper):
    """Verify ``run_nohup`` returns the runner's nohup background result."""
    expected = 'pid'
    remoterunner = cliwrapper.remotesession.get_remoterunner.return_value
    background_execute = remoterunner.execute_nohup_background_in_target
    cliwrapper.set_exec_func_and_return_value(
        background_execute, return_value=expected)

    actual = cliwrapper.run_with_verify(cliwrapper.cli.run_nohup, 'cmd')

    assert actual == expected
83
84
@pytest.fixture
def openstack(mock_remotesession):
    """Provide an ``OpenStack`` initialized with the mocked remote session."""
    instance = OpenStack()
    instance.initialize(mock_remotesession)
    return instance
90
91
class RemoteException(Exception):
    """Exception raised by the mocked remote execution functions in tests."""
94
95
def raise_remoteexception():
    """Raise ``RemoteException('message')`` unconditionally."""
    error = RemoteException('message')
    raise error
98
99
def test_openstack_run_runner_raises(mock_remotesession,
                                     openstack):
    """Verify ``run`` wraps a remote execution exception.

    The expected ``OpenStackCliError`` message names the full command,
    the target and the original exception type and message.
    """

    def failing_execute(cmd, target):  # pylint: disable=unused-argument
        # Same two-argument signature the mocked call is expected to use.
        return raise_remoteexception()

    mock_remotesession.execute_command_in_target.side_effect = failing_execute

    expected_message = (
        "Remote command 'openstack --os-cloud default cmd -f json' "
        "in target 'default' failed: RemoteException: message")

    with pytest.raises(OpenStackCliError) as excinfo:
        openstack.run('cmd')

    assert str(excinfo.value) == expected_message
112
113
@pytest.fixture(params=[
    MockResult(status=1, stdout='out', stderr=''),
    MockResult(status=0, stdout='out', stderr='err'),
    MockResult(status='zero', stdout='out', stderr='')])
def bad_mock_remotesession(request, mock_remotesession):
    """Provide the mocked session returning one of three bad results.

    The parameters cover a non-zero status, a non-empty stderr and a
    non-numeric status string.
    """
    execute = mock_remotesession.execute_command_in_target
    execute.return_value = request.param
    return mock_remotesession
123
124
def test_openstack_run_result_fail(bad_mock_remotesession, methodfmt):
    """Verify a bad remote result raises ``OpenStackCliError``.

    The expected message contains the command (with the method's format
    postfix), the target and the string form of the returned result.
    """
    with pytest.raises(OpenStackCliError) as excinfo:
        methodfmt.method('cmd')

    execute = bad_mock_remotesession.execute_command_in_target
    failed_result = execute.return_value
    message_template = (
        "Remote command 'openstack --os-cloud default cmd{fmt}' "
        "in target 'default' failed: {return_value}")
    expected_message = message_template.format(
        fmt=methodfmt.fmt, return_value=failed_result)

    assert str(excinfo.value) == expected_message
136
137
def test_run_nohup_runner_raises(mock_remotesession,
                                 openstack):
    """Verify ``run_nohup`` wraps an exception from the nohup runner.

    The expected ``OpenStackCliError`` message names the command, the
    target and the original exception type and message.
    """

    def failing_execute(cmd, target):  # pylint: disable=unused-argument
        # Same two-argument signature the mocked call is expected to use.
        return raise_remoteexception()

    remoterunner = mock_remotesession.get_remoterunner.return_value
    remoterunner.execute_nohup_background_in_target.side_effect = (
        failing_execute)

    expected_message = (
        "Remote command 'openstack --os-cloud default cmd' "
        "in target 'default' failed: RemoteException: message")

    with pytest.raises(OpenStackCliError) as excinfo:
        openstack.run_nohup('cmd')

    assert str(excinfo.value) == expected_message