diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..6beba64 --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,19 @@ +# .readthedocs.yaml +# Read the Docs configuration file +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Build documentation in the docs/ directory with Sphinx +sphinx: + configuration: docs/source/conf.py + +# Optionally build your docs in additional formats such as PDF +formats: all + +# Optionally set the version of Python and requirements required to build your docs +python: + version: 3.7 + install: + - requirements: docs/requirements.txt diff --git a/README.rst b/README.rst index 4c67048..3033913 100644 --- a/README.rst +++ b/README.rst @@ -33,7 +33,7 @@ Install Supported Python versions: -* Python >= 3.6 +* Python >= 3.6.2 * PyPy3 **Install:** diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..2ca4275 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,2 @@ +sphinx +sphinx-autodoc-typehints diff --git a/docs/source/_static/.keep b/docs/source/_static/.keep new file mode 100644 index 0000000..e69de29 diff --git a/docs/source/conf.py b/docs/source/conf.py index 382cc59..748ced0 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -7,7 +7,6 @@ import sys from pathlib import Path -from overpy import __about__ as overpy_about # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the @@ -25,6 +24,7 @@ # ones. extensions = [ 'sphinx.ext.autodoc', + 'sphinx_autodoc_typehints', 'sphinx.ext.todo', 'sphinx.ext.coverage', 'sphinx.ext.viewcode', @@ -50,10 +50,15 @@ # |version| and |release|, also used in various other places throughout the # built documents. # -# The short X.Y version. -version = overpy_about.__version__ -# The full version, including alpha/beta/rc tags. -release = overpy_about.__version__ +try: + from overpy import __about__ as overpy_about + # The short X.Y version. + version = overpy_about.__version__ + # The full version, including alpha/beta/rc tags. + release = overpy_about.__version__ +except ImportError: + version = "unknown" + release = version # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. @@ -93,6 +98,9 @@ # If true, keep warnings as "system message" paragraphs in the built documents. #keep_warnings = False +# sphinx-autodoc-typehints options +# If True, add stub documentation for undocumented parameters to be able to add type info. +always_document_param_types = True # -- Options for HTML output ---------------------------------------------- diff --git a/overpy/__init__.py b/overpy/__init__.py index 92565af..a85bb01 100644 --- a/overpy/__init__.py +++ b/overpy/__init__.py @@ -4,9 +4,11 @@ from urllib.request import urlopen from urllib.error import HTTPError from xml.sax import handler, make_parser +import xml.etree.ElementTree import json import re import time +from typing import Any, Callable, ClassVar, Dict, List, NoReturn, Optional, Tuple, Type, TypeVar, Union from overpy import exception # Ignore flake8 F401 warning for unused vars @@ -15,12 +17,14 @@ __uri__, __version__ ) +ElementTypeVar = TypeVar("ElementTypeVar", bound="Element") + XML_PARSER_DOM = 1 XML_PARSER_SAX = 2 # Try to convert some common attributes # http://wiki.openstreetmap.org/wiki/Elements#Common_attributes -GLOBAL_ATTRIBUTE_MODIFIERS = { +GLOBAL_ATTRIBUTE_MODIFIERS: Dict[str, Callable] = { "changeset": int, "timestamp": lambda ts: datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ"), "uid": int, @@ -29,14 +33,15 @@ } -def is_valid_type(element, cls): +def is_valid_type( + element: Union["Area", "Node", "Relation", "Way"], + cls: Type[Union["Area", "Element", "Node", "Relation", "Way"]]) -> bool: """ Test if an element is of a given type. - :param Element() element: The element instance to test - :param Element cls: The element class to test + :param element: The element instance to test + :param cls: The element class to test :return: False or True - :rtype: Boolean """ return isinstance(element, cls) and element.id is not None @@ -46,31 +51,28 @@ class Overpass: Class to access the Overpass API :cvar default_max_retry_count: Global max number of retries (Default: 0) + :cvar default_read_chunk_size: Max size of each chunk read from the server response :cvar default_retry_timeout: Global time to wait between tries (Default: 1.0s) + :cvar default_url: Default URL of the Overpass server """ - default_max_retry_count = 0 - default_read_chunk_size = 4096 - default_retry_timeout = 1.0 - default_url = "http://overpass-api.de/api/interpreter" + default_max_retry_count: ClassVar[int] = 0 + default_read_chunk_size: ClassVar[int] = 4096 + default_retry_timeout: ClassVar[float] = 1.0 + default_url: ClassVar[str] = "http://overpass-api.de/api/interpreter" def __init__( self, - read_chunk_size=None, - url=None, - xml_parser=XML_PARSER_SAX, - max_retry_count=None, - retry_timeout=None): + read_chunk_size: Optional[int] = None, + url: Optional[str] = None, + xml_parser: int = XML_PARSER_SAX, + max_retry_count: int = None, + retry_timeout: float = None): """ :param read_chunk_size: Max size of each chunk read from the server response - :type read_chunk_size: Integer :param url: Optional URL of the Overpass server. Defaults to http://overpass-api.de/api/interpreter - :type url: str :param xml_parser: The xml parser to use - :type xml_parser: Integer :param max_retry_count: Max number of retries (Default: default_max_retry_count) - :type max_retry_count: Integer :param retry_timeout: Time to wait between tries (Default: default_retry_timeout) - :type retry_timeout: float """ self.url = self.default_url if url is not None: @@ -93,11 +95,11 @@ def __init__( self.xml_parser = xml_parser @staticmethod - def _handle_remark_msg(msg): + def _handle_remark_msg(msg: str) -> NoReturn: """ Try to parse the message provided with the remark tag or element. - :param str msg: The message + :param msg: The message :raises overpy.exception.OverpassRuntimeError: If message starts with 'runtime error:' :raises overpy.exception.OverpassRuntimeRemark: If message starts with 'runtime remark:' :raises overpy.exception.OverpassUnknownError: If we are unable to identify the error @@ -109,20 +111,19 @@ def _handle_remark_msg(msg): raise exception.OverpassRuntimeRemark(msg=msg) raise exception.OverpassUnknownError(msg=msg) - def query(self, query): + def query(self, query: Union[bytes, str]) -> "Result": """ Query the Overpass API - :param String|Bytes query: The query string in Overpass QL + :param query: The query string in Overpass QL :return: The parsed result - :rtype: overpy.Result """ if not isinstance(query, bytes): query = query.encode("utf-8") - retry_num = 0 - retry_exceptions = [] - do_retry = True if self.max_retry_count > 0 else False + retry_num: int = 0 + retry_exceptions: List[exception.OverPyException] = [] + do_retry: bool = True if self.max_retry_count > 0 else False while retry_num <= self.max_retry_count: if retry_num > 0: time.sleep(self.retry_timeout) @@ -140,6 +141,7 @@ def query(self, query): response = response + data f.close() + current_exception: exception.OverPyException if f.code == 200: content_type = f.getheader("Content-Type") @@ -156,14 +158,14 @@ def query(self, query): continue if f.code == 400: - msgs = [] - for msg in self._regex_extract_error_msg.finditer(response): - tmp = self._regex_remove_tag.sub(b"", msg.group("msg")) + msgs: List[str] = [] + for msg_raw in self._regex_extract_error_msg.finditer(response): + msg_clean_bytes = self._regex_remove_tag.sub(b"", msg_raw.group("msg")) try: - tmp = tmp.decode("utf-8") + msg = msg_clean_bytes.decode("utf-8") except UnicodeDecodeError: - tmp = repr(tmp) - msgs.append(tmp) + msg = repr(msg_clean_bytes) + msgs.append(msg) current_exception = exception.OverpassBadRequest( query, @@ -175,14 +177,14 @@ def query(self, query): continue if f.code == 429: - current_exception = exception.OverpassTooManyRequests + current_exception = exception.OverpassTooManyRequests() if not do_retry: raise current_exception retry_exceptions.append(current_exception) continue if f.code == 504: - current_exception = exception.OverpassGatewayTimeout + current_exception = exception.OverpassGatewayTimeout() if not do_retry: raise current_exception retry_exceptions.append(current_exception) @@ -196,33 +198,28 @@ def query(self, query): raise exception.MaxRetriesReached(retry_count=retry_num, exceptions=retry_exceptions) - def parse_json(self, data, encoding="utf-8"): + def parse_json(self, data: Union[bytes, str], encoding: str = "utf-8") -> "Result": """ Parse raw response from Overpass service. :param data: Raw JSON Data - :type data: String or Bytes :param encoding: Encoding to decode byte string - :type encoding: String :return: Result object - :rtype: overpy.Result """ if isinstance(data, bytes): data = data.decode(encoding) - data = json.loads(data, parse_float=Decimal) - if "remark" in data: - self._handle_remark_msg(msg=data.get("remark")) - return Result.from_json(data, api=self) + data_parsed: dict = json.loads(data, parse_float=Decimal) + if "remark" in data_parsed: + self._handle_remark_msg(msg=data_parsed.get("remark")) + return Result.from_json(data_parsed, api=self) - def parse_xml(self, data, encoding="utf-8", parser=None): + def parse_xml(self, data: Union[bytes, str], encoding: str = "utf-8", parser: Optional[int] = None): """ :param data: Raw XML Data - :type data: String or Bytes :param encoding: Encoding to decode byte string - :type encoding: String + :param parser: The XML parser to use :return: Result object - :rtype: overpy.Result """ if parser is None: parser = self.xml_parser @@ -242,63 +239,81 @@ class Result: Class to handle the result. """ - def __init__(self, elements=None, api=None): + def __init__( + self, + elements: Optional[List[Union["Area", "Node", "Relation", "Way"]]] = None, + api: Optional[Overpass] = None): """ - :param List elements: - :param api: - :type api: overpy.Overpass + :param elements: List of elements to initialize the result with + :param api: The API object to load additional resources and elements """ if elements is None: elements = [] - self._areas = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Area)) - self._nodes = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Node)) - self._ways = OrderedDict((element.id, element) for element in elements if is_valid_type(element, Way)) - self._relations = OrderedDict((element.id, element) - for element in elements if is_valid_type(element, Relation)) - self._class_collection_map = {Node: self._nodes, Way: self._ways, Relation: self._relations, Area: self._areas} + self._areas: Dict[int, Union["Area", "Node", "Relation", "Way"]] = OrderedDict( + (element.id, element) for element in elements if is_valid_type(element, Area) + ) + self._nodes = OrderedDict( + (element.id, element) for element in elements if is_valid_type(element, Node) + ) + self._ways = OrderedDict( + (element.id, element) for element in elements if is_valid_type(element, Way) + ) + self._relations = OrderedDict( + (element.id, element) for element in elements if is_valid_type(element, Relation) + ) + self._class_collection_map: Dict[Any, Any] = { + Node: self._nodes, + Way: self._ways, + Relation: self._relations, + Area: self._areas + } self.api = api - def expand(self, other): + def expand(self, other: "Result"): """ Add all elements from an other result to the list of elements of this result object. It is used by the auto resolve feature. :param other: Expand the result with the elements from this result. - :type other: overpy.Result :raises ValueError: If provided parameter is not instance of :class:`overpy.Result` """ if not isinstance(other, Result): raise ValueError("Provided argument has to be instance of overpy:Result()") - other_collection_map = {Node: other.nodes, Way: other.ways, Relation: other.relations, Area: other.areas} + other_collection_map: Dict[Type["Element"], List[Union["Area", "Node", "Relation", "Way"]]] = { + Area: other.areas, + Node: other.nodes, + Relation: other.relations, + Way: other.ways + } for element_type, own_collection in self._class_collection_map.items(): for element in other_collection_map[element_type]: if is_valid_type(element, element_type) and element.id not in own_collection: own_collection[element.id] = element - def append(self, element): + def append(self, element: Union["Area", "Node", "Relation", "Way"]): """ Append a new element to the result. :param element: The element to append - :type element: overpy.Element """ if is_valid_type(element, Element): self._class_collection_map[element.__class__].setdefault(element.id, element) - def get_elements(self, filter_cls, elem_id=None): + def get_elements( + self, + filter_cls: Type[ElementTypeVar], + elem_id: Optional[int] = None) -> List[ElementTypeVar]: """ Get a list of elements from the result and filter the element type by a class. :param filter_cls: :param elem_id: ID of the object - :type elem_id: Integer :return: List of available elements - :rtype: List """ - result = [] + result: List[ElementTypeVar] = [] if elem_id is not None: try: result = [self._class_collection_map[filter_cls][elem_id]] @@ -309,39 +324,40 @@ def get_elements(self, filter_cls, elem_id=None): result.append(e) return result - def get_ids(self, filter_cls): + def get_ids( + self, + filter_cls: Type[Union["Area", "Node", "Relation", "Way"]]) -> List[int]: """ + Get all Element IDs - :param filter_cls: - :return: + :param filter_cls: Only IDs of elements with this type + :return: List of IDs """ return list(self._class_collection_map[filter_cls].keys()) - def get_node_ids(self): + def get_node_ids(self) -> List[int]: return self.get_ids(filter_cls=Node) - def get_way_ids(self): + def get_way_ids(self) -> List[int]: return self.get_ids(filter_cls=Way) - def get_relation_ids(self): + def get_relation_ids(self) -> List[int]: return self.get_ids(filter_cls=Relation) - def get_area_ids(self): + def get_area_ids(self) -> List[int]: return self.get_ids(filter_cls=Area) @classmethod - def from_json(cls, data, api=None): + def from_json(cls, data: dict, api: Optional[Overpass] = None) -> "Result": """ Create a new instance and load data from json object. :param data: JSON data returned by the Overpass API - :type data: Dict :param api: - :type api: overpy.Overpass :return: New instance of Result object - :rtype: overpy.Result """ result = cls(api=api) + elem_cls: Type[Union["Area", "Node", "Relation", "Way"]] for elem_cls in [Node, Way, Relation, Area]: for element in data.get("elements", []): e_type = element.get("type") @@ -351,7 +367,11 @@ def from_json(cls, data, api=None): return result @classmethod - def from_xml(cls, data, api=None, parser=None): + def from_xml( + cls, + data: Union[str, xml.etree.ElementTree.Element], + api: Optional[Overpass] = None, + parser: Optional[int] = None) -> "Result": """ Create a new instance and load data from xml data or object. @@ -361,13 +381,9 @@ def from_xml(cls, data, api=None, parser=None): The parser is set to DOM if an xml.etree.ElementTree.Element is provided as data value. :param data: Root element - :type data: str | xml.etree.ElementTree.Element :param api: The instance to query additional information if required. - :type api: Overpass :param parser: Specify the parser to use(DOM or SAX)(Default: None = autodetect, defaults to SAX) - :type parser: Integer | None :return: New instance of Result object - :rtype: Result """ if parser is None: if isinstance(data, str): @@ -385,6 +401,7 @@ def from_xml(cls, data, api=None, parser=None): else: raise exception.OverPyException("Unable to detect data type.") + elem_cls: Type[Union["Area", "Node", "Relation", "Way"]] for elem_cls in [Node, Way, Relation, Area]: for child in root: if child.tag.lower() == elem_cls._type_value: @@ -392,25 +409,25 @@ def from_xml(cls, data, api=None, parser=None): elif parser == XML_PARSER_SAX: from io import StringIO + if not isinstance(data, str): + raise ValueError("data must be of type str if using the SAX parser") source = StringIO(data) sax_handler = OSMSAXHandler(result) - parser = make_parser() - parser.setContentHandler(sax_handler) - parser.parse(source) + sax_parser = make_parser() + sax_parser.setContentHandler(sax_handler) + sax_parser.parse(source) else: # ToDo: better exception raise Exception("Unknown XML parser") return result - def get_area(self, area_id, resolve_missing=False): + def get_area(self, area_id: int, resolve_missing: bool = False) -> "Area": """ Get an area by its ID. :param area_id: The area ID - :type area_id: Integer :param resolve_missing: Query the Overpass API if the area is missing in the result set. :return: The area - :rtype: overpy.Area :raises overpy.exception.DataIncomplete: The requested way is not available in the result cache. :raises overpy.exception.DataIncomplete: If resolve_missing is True and the area can't be resolved. """ @@ -437,25 +454,22 @@ def get_area(self, area_id, resolve_missing=False): return areas[0] - def get_areas(self, area_id=None, **kwargs): + def get_areas(self, area_id: Optional[int] = None) -> List["Area"]: """ Alias for get_elements() but filter the result by Area :param area_id: The Id of the area - :type area_id: Integer :return: List of elements """ - return self.get_elements(Area, elem_id=area_id, **kwargs) + return self.get_elements(Area, elem_id=area_id) - def get_node(self, node_id, resolve_missing=False): + def get_node(self, node_id: int, resolve_missing: bool = False) -> "Node": """ Get a node by its ID. :param node_id: The node ID - :type node_id: Integer :param resolve_missing: Query the Overpass API if the node is missing in the result set. :return: The node - :rtype: overpy.Node :raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache. :raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved. """ @@ -482,7 +496,7 @@ def get_node(self, node_id, resolve_missing=False): return nodes[0] - def get_nodes(self, node_id=None, **kwargs): + def get_nodes(self, node_id: Optional[int] = None) -> List["Node"]: """ Alias for get_elements() but filter the result by Node() @@ -490,17 +504,15 @@ def get_nodes(self, node_id=None, **kwargs): :type node_id: Integer :return: List of elements """ - return self.get_elements(Node, elem_id=node_id, **kwargs) + return self.get_elements(Node, elem_id=node_id) - def get_relation(self, rel_id, resolve_missing=False): + def get_relation(self, rel_id: int, resolve_missing: bool = False) -> "Relation": """ Get a relation by its ID. :param rel_id: The relation ID - :type rel_id: Integer :param resolve_missing: Query the Overpass API if the relation is missing in the result set. :return: The relation - :rtype: overpy.Relation :raises overpy.exception.DataIncomplete: The requested relation is not available in the result cache. :raises overpy.exception.DataIncomplete: If resolve_missing is True and the relation can't be resolved. """ @@ -527,25 +539,22 @@ def get_relation(self, rel_id, resolve_missing=False): return relations[0] - def get_relations(self, rel_id=None, **kwargs): + def get_relations(self, rel_id: int = None) -> List["Relation"]: """ Alias for get_elements() but filter the result by Relation :param rel_id: Id of the relation - :type rel_id: Integer :return: List of elements """ - return self.get_elements(Relation, elem_id=rel_id, **kwargs) + return self.get_elements(Relation, elem_id=rel_id) - def get_way(self, way_id, resolve_missing=False): + def get_way(self, way_id: int, resolve_missing: bool = False) -> "Way": """ Get a way by its ID. :param way_id: The way ID - :type way_id: Integer :param resolve_missing: Query the Overpass API if the way is missing in the result set. :return: The way - :rtype: overpy.Way :raises overpy.exception.DataIncomplete: The requested way is not available in the result cache. :raises overpy.exception.DataIncomplete: If resolve_missing is True and the way can't be resolved. """ @@ -572,15 +581,14 @@ def get_way(self, way_id, resolve_missing=False): return ways[0] - def get_ways(self, way_id=None, **kwargs): + def get_ways(self, way_id: Optional[int] = None) -> List["Way"]: """ Alias for get_elements() but filter the result by Way :param way_id: The Id of the way - :type way_id: Integer :return: List of elements """ - return self.get_elements(Way, elem_id=way_id, **kwargs) + return self.get_elements(Way, elem_id=way_id) area_ids = property(get_area_ids) areas = property(get_areas) @@ -597,33 +605,32 @@ class Element: Base element """ - def __init__(self, attributes=None, result=None, tags=None): + _type_value: str + + def __init__(self, attributes: Optional[dict] = None, result: Optional[Result] = None, tags: Optional[Dict] = None): """ :param attributes: Additional attributes - :type attributes: Dict :param result: The result object this element belongs to :param tags: List of tags - :type tags: Dict """ self._result = result self.attributes = attributes # ToDo: Add option to modify attribute modifiers - attribute_modifiers = dict(GLOBAL_ATTRIBUTE_MODIFIERS.items()) + attribute_modifiers: Dict[str, Callable] = dict(GLOBAL_ATTRIBUTE_MODIFIERS.items()) for n, m in attribute_modifiers.items(): if n in self.attributes: self.attributes[n] = m(self.attributes[n]) - self.id = None + self.id: int self.tags = tags @classmethod - def get_center_from_json(cls, data): + def get_center_from_json(cls, data: dict) -> Tuple[Decimal, Decimal]: """ Get center information from json data :param data: json data :return: tuple with two elements: lat and lon - :rtype: tuple """ center_lat = None center_lon = None @@ -638,15 +645,35 @@ def get_center_from_json(cls, data): return center_lat, center_lon @classmethod - def get_center_from_xml_dom(cls, sub_child): - center_lat = sub_child.attrib.get("lat") - center_lon = sub_child.attrib.get("lon") - if center_lat is None or center_lon is None: + def get_center_from_xml_dom(cls, sub_child: xml.etree.ElementTree.Element) -> Tuple[Decimal, Decimal]: + center_lat_str: str = sub_child.attrib.get("lat") + center_lon_str: str = sub_child.attrib.get("lon") + if center_lat_str is None or center_lon_str is None: raise ValueError("Unable to get lat or lon of way center.") - center_lat = Decimal(center_lat) - center_lon = Decimal(center_lon) + center_lat = Decimal(center_lat_str) + center_lon = Decimal(center_lon_str) return center_lat, center_lon + @classmethod + def from_json(cls: Type[ElementTypeVar], data: dict, result: Optional[Result] = None) -> ElementTypeVar: + """ + Create new Element() from json data + :param data: + :param result: + :return: + """ + raise NotImplementedError + + @classmethod + def from_xml( + cls: Type[ElementTypeVar], + child: xml.etree.ElementTree.Element, + result: Optional[Result] = None) -> ElementTypeVar: + """ + Create new Element() element from XML data + """ + raise NotImplementedError + class Area(Element): """ @@ -655,32 +682,27 @@ class Area(Element): _type_value = "area" - def __init__(self, area_id=None, **kwargs): + def __init__(self, area_id: Optional[int] = None, **kwargs): """ :param area_id: Id of the area element - :type area_id: Integer :param kwargs: Additional arguments are passed directly to the parent class - """ Element.__init__(self, **kwargs) #: The id of the way self.id = area_id - def __repr__(self): + def __repr__(self) -> str: return f"" @classmethod - def from_json(cls, data, result=None): + def from_json(cls, data: dict, result: Optional[Result] = None) -> "Area": """ Create new Area element from JSON data :param data: Element data from JSON - :type data: Dict :param result: The result this element belongs to - :type result: overpy.Result :return: New instance of Way - :rtype: overpy.Area :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. """ if data.get("type") != cls._type_value: @@ -703,16 +725,13 @@ def from_json(cls, data, result=None): return cls(area_id=area_id, attributes=attributes, tags=tags, result=result) @classmethod - def from_xml(cls, child, result=None): + def from_xml(cls, child: xml.etree.ElementTree.Element, result: Optional[Result] = None) -> "Area": """ Create new way element from XML data :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element :param result: The result this node belongs to - :type result: overpy.Result :return: New Way oject - :rtype: overpy.Way :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match :raises ValueError: If the ref attribute of the xml node is not provided :raises ValueError: If a tag doesn't have a name @@ -733,9 +752,10 @@ def from_xml(cls, child, result=None): value = sub_child.attrib.get("v") tags[name] = value - area_id = child.attrib.get("id") - if area_id is not None: - area_id = int(area_id) + area_id_str: Optional[str] = child.attrib.get("id") + area_id: Optional[int] = None + if area_id_str is not None: + area_id = int(area_id_str) attributes = {} ignore = ["id"] @@ -754,14 +774,16 @@ class Node(Element): _type_value = "node" - def __init__(self, node_id=None, lat=None, lon=None, **kwargs): + def __init__( + self, + node_id: Optional[int] = None, + lat: Optional[Union[Decimal, float]] = None, + lon: Optional[Union[Decimal, float]] = None, + **kwargs): """ :param lat: Latitude - :type lat: Decimal or Float :param lon: Longitude - :type long: Decimal or Float :param node_id: Id of the node element - :type node_id: Integer :param kwargs: Additional arguments are passed directly to the parent class """ @@ -770,20 +792,17 @@ def __init__(self, node_id=None, lat=None, lon=None, **kwargs): self.lat = lat self.lon = lon - def __repr__(self): + def __repr__(self) -> str: return f"" @classmethod - def from_json(cls, data, result=None): + def from_json(cls, data: dict, result: Optional[Result] = None) -> "Node": """ Create new Node element from JSON data :param data: Element data from JSON - :type data: Dict :param result: The result this element belongs to - :type result: overpy.Result :return: New instance of Node - :rtype: overpy.Node :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. """ if data.get("type") != cls._type_value: @@ -808,16 +827,13 @@ def from_json(cls, data, result=None): return cls(node_id=node_id, lat=lat, lon=lon, tags=tags, attributes=attributes, result=result) @classmethod - def from_xml(cls, child, result=None): + def from_xml(cls, child: xml.etree.ElementTree.Element, result: Optional[Result] = None) -> "Node": """ Create new way element from XML data :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element :param result: The result this node belongs to - :type result: overpy.Result :return: New Way oject - :rtype: overpy.Node :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match :raises ValueError: If a tag doesn't have a name """ @@ -837,15 +853,20 @@ def from_xml(cls, child, result=None): value = sub_child.attrib.get("v") tags[name] = value - node_id = child.attrib.get("id") - if node_id is not None: - node_id = int(node_id) - lat = child.attrib.get("lat") - if lat is not None: - lat = Decimal(lat) - lon = child.attrib.get("lon") - if lon is not None: - lon = Decimal(lon) + node_id: Optional[int] = None + node_id_str: Optional[str] = child.attrib.get("id") + if node_id_str is not None: + node_id = int(node_id_str) + + lat: Optional[Decimal] = None + lat_str: Optional[str] = child.attrib.get("lat") + if lat_str is not None: + lat = Decimal(lat_str) + + lon: Optional[Decimal] = None + lon_str: Optional[str] = child.attrib.get("lon") + if lon_str is not None: + lon = Decimal(lon_str) attributes = {} ignore = ["id", "lat", "lon"] @@ -864,14 +885,17 @@ class Way(Element): _type_value = "way" - def __init__(self, way_id=None, center_lat=None, center_lon=None, node_ids=None, **kwargs): + def __init__( + self, + way_id: Optional[int] = None, + center_lat: Optional[Union[Decimal, float]] = None, + center_lon: Optional[Union[Decimal, float]] = None, + node_ids: Optional[Union[List[int], Tuple[int]]] = None, + **kwargs): """ :param node_ids: List of node IDs - :type node_ids: List or Tuple :param way_id: Id of the way element - :type way_id: Integer :param kwargs: Additional arguments are passed directly to the parent class - """ Element.__init__(self, **kwargs) @@ -889,20 +913,18 @@ def __repr__(self): return f"" @property - def nodes(self): + def nodes(self) -> List[Node]: """ List of nodes associated with the way. """ return self.get_nodes() - def get_nodes(self, resolve_missing=False): + def get_nodes(self, resolve_missing: bool = False) -> List[Node]: """ Get the nodes defining the geometry of the way :param resolve_missing: Try to resolve missing nodes. - :type resolve_missing: Boolean :return: List of nodes - :rtype: List of overpy.Node :raises overpy.exception.DataIncomplete: At least one referenced node is not available in the result cache. :raises overpy.exception.DataIncomplete: If resolve_missing is True and at least one node can't be resolved. """ @@ -952,16 +974,13 @@ def get_nodes(self, resolve_missing=False): return result @classmethod - def from_json(cls, data, result=None): + def from_json(cls, data: dict, result: Optional[Result] = None) -> "Way": """ Create new Way element from JSON data :param data: Element data from JSON - :type data: Dict :param result: The result this element belongs to - :type result: overpy.Result :return: New instance of Way - :rtype: overpy.Way :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. """ if data.get("type") != cls._type_value: @@ -994,16 +1013,13 @@ def from_json(cls, data, result=None): ) @classmethod - def from_xml(cls, child, result=None): + def from_xml(cls, child: xml.etree.ElementTree.Element, result: Optional[Result] = None) -> "Way": """ Create new way element from XML data :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element :param result: The result this node belongs to - :type result: overpy.Result :return: New Way oject - :rtype: overpy.Way :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match :raises ValueError: If the ref attribute of the xml node is not provided :raises ValueError: If a tag doesn't have a name @@ -1027,17 +1043,18 @@ def from_xml(cls, child, result=None): value = sub_child.attrib.get("v") tags[name] = value if sub_child.tag.lower() == "nd": - ref_id = sub_child.attrib.get("ref") - if ref_id is None: + ref_id_str = sub_child.attrib.get("ref") + if ref_id_str is None: raise ValueError("Unable to find required ref value.") - ref_id = int(ref_id) + ref_id: int = int(ref_id_str) node_ids.append(ref_id) if sub_child.tag.lower() == "center": (center_lat, center_lon) = cls.get_center_from_xml_dom(sub_child=sub_child) - way_id = child.attrib.get("id") - if way_id is not None: - way_id = int(way_id) + way_id: Optional[int] = None + way_id_str: Optional[str] = child.attrib.get("id") + if way_id_str is not None: + way_id = int(way_id_str) attributes = {} ignore = ["id"] @@ -1057,11 +1074,16 @@ class Relation(Element): _type_value = "relation" - def __init__(self, rel_id=None, center_lat=None, center_lon=None, members=None, **kwargs): + def __init__( + self, + rel_id: Optional[int] = None, + center_lat: Optional[Union[Decimal, float]] = None, + center_lon: Optional[Union[Decimal, float]] = None, + members: Optional[List["RelationMember"]] = None, + **kwargs): """ :param members: :param rel_id: Id of the relation element - :type rel_id: Integer :param kwargs: :return: """ @@ -1078,16 +1100,13 @@ def __repr__(self): return f"" @classmethod - def from_json(cls, data, result=None): + def from_json(cls, data: dict, result: Optional[Result] = None) -> "Relation": """ Create new Relation element from JSON data :param data: Element data from JSON - :type data: Dict :param result: The result this element belongs to - :type result: overpy.Result :return: New instance of Relation - :rtype: overpy.Relation :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. """ if data.get("type") != cls._type_value: @@ -1133,16 +1152,13 @@ def from_json(cls, data, result=None): ) @classmethod - def from_xml(cls, child, result=None): + def from_xml(cls, child: xml.etree.ElementTree.Element, result: Optional[Result] = None) -> "Relation": """ Create new way element from XML data :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element :param result: The result this node belongs to - :type result: overpy.Result :return: New Way oject - :rtype: overpy.Relation :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match :raises ValueError: If a tag doesn't have a name """ @@ -1178,9 +1194,10 @@ def from_xml(cls, child, result=None): if sub_child.tag.lower() == "center": (center_lat, center_lon) = cls.get_center_from_xml_dom(sub_child=sub_child) - rel_id = child.attrib.get("id") - if rel_id is not None: - rel_id = int(rel_id) + rel_id: Optional[int] = None + rel_id_str: Optional[str] = child.attrib.get("id") + if rel_id_str is not None: + rel_id = int(rel_id_str) attributes = {} ignore = ["id"] @@ -1204,8 +1221,15 @@ class RelationMember: """ Base class to represent a member of a relation. """ + _type_value: Optional[str] = None - def __init__(self, attributes=None, geometry=None, ref=None, role=None, result=None): + def __init__( + self, + attributes: Optional[dict] = None, + geometry: Optional[List["RelationWayGeometryValue"]] = None, + ref: Optional[int] = None, + role: Optional[str] = None, + result: Optional[Result] = None): """ :param ref: Reference Id :type ref: Integer @@ -1220,16 +1244,13 @@ def __init__(self, attributes=None, geometry=None, ref=None, role=None, result=N self.geometry = geometry @classmethod - def from_json(cls, data, result=None): + def from_json(cls, data: dict, result: Optional[Result] = None) -> "RelationMember": """ Create new RelationMember element from JSON data - :param child: Element data from JSON - :type child: Dict + :param data: Element data from JSON :param result: The result this element belongs to - :type result: overpy.Result :return: New instance of RelationMember - :rtype: overpy.RelationMember :raises overpy.exception.ElementDataWrongType: If type value of the passed JSON data does not match. """ if data.get("type") != cls._type_value: @@ -1271,16 +1292,16 @@ def from_json(cls, data, result=None): ) @classmethod - def from_xml(cls, child, result=None): + def from_xml( + cls, + child: xml.etree.ElementTree.Element, + result: Optional[Result] = None) -> "RelationMember": """ Create new RelationMember from XML data :param child: XML node to be parsed - :type child: xml.etree.ElementTree.Element :param result: The result this element belongs to - :type result: overpy.Result :return: New relation member oject - :rtype: overpy.RelationMember :raises overpy.exception.ElementDataWrongType: If name of the xml child node doesn't match """ if child.attrib.get("type") != cls._type_value: @@ -1289,10 +1310,12 @@ def from_xml(cls, child, result=None): type_provided=child.tag.lower() ) - ref = child.attrib.get("ref") - if ref is not None: - ref = int(ref) - role = child.attrib.get("role") + ref: Optional[int] = None + ref_str: Optional[str] = child.attrib.get("ref") + if ref_str is not None: + ref = int(ref_str) + + role: Optional[str] = child.attrib.get("role") attributes = {} ignore = ["geometry", "ref", "role", "type"] @@ -1325,7 +1348,7 @@ def from_xml(cls, child, result=None): class RelationNode(RelationMember): _type_value = "node" - def resolve(self, resolve_missing=False): + def resolve(self, resolve_missing: bool = False) -> Node: return self._result.get_node(self.ref, resolve_missing=resolve_missing) def __repr__(self): @@ -1335,7 +1358,7 @@ def __repr__(self): class RelationWay(RelationMember): _type_value = "way" - def resolve(self, resolve_missing=False): + def resolve(self, resolve_missing: bool = False) -> Way: return self._result.get_way(self.ref, resolve_missing=resolve_missing) def __repr__(self): @@ -1343,7 +1366,7 @@ def __repr__(self): class RelationWayGeometryValue: - def __init__(self, lat, lon): + def __init__(self, lat: Union[Decimal, float], lon: Union[Decimal, float]): self.lat = lat self.lon = lon @@ -1354,7 +1377,7 @@ def __repr__(self): class RelationRelation(RelationMember): _type_value = "relation" - def resolve(self, resolve_missing=False): + def resolve(self, resolve_missing: bool = False) -> Relation: return self._result.get_relation(self.ref, resolve_missing=resolve_missing) def __repr__(self): @@ -1364,7 +1387,7 @@ def __repr__(self): class RelationArea(RelationMember): _type_value = "area" - def resolve(self, resolve_missing=False): + def resolve(self, resolve_missing: bool = False) -> Area: return self._result.get_area(self.ref, resolve_missing=resolve_missing) def __repr__(self): @@ -1376,29 +1399,26 @@ class OSMSAXHandler(handler.ContentHandler): SAX parser for Overpass XML response. """ #: Tuple of opening elements to ignore - ignore_start = ('osm', 'meta', 'note', 'bounds', 'remark') + ignore_start: ClassVar = ('osm', 'meta', 'note', 'bounds', 'remark') #: Tuple of closing elements to ignore - ignore_end = ('osm', 'meta', 'note', 'bounds', 'remark', 'tag', 'nd', 'center') + ignore_end: ClassVar = ('osm', 'meta', 'note', 'bounds', 'remark', 'tag', 'nd', 'center') - def __init__(self, result): + def __init__(self, result: Result): """ :param result: Append results to this result set. - :type result: overpy.Result """ handler.ContentHandler.__init__(self) self._result = result - self._curr = {} + self._curr: Dict[str, Any] = {} #: Current relation member object - self.cur_relation_member = None + self.cur_relation_member: Optional[RelationMember] = None - def startElement(self, name, attrs): + def startElement(self, name: str, attrs: dict): """ Handle opening elements. :param name: Name of the element - :type name: String :param attrs: Attributes of the element - :type attrs: Dict """ if name in self.ignore_start: return @@ -1408,12 +1428,11 @@ def startElement(self, name, attrs): raise KeyError("Unknown element start '%s'" % name) handler(attrs) - def endElement(self, name): + def endElement(self, name: str): """ Handle closing elements :param name: Name of the element - :type name: String """ if name in self.ignore_end: return @@ -1423,7 +1442,7 @@ def endElement(self, name): raise KeyError("Unknown element end '%s'" % name) handler() - def _handle_start_center(self, attrs): + def _handle_start_center(self, attrs: dict): """ Handle opening center element @@ -1437,12 +1456,11 @@ def _handle_start_center(self, attrs): self._curr["center_lat"] = Decimal(center_lat) self._curr["center_lon"] = Decimal(center_lon) - def _handle_start_tag(self, attrs): + def _handle_start_tag(self, attrs: dict): """ Handle opening tag element :param attrs: Attributes of the element - :type attrs: Dict """ try: tag_key = attrs['k'] @@ -1450,12 +1468,11 @@ def _handle_start_tag(self, attrs): raise ValueError("Tag without name/key.") self._curr['tags'][tag_key] = attrs.get('v') - def _handle_start_node(self, attrs): + def _handle_start_node(self, attrs: dict): """ Handle opening node element :param attrs: Attributes of the element - :type attrs: Dict """ self._curr = { 'attributes': dict(attrs), @@ -1481,12 +1498,11 @@ def _handle_end_node(self): self._result.append(Node(result=self._result, **self._curr)) self._curr = {} - def _handle_start_way(self, attrs): + def _handle_start_way(self, attrs: dict): """ Handle opening way element :param attrs: Attributes of the element - :type attrs: Dict """ self._curr = { 'center_lat': None, @@ -1507,12 +1523,11 @@ def _handle_end_way(self): self._result.append(Way(result=self._result, **self._curr)) self._curr = {} - def _handle_start_area(self, attrs): + def _handle_start_area(self, attrs: dict): """ Handle opening area element :param attrs: Attributes of the element - :type attrs: Dict """ self._curr = { 'attributes': dict(attrs), @@ -1530,12 +1545,11 @@ def _handle_end_area(self): self._result.append(Area(result=self._result, **self._curr)) self._curr = {} - def _handle_start_nd(self, attrs): + def _handle_start_nd(self, attrs: dict): """ Handle opening nd element :param attrs: Attributes of the element - :type attrs: Dict """ if isinstance(self.cur_relation_member, RelationWay): if self.cur_relation_member.geometry is None: @@ -1553,12 +1567,11 @@ def _handle_start_nd(self, attrs): raise ValueError("Unable to find required ref value.") self._curr['node_ids'].append(int(node_ref)) - def _handle_start_relation(self, attrs): + def _handle_start_relation(self, attrs: dict): """ Handle opening relation element :param attrs: Attributes of the element - :type attrs: Dict """ self._curr = { 'attributes': dict(attrs), @@ -1577,15 +1590,14 @@ def _handle_end_relation(self): self._result.append(Relation(result=self._result, **self._curr)) self._curr = {} - def _handle_start_member(self, attrs): + def _handle_start_member(self, attrs: dict): """ Handle opening member element :param attrs: Attributes of the element - :type attrs: Dict """ - params = { + params: Dict[str, Any] = { # ToDo: Parse attributes 'attributes': {}, 'ref': None, @@ -1603,7 +1615,7 @@ def _handle_start_member(self, attrs): "relation": RelationRelation, "way": RelationWay } - cls = cls_map.get(attrs["type"]) + cls: Type[RelationMember] = cls_map.get(attrs["type"]) if cls is None: raise ValueError("Undefined type for member: '%s'" % attrs['type']) diff --git a/overpy/exception.py b/overpy/exception.py index d22d490..c7b4ffa 100644 --- a/overpy/exception.py +++ b/overpy/exception.py @@ -30,7 +30,7 @@ def __init__(self, type_expected, type_provided=None): self.type_expected = type_expected self.type_provided = type_provided - def __str__(self): + def __str__(self) -> str: return "Type expected '{}' but '{}' provided".format( self.type_expected, str(self.type_provided) @@ -45,7 +45,7 @@ def __init__(self, retry_count, exceptions): self.exceptions = exceptions self.retry_count = retry_count - def __str__(self): + def __str__(self) -> str: return "Unable get any result from the Overpass API server after %d retries." % self.retry_count @@ -64,7 +64,7 @@ def __init__(self, query, msgs=None): msgs = [] self.msgs = msgs - def __str__(self): + def __str__(self) -> str: tmp_msgs = [] for tmp_msg in self.msgs: if not isinstance(tmp_msg, str): @@ -89,7 +89,7 @@ def __init__(self, msg=None): #: The message from the remark tag or element self.msg = msg - def __str__(self): + def __str__(self) -> str: if self.msg is None: return "No error message provided" if not isinstance(self.msg, str): @@ -139,7 +139,7 @@ class OverpassUnknownContentType(OverPyException): def __init__(self, content_type): self.content_type = content_type - def __str__(self): + def __str__(self) -> str: if self.content_type is None: return "No content type returned" return "Unknown content type: %s" % self.content_type @@ -162,5 +162,5 @@ class OverpassUnknownHTTPStatusCode(OverPyException): def __init__(self, code): self.code = code - def __str__(self): + def __str__(self) -> str: return "Unknown/Unhandled status code: %d" % self.code diff --git a/overpy/helper.py b/overpy/helper.py index e3ac017..28a6a2c 100644 --- a/overpy/helper.py +++ b/overpy/helper.py @@ -1,15 +1,19 @@ +from typing import List, Optional __author__ = 'mjob' import overpy -def get_street(street, areacode, api=None): +def get_street( + street: str, + areacode: str, + api: Optional[overpy.Overpass] = None) -> overpy.Result: """ Retrieve streets in a given bounding area - :param overpy.Overpass api: First street of intersection - :param String street: Name of street - :param String areacode: The OSM id of the bounding area + :param street: Name of street + :param areacode: The OSM id of the bounding area + :param api: API object to fetch missing elements :return: Parsed result :raises overpy.exception.OverPyException: If something bad happens. """ @@ -35,14 +39,18 @@ def get_street(street, areacode, api=None): return data -def get_intersection(street1, street2, areacode, api=None): +def get_intersection( + street1: str, + street2: str, + areacode: str, + api: Optional[overpy.Overpass] = None) -> List[overpy.Node]: """ Retrieve intersection of two streets in a given bounding area - :param overpy.Overpass api: First street of intersection - :param String street1: Name of first street of intersection - :param String street2: Name of second street of intersection - :param String areacode: The OSM id of the bounding area + :param street1: Name of first street of intersection + :param street2: Name of second street of intersection + :param areacode: The OSM id of the bounding area + :param api: API object to fetch missing elements :return: List of intersections :raises overpy.exception.OverPyException: If something bad happens. """