# Copyright 2019 Nokia # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import re def to_json(data): return json.dumps(data, sort_keys=True, indent=4) class CsvConverter(object): def __init__(self, data, preferred_field_order=None, escape_newlines=True): self.data = data self.preferred_field_order = preferred_field_order self.escape_newlines = escape_newlines self.csv_data = None self._convert() def __str__(self): return self.convert() def convert(self): return self._render(CsvFormatter(self.csv_data)) def convert_to_ms_excel(self, text_fields=None): """ CSV that Microsoft Excel can read well. :param text_fields: list of columns to mark as text NOTE: must not be used for fields that can contain comma(,) or semicolon(;) as field will be split from these :return: """ return self._render(CsvMSFormatter(self.csv_data, text_fields=text_fields)) def _convert(self): if not isinstance(self.data, list): raise Exception('Input data given is NOT a list') if not self.data: self.csv_data = [] return if not isinstance(self.data[0], dict): raise Exception('First data element is NOT a dict') headers = [] possible_fields = list(set([key for i in self.data for key in i.keys()])) if self.preferred_field_order is not None: for preferred_field in self.preferred_field_order: if preferred_field in possible_fields: headers.append(preferred_field) possible_fields.remove(preferred_field) headers += sorted(possible_fields) self.csv_data = [headers] for obj in self.data: row_data = [] for header in headers: field = obj.get(header) if isinstance(field, (list, dict)): x = json.dumps(field, sort_keys=True) elif isinstance(field, unicode): x = field.encode('utf-8') else: x = str(field) row_data.append(x) self.csv_data.append(row_data) def _render(self, formatter): return formatter.format(self.escape_newlines) class CsvFormatter(object): def __init__(self, csv_data): self.csv_data = csv_data def format(self, escape_newlines=True): f_file = [] for record in self.csv_data: f_record = [] for field in record: f_field = self._field_formatter(field, escape_newlines) f_record.append(f_field) f_file.append(','.join(self._record_formatter(f_record))) return '\r\n'.join(self._file_formatter(f_file)) @staticmethod def _file_formatter(_file): return _file @staticmethod def _record_formatter(record): return ['"{}"'.format(i) for i in record] @staticmethod def _field_formatter(field, escape_newlines): out = field.replace('"', '""') if escape_newlines: out = out.replace('\n', '\\n') return out class CsvMSFormatter(CsvFormatter): max_cell_size = 32000 def __init__(self, csv_data, text_fields=None): super(CsvMSFormatter, self).__init__(csv_data) self.text_fields = text_fields def _file_formatter(self, _file): return ['sep=,'] + super(CsvMSFormatter, self)._file_formatter(_file) def _record_formatter(self, record): record = super(CsvMSFormatter, self)._record_formatter(record) if self.text_fields: formatted_record = [] for index, field in enumerate(record): heading = self.csv_data[0][index] if heading in self.text_fields: formatted_field = '=' + field else: formatted_field = field formatted_record.append(formatted_field) record = formatted_record return record def _field_formatter(self, field, escape_newlines): field = super(CsvMSFormatter, self)._field_formatter(field, escape_newlines) if len(field) > self.max_cell_size: field = field[:self.max_cell_size / 2] + "..." + field[-self.max_cell_size / 2:] if not re.match(r'^-\d+$', field) and re.match(r'^-.*$', field): return r'\{}'.format(field) return field