Error reading nested tags in OpenStreetMap XML files #190
Description
Hello,
I am trying to parse OpenStreetMap XML files using the latest spark-xml and PySpark (Scala 2.11.8, Spark 2.0.1, with Jupyter notebooks as well as Databricks Community Edition).
The XML data can be fetched using:
wget -O winterthur_map.osm "http://overpass-api.de/api/map?bbox=8.6432,47.4579,8.8210,47.5442"
Extracting the way tags and relation tags works fine (a sketch of the working way extraction follows the traceback below), but extracting the nodes throws the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-15-fcee35e58e1c> in <module>()
6
7 df = sqlContext.read.format('com.databricks.spark.xml') .options(rowTag='node', mode='FAILFAST') .schema(my_schema) .load(MAP_DATA_SET_WINTERTHUR)
----> 8 df.show(30)
/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/dataframe.py in show(self, n, truncate)
285 +---+-----+
286 """
--> 287 print(self._jdf.showString(n, truncate))
288
289 def __repr__(self):
/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o175.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 1 times, most recent failure: Lost task 0.0 in stage 13.0 (TID 23, localhost): java.lang.RuntimeException: Malformed line in FAILFAST mode:
<node id="15998875" lat="47.5296917" lon="8.8199895" version="10" timestamp="2014-05-17T02:31:10Z" changeset="22381881" uid="178186" user="mdk"/> <node id="15998878" lat="47.5302420" lon="8.8158363" version="13" timestamp="2014-06-10T19:13:01Z" changeset="22856880" uid="266672" user="tf66"/> <node id="15998881" lat="47.5308430" lon="8.8112575" version="15" timestamp="2014-06-10T19:13:01Z" changeset="22856880" uid="266672" user="tf66"/> <node id="15998885" lat="47.5291517" lon="8.8246617" version="10" timestamp="2008-11-10T22:47:33Z" changeset="730754" uid="44456" user="twalter"/> <node id="15998916" lat="47.5315221" lon="8.8060980" version="5" timestamp="2014-06-10T19:13:01Z" changeset="22856880" uid="266672" user="tf66"/> <node id="15998949" lat="47.5288157" lon="8.8318368" version="10" timestamp="2014-05-17T02:31:10Z" changeset="22381881" uid="178186" user="mdk"/> <node id="15998977" lat="47.4577472" lon="8.7085158" version="8" timestamp="2014-09-23T23:31:01Z" changeset="25635449" uid="7412" user="mjulius"/> <node id="15998991" lat="47.5288778" lon="8.8282872" version="11" timestamp="2013-05-12T17:13:27Z" changeset="16099021" uid="390170" user="chewiebug"/> <node id="15999625" lat="47.5291589" lon="8.8375096" version="11" timestamp="2014-05-17T02:31:10Z" changeset="22381881" uid="178186" user="mdk"/> <node id="15999629" lat="47.5265460" lon="8.8810894" version="4" timestamp="2014-04-29T16:17:32Z" changeset="22026107" uid="1881716" user="Leonardo Hauschild"/> <node id="15999636" lat="47.5250933" lon="8.8860317" version="3" timestamp="2008-09-03T11:53:55Z" changeset="512858" uid="7260" user="studerap"> <tag k="created_by" v="JOSM"/> </node> at com.databricks.spark.xml.parsers.StaxXmlParser$.com$databricks$spark$xml$parsers$StaxXmlParser$$failedRecord$1(StaxXmlParser.scala:47)
at com.databricks.spark.xml.parsers.StaxXmlParser$$anonfun$parse$1$$anonfun$apply$4.apply(StaxXmlParser.scala:92)
at com.databricks.spark.xml.parsers.StaxXmlParser$$anonfun$parse$1$$anonfun$apply$4.apply(StaxXmlParser.scala:73)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
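For comparison, extracting the ways works fine with essentially the same call. This is a minimal sketch of what I mean, not the exact code I ran: schema inference is used, it assumes the same sqlContext and file as in the full code further down, and relations behave the same with rowTag='relation':

# Sketch: reading the <way> elements works without errors
df_ways = sqlContext.read.format('com.databricks.spark.xml') \
    .options(rowTag='way', mode='FAILFAST') \
    .load('winterthur_map.osm')
df_ways.show(5)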
This is the first part of the OSM XML file mentioned in the error message:
<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="Overpass API">
<note>The data included in this document is from www.openstreetmap.org. The data is made available under ODbL.</note>
<meta osm_base="2016-10-16T09:14:02Z"/>
<bounds minlat="47.4579" minlon="8.6432" maxlat="47.5442" maxlon="8.821"/>
<node id="15998875" lat="47.5296917" lon="8.8199895" version="10" timestamp="2014-05-17T02:31:10Z" changeset="22381881" uid="178186" user="mdk"/>
<node id="15998878" lat="47.5302420" lon="8.8158363" version="13" timestamp="2014-06-10T19:13:01Z" changeset="22856880" uid="266672" user="tf66"/>
<node id="15998881" lat="47.5308430" lon="8.8112575" version="15" timestamp="2014-06-10T19:13:01Z" changeset="22856880" uid="266672" user="tf66"/>
<node id="15998885" lat="47.5291517" lon="8.8246617" version="10" timestamp="2008-11-10T22:47:33Z" changeset="730754" uid="44456" user="twalter"/>
<node id="15998916" lat="47.5315221" lon="8.8060980" version="5" timestamp="2014-06-10T19:13:01Z" changeset="22856880" uid="266672" user="tf66"/>
<node id="15998949" lat="47.5288157" lon="8.8318368" version="10" timestamp="2014-05-17T02:31:10Z" changeset="22381881" uid="178186" user="mdk"/>
<node id="15998977" lat="47.4577472" lon="8.7085158" version="8" timestamp="2014-09-23T23:31:01Z" changeset="25635449" uid="7412" user="mjulius"/>
<node id="15998991" lat="47.5288778" lon="8.8282872" version="11" timestamp="2013-05-12T17:13:27Z" changeset="16099021" uid="390170" user="chewiebug"/>
<node id="15999625" lat="47.5291589" lon="8.8375096" version="11" timestamp="2014-05-17T02:31:10Z" changeset="22381881" uid="178186" user="mdk"/>
<node id="15999629" lat="47.5265460" lon="8.8810894" version="4" timestamp="2014-04-29T16:17:32Z" changeset="22026107" uid="1881716" user="Leonardo Hauschild"/>
<node id="15999636" lat="47.5250933" lon="8.8860317" version="3" timestamp="2008-09-03T11:53:55Z" changeset="512858" uid="7260" user="studerap">
<tag k="created_by" v="JOSM"/>
</node>

The following lines come next, just in case this helps:
<node id="15999644" lat="47.5278966" lon="8.8775565" version="4" timestamp="2014-04-29T16:17:32Z" changeset="22026107" uid="1881716" user="Leonardo Hauschild"/>
<node id="15999647" lat="47.5302711" lon="8.8651121" version="4" timestamp="2014-06-09T08:39:48Z" changeset="22825155" uid="266672" user="tf66"/>
<node id="15999651" lat="47.5296471" lon="8.8414284" version="10" timestamp="2014-06-09T08:39:48Z" changeset="22825155" uid="266672" user="tf66"/>I used the following Python code and this produces the stack-trace above:
from pyspark.sql.types import *
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

MAP_DATA_SET_WINTERTHUR = 'winterthur_map.osm'

# Schema for the nested <tag k="..." v="..."/> child elements
tag_type = StructType([
    StructField("_VALUE", StringType(), True),
    StructField("_k", StringType(), True),
    StructField("_v", StringType(), True)])

# Schema for the <node> rows; XML attributes carry the "_" prefix
my_schema = StructType([
    StructField("_changeset", LongType(), True),
    StructField("_corrupt_record", StringType(), True),
    StructField("_id", LongType(), True),
    StructField("_lat", DoubleType(), True),
    StructField("_lon", DoubleType(), True),
    StructField("_timestamp", StringType(), True),
    StructField("_uid", LongType(), True),
    StructField("_user", StringType(), True),
    StructField("_version", LongType(), True),
    StructField("_VALUE", StringType(), True),
    StructField("tag", ArrayType(tag_type, True), True)])

df = sqlContext.read.format('com.databricks.spark.xml') \
    .options(rowTag='node', mode='FAILFAST') \
    .schema(my_schema) \
    .load(MAP_DATA_SET_WINTERTHUR)
df.show(30)

Just printing the schema works fine:
df_nodes = sqlContext.read.format('com.databricks.spark.xml') \
.options(rowTag='node', mode='FAILFAST') \
.load(MAP_DATA_SET_WINTERTHUR)
df_nodes.printSchema()

This produces the following output:
root
|-- _changeset: long (nullable = true)
|-- _id: long (nullable = true)
|-- _lat: double (nullable = true)
|-- _lon: double (nullable = true)
|-- _timestamp: string (nullable = true)
|-- _uid: long (nullable = true)
|-- _user: string (nullable = true)
|-- _version: long (nullable = true)
|-- tag: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _VALUE: string (nullable = true)
| | |-- _k: string (nullable = true)
| | |-- _v: string (nullable = true)
But using this derived schema still produces the same error as above:
df_nodes_2 = sqlContext.read.format('com.databricks.spark.xml') \
.options(rowTag='node', mode='FAILFAST') \
.load(MAP_DATA_SET_WINTERTHUR, schema = df_nodes.schema)
df_nodes_2.show()

Is this a bug, or am I missing something? After staring at the file, and even after reformatting the node and tag elements onto single lines, I still cannot find the reason for this error.
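One workaround I considered, sketched here rather than tested exhaustively: reading in PERMISSIVE mode instead of FAILFAST, so that malformed rows end up in the _corrupt_record column (which my schema above already declares) instead of aborting the job. That should at least show which node records the parser rejects:

# Sketch: PERMISSIVE mode keeps malformed rows and puts the raw record
# into the _corrupt_record column instead of failing the whole job.
df_permissive = sqlContext.read.format('com.databricks.spark.xml') \
    .options(rowTag='node', mode='PERMISSIVE') \
    .schema(my_schema) \
    .load(MAP_DATA_SET_WINTERTHUR)
df_permissive.filter(df_permissive['_corrupt_record'].isNotNull()).show(5)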
Any help would be great!