from abc import ABC
from typing import Generic
from typing import Iterable
from typing import TypeVar
from gavel.logic.logic import LogicElement
from gavel.logic.problem import Problem
from gavel.logic.problem import Sentence, Import
from gavel.logic.proof import Proof
import multiprocessing as mp
# Type variable for the input structure that a parser's ``parse`` method
# receives (see the ``structure: Parseable`` parameter of ``Parser.parse``).
Parseable = TypeVar("Parseable")
# Type variable for the value that a parser's ``parse`` method returns.
Target = TypeVar("Target")
class Parser(Generic[Parseable, Target]):
    """
    Generic interface for objects that turn a ``Parseable`` structure into a
    ``Target``.

    This base class provides no behaviour of its own: :meth:`parse` only
    raises ``NotImplementedError`` and has to be overridden by subclasses.
    """

    def parse(self, structure: Parseable, *args, **kwargs) -> Target:
        """
        Transform the input structure into the target representation.

        Parameters
        ----------
        structure: Parseable
            The structure that should be transformed
        *args, **kwargs
            Accepted so that overriding implementations may take additional
            options; not used by this base implementation

        Returns
        -------
        Target
            The target representation of `structure`

        Raises
        ------
        NotImplementedError
            Always, in this base class
        """
        raise NotImplementedError
class StringBasedParser(Parser, ABC):
    """
    Parser whose input originates from strings or text files.

    Subclasses have to provide :meth:`load_single_from_string`,
    :meth:`is_valid`, :meth:`stream_formula_lines` and the inherited
    ``parse``; the implementations here raise ``NotImplementedError``.
    The remaining methods are built on top of those hooks.
    """

    def load_single_from_string(self, string: str, *args, **kwargs) -> Parseable:
        """
        Load a string into the structure represented by the dialect

        Parameters
        ----------
        string: str
            Text of a single element
        *args, **kwargs
            Accepted for overriding implementations; unused here

        Returns
        -------
        Parseable
            The loaded structure, suitable as input for ``parse``

        Raises
        ------
        NotImplementedError
            Always, in this base class
        """
        raise NotImplementedError

    def parse_single_from_string(
        self,
        string: str,
        load_args=None,
        parse_args=None,
        load_kwargs=None,
        parse_kwargs=None,
    ) -> Target:
        """
        Load a single string and parse the loaded structure

        Parameters
        ----------
        string: str
            Text of a single element
        load_args
            Extra positional arguments for `load_single_from_string`
        parse_args
            Extra positional arguments for `parse`
        load_kwargs
            Extra keyword arguments for `load_single_from_string`
        parse_kwargs
            Extra keyword arguments for `parse`

        Returns
        -------
        Target
            Whatever `parse` returns for the loaded structure
        """
        # ``None`` (or any other falsy value) stands for "no extra arguments".
        loaded = self.load_single_from_string(
            string, *(load_args or []), **(load_kwargs or {})
        )
        return self.parse(loaded, *(parse_args or []), **(parse_kwargs or {}))

    @staticmethod
    def _unpack_file(*args, **kwargs) -> Iterable[str]:
        """
        Read all lines of a file

        Parameters
        ----------
        *args, **kwargs
            Passed to the builtin ``open`` unchanged

        Returns
        -------
        Iterable[str]
            The list produced by ``readlines()`` on the opened file
        """
        with open(*args, **kwargs) as inp:
            return inp.readlines()

    def parse_from_file(self, file_path, *args, **kwargs) -> Iterable[Target]:
        """
        Parse every element of a file, yielding the results one by one

        The elements produced by `stream_formulas` are handed to
        `parse_single_from_string` through a ``multiprocessing.Pool``;
        ``imap`` yields the results in input order.

        Parameters
        ----------
        file_path
            Path of the file to parse
        *args, **kwargs
            Accepted but not used by this implementation

        Yields
        ------
        Target
            The parsed representation of each element
        """
        with mp.Pool() as pool:
            yield from pool.imap(
                self.parse_single_from_string, self.stream_formulas(file_path)
            )

    def is_valid(self, inp: str) -> bool:
        """
        Verify if `inp` is a string representation that is parsable by this
        parser

        Parameters
        ----------
        inp: str
            String to verify

        Returns
        -------
        bool:
            Indicates whether this object is parsable or not

        Raises
        ------
        NotImplementedError
            Always, in this base class
        """
        raise NotImplementedError

    def is_file_valid(self, *args, **kwargs):
        """
        Verify if the content of a file is parsable by this parser

        Parameters
        ----------
        *args, **kwargs
            Passed to `_unpack_file` (and from there to ``open``)

        Returns
        -------
        The result of `is_valid` for the file content
        """
        # The helper must be referenced with a single underscore: inside a
        # class body ``self.__unpack_file`` is name-mangled to
        # ``self._StringBasedParser__unpack_file``, which does not exist.
        # The lines are concatenated because `is_valid` is declared to take
        # one string rather than a list of lines.
        return self.is_valid("".join(self._unpack_file(*args, **kwargs)))

    def stream_formula_lines(self, lines: Iterable[str], **kwargs):
        """
        Turn an iterable of text lines into a stream of element strings

        Parameters
        ----------
        lines: Iterable[str]
            Lines of text
        **kwargs
            Accepted for overriding implementations; unused here

        Raises
        ------
        NotImplementedError
            Always, in this base class
        """
        raise NotImplementedError

    def stream_formulas(self, path, *args, **kwargs):
        """
        Stream the elements contained in the file at `path`

        Parameters
        ----------
        path
            Path of the file to read
        *args, **kwargs
            Accepted but not used by this implementation

        Returns
        -------
        The result of `stream_formula_lines` for the lines of the file
        """
        return self.stream_formula_lines(self._unpack_file(path))

    def load_many(
        self, lines: Iterable[str], *args, **kwargs
    ) -> Iterable[LogicElement]:
        """
        Lazily load every element found in `lines`

        Parameters
        ----------
        lines: Iterable[str]
            Lines of text
        *args
            Accepted but not used by this implementation
        **kwargs
            Passed to `stream_formula_lines`

        Returns
        -------
        A ``map`` object applying `load_single_from_string` to each element
        produced by `stream_formula_lines`; nothing is loaded until it is
        iterated
        """
        return map(
            self.load_single_from_string, self.stream_formula_lines(lines, **kwargs)
        )
class LogicParser(Parser[Parseable, LogicElement]):
    """
    Parser specialisation whose target type is ``LogicElement``.

    Adds nothing to ``Parser``; it only fixes the ``Target`` type parameter.
    """
class ProblemParser(Parser[Parseable, Problem]):
    """
    Parser that assembles ``Problem`` objects from a stream of logic elements.

    The individual elements are parsed by an instance of `logic_parser_cls`,
    which is created in ``__init__``.
    """

    # Class instantiated as ``self.logic_parser``. `parse` calls
    # ``parse_single_from_string``, ``stream_formula_lines`` and
    # ``parse_from_file`` on that instance; ``LogicParser`` as visible in this
    # module defines none of them, so subclasses presumably replace this
    # attribute with a string-based logic parser.
    logic_parser_cls = LogicParser

    def __init__(self, *args, **kwargs):
        """
        Create the logic parser used by `parse`

        Parameters
        ----------
        *args, **kwargs
            Passed to the constructor of `logic_parser_cls` unchanged
        """
        self.logic_parser = self.logic_parser_cls(*args, **kwargs)

    def parse(self, inp, *args, **kwargs):
        """
        Build one problem per conjecture found in `inp`

        Every element streamed from `inp` is parsed in a
        ``multiprocessing.Pool`` and sorted into premises, conjectures and
        imports. Afterwards everything parsed from the imported files is
        added to the premises.

        Parameters
        ----------
        inp
            Passed to the logic parser's ``stream_formula_lines``
        *args, **kwargs
            Accepted but not used by this implementation

        Yields
        ------
        Problem
            ``Problem(premises, conjecture)`` for each conjecture; all
            problems receive the same premises list object

        Raises
        ------
        ParserException
            If a parsed element is neither a ``Sentence`` nor an ``Import``
        """
        premises = []
        conjectures = []
        imports = []
        with mp.Pool() as pool:
            parsed_elements = pool.imap(
                self.logic_parser.parse_single_from_string,
                self.logic_parser.stream_formula_lines(inp),
            )
            for element in parsed_elements:
                if isinstance(element, Sentence):
                    if element.is_conjecture():
                        conjectures.append(element)
                    else:
                        premises.append(element)
                elif isinstance(element, Import):
                    imports.append(element)
                else:
                    raise ParserException(f"Unknown element:{element}")
        # Imports are resolved only after the pool above has been closed;
        # ``parse_from_file`` is called once per import. Whatever it yields is
        # added to the premises without any further classification.
        for imported in imports:
            premises.extend(self.logic_parser.parse_from_file(imported.path))
        for conjecture in conjectures:
            yield Problem(premises, conjecture)
class ParserException(Exception):
    """Raised when a parser meets an element it cannot handle."""
class ProofParser(Parser[Parseable, Proof]):
    """
    Parser specialisation whose target type is ``Proof``.

    Adds nothing to ``Parser``; it only fixes the ``Target`` type parameter.
    """