Skip to content

Commit 4842bc3

Browse files
committed
fix issues
1 parent de853b2 commit 4842bc3

File tree

3 files changed

+8
-7
lines changed

3 files changed

+8
-7
lines changed

flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchRowDataLookupFunction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.table.functions.FunctionContext;
2929
import org.apache.flink.table.functions.LookupFunction;
3030
import org.apache.flink.table.types.DataType;
31+
import org.apache.flink.util.FlinkRuntimeException;
3132
import org.apache.flink.util.Preconditions;
3233

3334
import org.elasticsearch.action.search.SearchRequest;
@@ -49,7 +50,7 @@
4950

5051
import static org.apache.flink.util.Preconditions.checkNotNull;
5152

52-
/** A lookup function implementing {@link LookupTableSource} in elasticsearch connector. */
53+
/** A lookup function implementing {@link LookupTableSource} in Elasticsearch connector. */
5354
@Internal
5455
public class ElasticsearchRowDataLookupFunction<C extends AutoCloseable> extends LookupFunction {
5556

@@ -155,14 +156,14 @@ public Collection<RowData> lookup(RowData keyRow) {
155156
} catch (IOException e) {
156157
LOG.error(String.format("Elasticsearch search error, retry times = %d", retry), e);
157158
if (retry >= maxRetryTimes) {
158-
throw new RuntimeException("Execution of Elasticsearch search failed.", e);
159+
throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e);
159160
}
160161
try {
161162
Thread.sleep(1000L * retry);
162163
} catch (InterruptedException e1) {
163164
LOG.warn(
164165
"Interrupted while waiting to retry failed elasticsearch search, aborting");
165-
throw new RuntimeException(e1);
166+
throw new FlinkRuntimeException(e1);
166167
}
167168
}
168169
}

flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public DynamicTableSource copy() {
120120

121121
@Override
122122
public String asSummaryString() {
123-
return "Elasticsearch-6";
123+
return "Elasticsearch6";
124124
}
125125

126126
@Override

flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
8383
config.getPathPrefix().orElse(null));
8484
}
8585

86-
Elasticsearch7ApiCallBridge elasticsearch6ApiCallBridge =
86+
Elasticsearch7ApiCallBridge elasticsearch7ApiCallBridge =
8787
new Elasticsearch7ApiCallBridge(config.getHosts(), restClientFactory);
8888

8989
// Elasticsearch only support non-nested look up keys
@@ -104,7 +104,7 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
104104
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
105105
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
106106
keyNames,
107-
elasticsearch6ApiCallBridge);
107+
elasticsearch7ApiCallBridge);
108108
if (lookupCache != null) {
109109
return PartialCachingLookupProvider.of(lookupFunction, lookupCache);
110110
} else {
@@ -120,7 +120,7 @@ public DynamicTableSource copy() {
120120

121121
@Override
122122
public String asSummaryString() {
123-
return "Elasticsearch-6";
123+
return "Elasticsearch7";
124124
}
125125

126126
@Override

0 commit comments

Comments
 (0)