Description
What kind of issue is this?
- Bug report. If you've found a bug, please provide a code snippet or test to reproduce it below. The easier it is to track down the bug, the faster it is solved.
- Feature Request. Start by telling us what problem you're trying to solve. Often a solution already exists! Don't send pull requests to implement new features without first getting our support. Sometimes we leave features out on purpose to keep the project small.
Issue description
The automatic index creation feature in elasticsearch-spark has stopped working with Elasticsearch clusters starting from version 8.6.0.
When using the default saving mode, which writes documents directly to Elasticsearch shards, and Spark creates more than one task to save those documents, at least one task fails with an exception like the one in the "Stack trace" section below.
My colleagues and I have spent some time investigating this issue and found the following:
- we have tested this on different Elasticsearch versions; the issue first occurs on Elasticsearch 8.6.0 and continues to occur even with the newest cluster version, 8.13.1
- the issue appears to be caused by the Elasticsearch cluster API returning inconsistent replies:
  - the method to check whether an index exists (i.e. HEAD /<name-of-index>) returns HTTP status 200, meaning the index exists
  - but the method to get search shards (i.e. GET /<name-of-index>/_search_shards) returns an empty list of shards for a short period of time, and this period (despite being short) is long enough for the assertion in RestService.initSingleIndex() to fail
- this behaviour appears to have changed in Elasticsearch 8.6.0, because on previous versions the issue did not exist
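To make the race concrete, here is a small, self-contained model (not the actual connector code) of the decision RestService.initSingleIndex() effectively makes: if the existence check says the index exists but _search_shards returns no usable shard groups, the assertion fails, even though a short retry would succeed. The function below is hypothetical and only illustrates that logic:

```python
def should_fail_fast(index_exists: bool, shard_groups: list) -> bool:
    """Model of the assertion in RestService.initSingleIndex():
    it fails when the index reportedly exists but _search_shards
    returns no usable shard groups."""
    # An empty list, or a list containing only empty groups,
    # means no write shards could be determined.
    has_shards = any(group for group in shard_groups)
    return index_exists and not has_shards

# The inconsistent window observed on Elasticsearch >= 8.6.0:
# HEAD /<index> -> 200 (exists), but _search_shards -> [[]]
assert should_fail_fast(True, [[]]) is True
# Moments later the same call returns a populated shard group:
assert should_fail_fast(True, [[{"state": "STARTED", "shard": 0}]]) is False
```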
Steps to reproduce
- Try to save some documents into a non-existing index, with both settings es.nodes.wan.only and es.nodes.client.only set to false (default)
Code:
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class Main {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("Example")
                .setMaster("local[*]")
                .set("es.nodes", "localhost:9200");
        try (JavaSparkContext sparkContext = new JavaSparkContext(sparkConf)) {
            List<Map<String, String>> data = Arrays.asList(
                    Map.of("propertyA", "valueA1", "propertyB", "valueB1"),
                    Map.of("propertyA", "valueA2", "propertyB", "valueB2"),
                    Map.of("propertyA", "valueA3", "propertyB", "valueB3")
            );
            JavaRDD<Map<String, String>> stringJavaRDD = sparkContext.parallelize(data);
            JavaEsSpark.saveToEs(stringJavaRDD, "test-index");
        }
    }
}

See https://github.com/mlyczek/elasticsearch-hadoop-concurrency-issue for a full project ready to run.
Stack trace
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot determine write shards for [test-index]; likely its format is incorrect (maybe it contains illegal characters? or all shards failed?)
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:689)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:634)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:71)
at org.elasticsearch.spark.rdd.EsSpark$.$anonfun$doSaveToEs$1(EsSpark.scala:108)
at org.elasticsearch.spark.rdd.EsSpark$.$anonfun$doSaveToEs$1$adapted(EsSpark.scala:108)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Full log: full-log.txt
More investigation information
I have prepared a small Python script to show the inconsistent behaviour of Elasticsearch. It more or less mimics the behaviour of RestService.initSingleIndex(): it checks whether the index exists; if not, it creates it, waits for YELLOW status, and then gets the shards; if the index does exist, it gets the shards right away.
import logging
import threading
import time

import requests
from requests.auth import HTTPBasicAuth
from urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

es_url = "https://localhost:9201/"
index_name = "text-index"
my_auth = None

def run():
    while True:
        exists_response = requests.head(f"{es_url}/{index_name}", auth=my_auth, verify=False)
        if exists_response.status_code != requests.codes.ok:
            logging.info("Creating test-index")
            response = requests.put(f"{es_url}/{index_name}", auth=my_auth, verify=False)
            logging.info("index create response %s", response)
            logging.info("Waiting for YELLOW")
            health_response = requests.get(f"{es_url}/_cluster/health/{index_name}?wait_for_status=YELLOW&timeout=10s", auth=my_auth, verify=False)
            logging.info("health response %s", health_response)
        else:
            logging.info("index exists")
        logging.info("getting shards")
        search_shards_response = requests.get(f"{es_url}/{index_name}/_search_shards", auth=my_auth, verify=False)
        shards = search_shards_response.json()['shards']
        logging.info(shards)
        if shards[0] == []:
            time.sleep(0.01)
        else:
            break

if __name__ == "__main__":
    format = "%(asctime)s.%(msecs)03d [%(threadName)s]: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    logging.info("Deleting test-index")
    response = requests.delete(f"{es_url}/{index_name}", auth=my_auth, verify=False)
    logging.info(response)
    x = threading.Thread(name="X ", target=run)
    y = threading.Thread(name=" X", target=run)
    x.start()
    time.sleep(0.025)
    y.start()

Below is one of the outputs I got after running the above script (it is necessary to adjust the sleep time at the end, depending on the machine, to catch the short window when the issue exists):
15:38:18.130 [MainThread]: Deleting test-index
15:38:18.159 [MainThread]: <Response [200]>
15:38:18.170 [X ]: Creating test-index
15:38:18.197 [ X]: index exists
15:38:18.198 [ X]: getting shards
15:38:18.210 [ X]: [[]]
15:38:18.233 [ X]: index exists
15:38:18.233 [ X]: getting shards
15:38:18.241 [X ]: index create response <Response [200]>
15:38:18.241 [X ]: Waiting for YELLOW
15:38:18.245 [ X]: [[{'state': 'STARTED', 'primary': True, 'node': 'dUGB49_fRcGcVonQgXvB_g', 'relocating_node': None, 'shard': 0, 'index': 'text-index', 'allocation_id': {'id': 'Q6Ya3noUTQuN5BLXWvXhcQ'}, 'relocation_failure_info': {'failed_attempts': 0}}]]
15:38:18.254 [X ]: health response <Response [200]>
15:38:18.254 [X ]: getting shards
15:38:18.266 [X ]: [[{'state': 'STARTED', 'primary': True, 'node': 'dUGB49_fRcGcVonQgXvB_g', 'relocating_node': None, 'shard': 0, 'index': 'text-index', 'allocation_id': {'id': 'Q6Ya3noUTQuN5BLXWvXhcQ'}, 'relocation_failure_info': {'failed_attempts': 0}}]]
In the above log, it can be seen that while one thread is waiting for Elasticsearch to finish creating the index, the other thread is told that the index exists but receives an empty list of shards; after sleeping 0.01 seconds, the shard list is returned correctly.
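As a possible mitigation (not an official fix, and the names below are hypothetical), client code could briefly retry the _search_shards call instead of failing on the first empty reply, mirroring the sleep-and-retry loop in the script above. Here fetch_shards stands in for the HTTP call:

```python
import time

def wait_for_write_shards(fetch_shards, attempts=10, delay=0.05):
    """Poll until _search_shards reports at least one non-empty
    shard group, or give up after `attempts` tries.

    fetch_shards: callable returning the 'shards' list from the
    _search_shards response (a stand-in for the real HTTP call).
    """
    for _ in range(attempts):
        shards = fetch_shards()
        if any(group for group in shards):
            return shards
        time.sleep(delay)
    raise RuntimeError("no write shards reported after retries")

# Simulated replies: two empty windows, then a populated shard group,
# matching the [[]] -> [[{...}]] sequence seen in the log above.
replies = iter([[[]], [[]], [[{"state": "STARTED", "shard": 0}]]])
shards = wait_for_write_shards(lambda: next(replies), delay=0)
assert shards == [[{"state": "STARTED", "shard": 0}]]
```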
Version Info
OS: Linux
JVM: OpenJDK 17.0.9
Spark: 3.3.2
ES-Spark: 8.13.1
ES: >= 8.6.0