Unable to run Spline in YARN cluster mode: unable to access producer URL #102

@ayanbizz

I get the following error when Spline initializes in YARN cluster mode.
The error is related to accessing the producer URL.
Spline runs fine with --deploy-mode client; the problem occurs only in cluster mode.
This is the spark-submit command I used, followed by the error log:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue sparkjobs \
  --jars spark-2.3-spline-agent-bundle_2.11-0.5.3.jar \
  --conf "spark.sql.queryExecutionListeners=za.co.absa.spline.harvester.listener.SplineQueryExecutionListener" \
  --conf "spark.spline.producer.url=http://someurl/producer"

20/06/25 20:14:04 ERROR QueryExecutionEventHandlerFactory: Spline initialization failed! Spark Lineage tracking is DISABLED.
za.co.absa.spline.harvester.exception.SplineInitializationException: Spark Agent was not able to establish connection to Spline Gateway
	at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher$$anonfun$1.applyOrElse(HttpLineageDispatcher.scala:120)
	at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher$$anonfun$1.applyOrElse(HttpLineageDispatcher.scala:118)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
	at scala.util.Try$.apply(Try.scala:192)
	at scala.util.Failure.recover(Try.scala:216)
	at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher.<init>(HttpLineageDispatcher.scala:118)
	at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher.<init>(HttpLineageDispatcher.scala:88)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.instantiate(DefaultSplineConfigurer.scala:114)
	at za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.lineageDispatcher(DefaultSplineConfigurer.scala:94)
	at za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.queryExecutionEventHandler(DefaultSplineConfigurer.scala:92)
	at za.co.absa.spline.harvester.QueryExecutionEventHandlerFactory.createEventHandler(QueryExecutionEventHandlerFactory.scala:38)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$.za$co$absa$spline$harvester$listener$SplineQueryExecutionListener$$constructEventHandler(SplineQueryExecutionListener.scala:68)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$$anonfun$$lessinit$greater$1.apply(SplineQueryExecutionListener.scala:39)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$$anonfun$$lessinit$greater$1.apply(SplineQueryExecutionListener.scala:39)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.za$co$absa$spline$harvester$listener$SplineQueryExecutionListener$$maybeEventHandler$lzycompute(SplineQueryExecutionListener.scala:33)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.za$co$absa$spline$harvester$listener$SplineQueryExecutionListener$$maybeEventHandler(SplineQueryExecutionListener.scala:33)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener$$anonfun$onSuccess$1.apply$mcV$sp(SplineQueryExecutionListener.scala:42)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.withErrorHandling(SplineQueryExecutionListener.scala:51)
	at za.co.absa.spline.harvester.listener.SplineQueryExecutionListener.onSuccess(SplineQueryExecutionListener.scala:41)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:124)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:123)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:145)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:143)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
	at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:143)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:123)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:123)
	at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:156)
	at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:122)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3257)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: org.apache.hadoop.fs.FsUrlConnection:http://someurl/producer/status (of class org.apache.hadoop.fs.FsUrlConnection)
	at scalaj.http.HttpRequest.scalaj$http$HttpRequest$$doConnection(Http.scala:351)
	at scalaj.http.HttpRequest.exec(Http.scala:343)
	at scalaj.http.HttpRequest.asString(Http.scala:491)
	at za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher$$anonfun$2.apply(HttpLineageDispatcher.scala:109)
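
The `Caused by` section suggests that opening `http://someurl/producer/status` returned Hadoop's `FsUrlConnection` rather than a `java.net.HttpURLConnection`, and that the HTTP client had no case for that type. Below is a minimal Java sketch of that mechanism, not the actual scalaj-http or Hadoop code. `StubFsUrlConnection` and `StubFsUrlStreamHandler` are hypothetical stand-ins for the Hadoop classes, and `requireHttp` only imitates the kind of type check the trace implies. How the Hadoop handler ends up serving `http` URLs in cluster mode is an assumption this sketch does not verify.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;

class ConnectionTypeDemo {
    // Imitates a client that only knows how to work with HttpURLConnection.
    static String requireHttp(URLConnection conn) {
        if (conn instanceof HttpURLConnection) {
            return "ok: HttpURLConnection";
        }
        throw new IllegalStateException(
            "unexpected connection type: " + conn.getClass().getSimpleName());
    }

    // Opens the URL with the JDK's built-in handler (no network I/O happens here).
    static URLConnection openDefault(String spec) {
        try {
            return new URL(spec).openConnection();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Opens the same URL through the stub handler, standing in for a handler
    // that has taken over the "http" protocol.
    static URLConnection openWithStubHandler(String spec) {
        try {
            return new URL(null, spec, new StubFsUrlStreamHandler()).openConnection();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String statusUrl = "http://someurl/producer/status";
        System.out.println(requireHttp(openDefault(statusUrl)));
        try {
            requireHttp(openWithStubHandler(statusUrl));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical stand-in for org.apache.hadoop.fs.FsUrlConnection:
// a URLConnection that is not an HttpURLConnection.
class StubFsUrlConnection extends URLConnection {
    StubFsUrlConnection(URL url) { super(url); }
    @Override public void connect() { /* no-op in this sketch */ }
}

// Hypothetical handler that returns the stub connection for any URL it is given.
class StubFsUrlStreamHandler extends URLStreamHandler {
    @Override protected URLConnection openConnection(URL u) {
        return new StubFsUrlConnection(u);
    }
}
```

If this is what happens, the producer URL itself may be reachable and the failure lies in which connection class the driver JVM hands back for `http` URLs. That would also be consistent with the job working in client mode.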

Labels: bug
Status: Done
