# -*- coding: utf-8 -*-
"""Module defining parsers. The ``ConfigParser`` is an extended
``configparser`` that can use a specification file to define types/default
values for the config files options, as well as to validate the file against.
The ``EpochParser`` is a parser which organizes options by date. An option
value is found by matching the option in the section with the latest datetime
before the given datetime."""
import collections
import configparser
import datetime
import io
import re
from typing import List, TypeVar, TextIO
import dateutil.parser
OptionValue = TypeVar(
"OptionValue", bool, float, int, str, List[bool], List[float], List[int], List[str]
)
DateValue = TypeVar("DateValue", str, datetime.datetime)
FileType = TypeVar("FileType", str, TextIO)
OptionSpec = collections.namedtuple("OptionSpec", "required type default list")
OptionSpec.__doc__ = """Specification for an option"""
TYPES = {"bool": bool, "boolean": bool, "float": float, "int": int, "str": str}
identifier_re = re.compile('[^,="]')
whitespace_re = re.compile("\s")
listtypes_re = re.compile("List\[(.*)\]")
def _parse_specline_tokens(specline: str) -> OptionSpec:
"""Generator to tokenize spec line
Parameters
----------
specline : str
spec line to tokenize
Yields
------
str
"""
identifier = ""
in_quote = False
for c in specline:
if c == '"':
if in_quote:
yield identifier
identifier = ""
in_quote = False
else:
in_quote = True
else:
if in_quote:
identifier += c
else:
if whitespace_re.match(c) and identifier == "":
continue
if identifier_re.match(c):
identifier += c
elif c in {"=", ","}:
if identifier != "":
yield identifier
identifier = ""
yield c
elif whitespace_re.match(c):
if identifier != "":
yield identifier
identifier = ""
else:
# invalid token
raise ValueError(f"invalid token {c}")
if identifier != "":
yield identifier
def _parse_list(list_expr):
list_expr = list_expr.strip()
if list_expr[0] != "[" or list_expr[-1] != "]":
raise ValueError(f'invalid list expr "{list_expr}"')
list_expr = list_expr[1:-1]
return list_expr.split(", ")
def _str2type(s: str) -> OptionValue:
"""Convert a string type specification to the actual type
Parameters
----------
s : str
string specification of a type
Returns
-------
type
"""
if s.lower() in set(TYPES):
return TYPES[s.lower()]
else:
raise ValueError(f"invalid type: {s}")
def _convert(value: str, type_value: type, is_list: bool = False) -> OptionValue:
"""Convert a value to a given type
Parameters
----------
value : str
value to convert
type_value : type
type to convert `value` to
is_list : bool
whether ``value`` is a list or just a scalar
Returns
-------
scalar or List with same type as type_value
"""
if is_list:
return [_convert(v, type_value, False) for v in _parse_list(value) if v != ""]
else:
if type_value == bool:
if value.lower() in {"yes", "true", "1"}:
return True
elif value.lower() in {"no", "false", "0"}:
return False
else:
raise ValueError(f'invalid value "{value}" for bool type')
else:
return type_value(value)
def _parse_specline(specline: str) -> OptionSpec:
"""Parse a spec line
Parameters
----------
specline : str
spec line to parse
Returns
-------
NamedTuple
fields required, type, default, and list
"""
tokens = _parse_specline_tokens(specline)
# default attributes
required = False
type_value = str
default = None
is_list = False
try:
while True:
attr_name = next(tokens)
next(tokens) # equals sign
attr_value = next(tokens)
# handle attribute
if attr_name.lower() == "required":
required = _convert(attr_value, bool, False)
elif attr_name.lower() == "type":
m = listtypes_re.match(attr_value)
if m:
type_value = _str2type(m[1])
is_list = True
else:
type_value = _str2type(attr_value)
elif attr_name.lower() == "default":
default = attr_value
else:
raise ValueError(f"invalid attribute {attr_name}")
next(tokens) # comma
except StopIteration:
pass
# convert default value to spec type
if default is not None:
default = _convert(default, type_value, is_list)
return OptionSpec(required=required, type=type_value, default=default, list=is_list)
[docs]
class ConfigParser(configparser.ConfigParser):
"""ConfigParser subclass which can verify a config file against a
specification and uses types/defaults from the specification.
"""
def __init__(self, spec_filename: str = None, **kwargs) -> None:
"""Create a ConfigParser object
spec_filename : str
file to use as a specification
"""
super().__init__(**kwargs)
self.spec_filename = spec_filename
if spec_filename is None:
self.specification = None
else:
self.specification = configparser.ConfigParser()
self.specification.read(spec_filename)
[docs]
def get(
self,
section: str,
option: str,
raw: bool = False,
use_spec: bool = True,
**kwargs,
) -> OptionValue:
"""Get an option using the type and default from the specification file
Parameters
----------
section : str
section name
option : str
option name
raw : bool
set to True to not interpolate
use_spec : bool
set to False to not use the specification
"""
if self.specification is None or use_spec is False:
return super().get(section, option, raw=raw, **kwargs)
specline = self.specification.get(section, option)
spec = _parse_specline(specline)
if not super().has_option(section, option):
return spec.default
value = super().get(section, option, raw=raw, **kwargs)
return value if raw else _convert(value, spec.type, spec.list)
def _write(self, fileobject: TextIO) -> None:
"""Write the configuration to a file-like object
Parameters
----------
fileobject : TextIO
file-like object to write to
"""
cp = self.specification if self.specification is not None else self
max_len = 0
for s in cp.sections():
for o in cp.options(s):
max_len = max(max_len, len(o))
new_line = "\n"
first_section = True
for s in cp.sections():
new_line = "" if first_section else "\n"
fileobject.write(f"{new_line}[{s}]\n")
first_section = False
for o in cp.options(s):
v = self.get(s, o)
fileobject.write(f"{o:{max_len}s} = {v}\n")
[docs]
def write(self, file: FileType, space_around_delimiters: bool = True) -> None:
"""Write config file to a file-like object
Parameters
----------
file : FileType
file-like object to write to
space_around_delimiters : bool
whether to put spaces around the delimiter, i.e., ":" or "="
"""
if isinstance(file, str):
with open(file, "w") as f:
self._write(f)
else:
self._write(file)
def __repr__(self) -> str:
"""Representation of config file"""
return f'{self.__class__.__name__}("{self.spec_filename}")'
def __str__(self) -> str:
"""Config file as a string"""
f = io.StringIO()
self._write(f)
f.seek(0)
return f.read()
[docs]
def is_valid(self, allow_extra_options: bool = False) -> bool:
"""Verify that the ``ConfigParser`` matches the specification. A
``ConfigParser`` without a spec is automatically valid.
"""
if self.specification is None:
return True
# check that every option given by f is in specification
if not allow_extra_options:
for s in self.sections():
for o in self.options(s):
if self.has_option("DEFAULT", o):
continue
if not self.specification.has_option(s, o):
print(f"section={s}, option={o}")
return False
# check that all options without a default value are given by f
for s in self.specification.sections():
for o in self.specification.options(s):
if self.specification.has_option("DEFAULT", o):
continue
specline = self.specification.get(s, o)
spec = _parse_specline(specline)
if spec.required and not self.has_option(s, o):
return False
# TODO: make sure values are the correct type?
return True
[docs]
class EpochConfigParser:
"""EpochConfigParser parses config files with dates as section name. Retrieving an
option for a given date returns the option value on the date closest, but
before, the given date.
"""
def __init__(self, spec_filename: str = None, **kwargs) -> None:
self.spec = ConfigParser(spec_filename, **kwargs)
self.config = ConfigParser(**kwargs)
self._date = None
self._formats = None
@property
def date(self):
return self._date
@date.setter
def date(self, date: DateValue):
"""
Parameters
----------
date : DateValue
date as a string or ``datetime.datetime``
"""
self._date = self._parse_datetime(date)
def _parse_datetime(self, d: DateValue) -> datetime.datetime:
if isinstance(d, datetime.datetime):
return d
else:
if self._formats is None:
return dateutil.parser.parse(d)
else:
for f in self._formats:
try:
dt = datetime.datetime.strptime(d, f)
return dt
except ValueError:
pass
@property
def formats(self):
return self._formats
@formats.setter
def formats(self, formats: List[str]):
"""
Parameters
----------
formats : List[str]
formats to use for parsing dates via ``datetime.datetime.strptime``
"""
self._formats = formats
[docs]
def read(self, files):
"""Attempt to read and parse an iterable of filenames, returning a list
of filenames which were successfully parsed.
"""
return self.config.read(files)
[docs]
def get(
self, option: str, date: DateValue = None, raw: bool = False, **kwargs
) -> OptionValue:
"""Get an option using the type and default from the specification file
Parameters
----------
option : str
option name
date : FileValue
date as a string or ``datetime.datetime``
raw : bool
set to True is disable interpolation
"""
dt = self._date if date is None else self._parse_datetime(date)
if dt is None:
raise KeyError("no date for access given")
specs = {
k: _parse_specline(v) for k, v in self.spec.specification.defaults().items()
}
epoch_names = self.config.sections()
epoch_dts = [self._parse_datetime(s) for s in epoch_names]
sorted_epoch_dts = sorted(zip(epoch_dts, epoch_names), key=lambda x: x[0])
value = specs[option].default
for e_dt, e_name in sorted_epoch_dts:
if e_dt <= dt and self.config.has_option(e_name, option):
value = _convert(
self.config.get(e_name, option),
specs[option].type,
specs[option].list,
)
return value
def _write(self, fileobject: TextIO) -> None:
"""Write the configuration to a file-like object
Parameters
----------
fileobject : TextIO
file-like object to write to
"""
self.config.write(fileobject)
[docs]
def write(self, file: FileType, space_around_delimiters: bool = True) -> None:
"""Write config file to a file-like object
Parameters
----------
file : FileType
file-like object to write to
space_around_delimiters : bool
whether to put spaces around the delimiter, i.e., ":" or "="
"""
if isinstance(file, str):
with open(file, "w") as f:
self._write(f, space_around_delimiters=space_around_delimiters)
else:
self._write(file, space_around_delimiters=space_around_delimiters)
def __repr__(self) -> str:
"""Representation of config file"""
return f'{self.__class__.__name__}("{self.spec.spec_filename}")'
def __str__(self) -> str:
"""Config file as a string"""
f = io.StringIO()
self._write(f)
f.seek(0)
return f.read()
[docs]
def is_valid(self, allow_extra_options: bool = False) -> bool:
"""Verify that the `EpochParser` matches the specification. A
`configparser` without a spec is automatically valid.
"""
if self.spec is None:
return True
# check to make sure sections are dates
for s in self.config.sections():
try:
dateutil.parser.parse(s)
except ValueError:
return False
# check options are in spec
if not allow_extra_options:
for s in self.config.sections():
for o in self.config.options(s):
if not self.spec.has_option("DEFAULT", o):
return False
return True