3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from threading import Thread
18 class CMActivatorWorker(Thread):
19 def __init__(self, activator, index, lock, parallel=False):
20 super(CMActivatorWorker, self).__init__()
22 self.activator = activator
25 self.parallel = parallel
30 return 'worker-{}'.format(self.index)
32 def _handle_work(self, work):
33 logging.debug('%s handling work: %s', self, work)
35 for handler in self.activator.get_handlers():
37 logging.info('%s activating using %s', self, handler.__class__.__name__)
38 handler_failures = handler.activate(work)
40 logging.error('%s activation failed, error count=%s',
42 len(handler_failures))
43 failures[handler.__class__.__name__] = handler_failures
44 except Exception as exp: # pylint: disable=broad-except
45 logging.error('%s activation failed with error %s', self, str(exp))
46 failures[handler.__class__.__name__] = str(exp)
48 logging.debug('%s handled work: %s', self, work)
50 work.add_result(failures)
55 work = self.activator.get_parallel_work()
56 with self.lock.reader():
57 self._handle_work(work)
59 work = self.activator.get_work()
60 with self.lock.writer():
61 self._handle_work(work)