Skip to content

Commit b034153

Browse files
committed
Change grok watch dog to be Matcher based instead of thread based. (#48346)
There is a watchdog in order to avoid long running (and expensive) grok expressions. Currently the watchdog is thread based, threads that run grok expressions are registered and after completion unregister. If these threads stay registered for too long then the watch dog interrupts these threads. Joni (the library that powers grok expressions) has a mechanism that checks whether the current thread is interrupted and if so abort the pattern matching. Newer versions have an additional method to abort long running pattern matching inside joni. Instead of checking the thread's interrupted flag, joni now also checks a volatile field that can be set via a `Matcher` instance. This is more efficient method for aborting long running matches. (joni checks each 30k iterations whether interrupted flag is set vs. just checking a volatile field) Recently we upgraded to a recent joni version (#47374), and this PR is a followup of that PR. This change should also fix #43673, since it appears when unit tests are ran the a test runner thread's interrupted flag may already have been set, due to some thread reuse.
1 parent 5ecfcdb commit b034153

File tree

10 files changed

+177
-158
lines changed

10 files changed

+177
-158
lines changed

libs/grok/src/main/java/org/elasticsearch/grok/Grok.java

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -74,24 +74,24 @@ public final class Grok {
7474
private final Map<String, String> patternBank;
7575
private final boolean namedCaptures;
7676
private final Regex compiledExpression;
77-
private final ThreadWatchdog threadWatchdog;
77+
private final MatcherWatchdog matcherWatchdog;
7878

7979
public Grok(Map<String, String> patternBank, String grokPattern) {
80-
this(patternBank, grokPattern, true, ThreadWatchdog.noop());
80+
this(patternBank, grokPattern, true, MatcherWatchdog.noop());
8181
}
8282

83-
public Grok(Map<String, String> patternBank, String grokPattern, ThreadWatchdog threadWatchdog) {
84-
this(patternBank, grokPattern, true, threadWatchdog);
83+
public Grok(Map<String, String> patternBank, String grokPattern, MatcherWatchdog matcherWatchdog) {
84+
this(patternBank, grokPattern, true, matcherWatchdog);
8585
}
8686

8787
Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures) {
88-
this(patternBank, grokPattern, namedCaptures, ThreadWatchdog.noop());
88+
this(patternBank, grokPattern, namedCaptures, MatcherWatchdog.noop());
8989
}
9090

91-
private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, ThreadWatchdog threadWatchdog) {
91+
private Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, MatcherWatchdog matcherWatchdog) {
9292
this.patternBank = patternBank;
9393
this.namedCaptures = namedCaptures;
94-
this.threadWatchdog = threadWatchdog;
94+
this.matcherWatchdog = matcherWatchdog;
9595

9696
for (Map.Entry<String, String> entry : patternBank.entrySet()) {
9797
String name = entry.getKey();
@@ -172,14 +172,12 @@ public String toRegex(String grokPattern) {
172172

173173
int result;
174174
try {
175-
threadWatchdog.register();
176-
result = matcher.searchInterruptible(0, grokPatternBytes.length, Option.NONE);
177-
} catch (InterruptedException e) {
178-
result = Matcher.INTERRUPTED;
175+
matcherWatchdog.register(matcher);
176+
result = matcher.search(0, grokPatternBytes.length, Option.NONE);
179177
} finally {
180-
threadWatchdog.unregister();
178+
matcherWatchdog.unregister(matcher);
181179
}
182-
if (result != -1) {
180+
if (result >= 0) {
183181
Region region = matcher.getEagerRegion();
184182
String namedPatternRef = groupMatch(NAME_GROUP, region, grokPattern);
185183
String subName = groupMatch(SUBNAME_GROUP, region, grokPattern);
@@ -217,18 +215,16 @@ public String toRegex(String grokPattern) {
217215
* Checks whether a specific text matches the defined grok expression.
218216
*
219217
* @param text the string to match
220-
* @return true if grok expression matches text, false otherwise.
218+
* @return true if grok expression matches text or there is a timeout, false otherwise.
221219
*/
222220
public boolean match(String text) {
223221
Matcher matcher = compiledExpression.matcher(text.getBytes(StandardCharsets.UTF_8));
224222
int result;
225223
try {
226-
threadWatchdog.register();
227-
result = matcher.searchInterruptible(0, text.length(), Option.DEFAULT);
228-
} catch (InterruptedException e) {
229-
result = Matcher.INTERRUPTED;
224+
matcherWatchdog.register(matcher);
225+
result = matcher.search(0, text.length(), Option.DEFAULT);
230226
} finally {
231-
threadWatchdog.unregister();
227+
matcherWatchdog.unregister(matcher);
232228
}
233229
return (result != -1);
234230
}
@@ -245,16 +241,14 @@ public Map<String, Object> captures(String text) {
245241
Matcher matcher = compiledExpression.matcher(textAsBytes);
246242
int result;
247243
try {
248-
threadWatchdog.register();
249-
result = matcher.searchInterruptible(0, textAsBytes.length, Option.DEFAULT);
250-
} catch (InterruptedException e) {
251-
result = Matcher.INTERRUPTED;
244+
matcherWatchdog.register(matcher);
245+
result = matcher.search(0, textAsBytes.length, Option.DEFAULT);
252246
} finally {
253-
threadWatchdog.unregister();
247+
matcherWatchdog.unregister(matcher);
254248
}
255249
if (result == Matcher.INTERRUPTED) {
256250
throw new RuntimeException("grok pattern matching was interrupted after [" +
257-
threadWatchdog.maxExecutionTimeInMillis() + "] ms");
251+
matcherWatchdog.maxExecutionTimeInMillis() + "] ms");
258252
} else if (result == Matcher.FAILED) {
259253
// TODO: I think we should throw an error here?
260254
return null;

libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java renamed to libs/grok/src/main/java/org/elasticsearch/grok/MatcherWatchdog.java

Lines changed: 49 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.elasticsearch.grok;
2020

21+
import org.joni.Matcher;
22+
2123
import java.util.Map;
2224
import java.util.concurrent.ConcurrentHashMap;
2325
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,88 +29,92 @@
2729

2830
/**
2931
* Protects against long running operations that happen between the register and unregister invocations.
30-
* Threads that invoke {@link #register()}, but take too long to invoke the {@link #unregister()} method
32+
* Threads that invoke {@link #register(Matcher)}, but take too long to invoke the {@link #unregister(Matcher)} method
3133
* will be interrupted.
3234
*
3335
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because
3436
* it can end up spinning endlessly if the regular expression is too complex. Joni has checks
3537
* that for every 30k iterations it checks if the current thread is interrupted and if so
3638
* returns {@link org.joni.Matcher#INTERRUPTED}.
3739
*/
38-
public interface ThreadWatchdog {
39-
40+
public interface MatcherWatchdog {
41+
4042
/**
41-
* Registers the current thread and interrupts the current thread
42-
* if the takes too long for this thread to invoke {@link #unregister()}.
43+
* Registers the current matcher and interrupts the this matcher
44+
* if the takes too long for this thread to invoke {@link #unregister(Matcher)}.
45+
*
46+
* @param matcher The matcher to register
4347
*/
44-
void register();
45-
48+
void register(Matcher matcher);
49+
4650
/**
47-
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister()}
48-
* after {@link #register()} has been invoked before this ThreadWatchDog starts to interrupting that thread.
51+
* @return The maximum allowed time in milliseconds for a thread to invoke {@link #unregister(Matcher)}
52+
* after {@link #register(Matcher)} has been invoked before this ThreadWatchDog starts to interrupting that thread.
4953
*/
5054
long maxExecutionTimeInMillis();
51-
55+
5256
/**
53-
* Unregisters the current thread and prevents it from being interrupted.
57+
* Unregisters the current matcher and prevents it from being interrupted.
58+
*
59+
* @param matcher The matcher to unregister
5460
*/
55-
void unregister();
56-
61+
void unregister(Matcher matcher);
62+
5763
/**
58-
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register()}
59-
* and not {@link #unregister()} and have been in this state for longer than the specified max execution interval and
64+
* Returns an implementation that checks for each fixed interval if there are threads that have invoked {@link #register(Matcher)}
65+
* and not {@link #unregister(Matcher)} and have been in this state for longer than the specified max execution interval and
6066
* then interrupts these threads.
6167
*
6268
* @param interval The fixed interval to check if there are threads to interrupt
6369
* @param maxExecutionTime The time a thread has the execute an operation.
6470
* @param relativeTimeSupplier A supplier that returns relative time
6571
* @param scheduler A scheduler that is able to execute a command for each fixed interval
6672
*/
67-
static ThreadWatchdog newInstance(long interval,
73+
static MatcherWatchdog newInstance(long interval,
6874
long maxExecutionTime,
6975
LongSupplier relativeTimeSupplier,
7076
BiConsumer<Long, Runnable> scheduler) {
7177
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
7278
}
73-
79+
7480
/**
7581
* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions.
7682
*/
77-
static ThreadWatchdog noop() {
83+
static MatcherWatchdog noop() {
7884
return Noop.INSTANCE;
7985
}
80-
81-
class Noop implements ThreadWatchdog {
82-
86+
87+
class Noop implements MatcherWatchdog {
88+
8389
private static final Noop INSTANCE = new Noop();
84-
90+
8591
private Noop() {
8692
}
87-
93+
8894
@Override
89-
public void register() {
95+
public void register(Matcher matcher) {
9096
}
91-
97+
9298
@Override
9399
public long maxExecutionTimeInMillis() {
94100
return Long.MAX_VALUE;
95101
}
96-
102+
97103
@Override
98-
public void unregister() {
104+
public void unregister(Matcher matcher) {
99105
}
100106
}
101-
102-
class Default implements ThreadWatchdog {
103-
107+
108+
class Default implements MatcherWatchdog {
109+
104110
private final long interval;
105111
private final long maxExecutionTime;
106112
private final LongSupplier relativeTimeSupplier;
107113
private final BiConsumer<Long, Runnable> scheduler;
108114
private final AtomicInteger registered = new AtomicInteger(0);
109115
private final AtomicBoolean running = new AtomicBoolean(false);
110-
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
111-
116+
final ConcurrentHashMap<Matcher, Long> registry = new ConcurrentHashMap<>();
117+
112118
private Default(long interval,
113119
long maxExecutionTime,
114120
LongSupplier relativeTimeSupplier,
@@ -118,30 +124,30 @@ private Default(long interval,
118124
this.relativeTimeSupplier = relativeTimeSupplier;
119125
this.scheduler = scheduler;
120126
}
121-
122-
public void register() {
127+
128+
public void register(Matcher matcher) {
123129
registered.getAndIncrement();
124-
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
130+
Long previousValue = registry.put(matcher, relativeTimeSupplier.getAsLong());
125131
if (running.compareAndSet(false, true) == true) {
126132
scheduler.accept(interval, this::interruptLongRunningExecutions);
127133
}
128134
assert previousValue == null;
129135
}
130-
136+
131137
@Override
132138
public long maxExecutionTimeInMillis() {
133139
return maxExecutionTime;
134140
}
135-
136-
public void unregister() {
137-
Long previousValue = registry.remove(Thread.currentThread());
141+
142+
public void unregister(Matcher matcher) {
143+
Long previousValue = registry.remove(matcher);
138144
registered.decrementAndGet();
139145
assert previousValue != null;
140146
}
141-
147+
142148
private void interruptLongRunningExecutions() {
143149
final long currentRelativeTime = relativeTimeSupplier.getAsLong();
144-
for (Map.Entry<Thread, Long> entry : registry.entrySet()) {
150+
for (Map.Entry<Matcher, Long> entry : registry.entrySet()) {
145151
if ((currentRelativeTime - entry.getValue()) > maxExecutionTime) {
146152
entry.getKey().interrupt();
147153
// not removing the entry here, this happens in the unregister() method.
@@ -153,7 +159,7 @@ private void interruptLongRunningExecutions() {
153159
running.set(false);
154160
}
155161
}
156-
162+
157163
}
158-
164+
159165
}

libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ public void testExponentialExpressions() {
430430
});
431431
t.start();
432432
};
433-
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
433+
Grok grok = new Grok(basePatterns, grokPattern, MatcherWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
434434
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));
435435
run.set(false);
436436
assertThat(e.getMessage(), equalTo("grok pattern matching was interrupted after [200] ms"));

libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java renamed to libs/grok/src/test/java/org/elasticsearch/grok/MatcherWatchdogTests.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,24 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626
import org.elasticsearch.test.ESTestCase;
27+
import org.joni.Matcher;
2728
import org.mockito.Mockito;
2829

2930
import static org.hamcrest.Matchers.is;
3031
import static org.mockito.Matchers.any;
3132
import static org.mockito.Matchers.eq;
3233
import static org.mockito.Mockito.doAnswer;
3334
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.timeout;
3436
import static org.mockito.Mockito.verify;
3537
import static org.mockito.Mockito.verifyNoMoreInteractions;
3638
import static org.mockito.Mockito.verifyZeroInteractions;
3739

38-
public class ThreadWatchdogTests extends ESTestCase {
40+
public class MatcherWatchdogTests extends ESTestCase {
3941

4042
public void testInterrupt() throws Exception {
4143
AtomicBoolean run = new AtomicBoolean(true); // to avoid a lingering thread when test has completed
42-
ThreadWatchdog watchdog = ThreadWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> {
44+
MatcherWatchdog watchdog = MatcherWatchdog.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> {
4345
try {
4446
Thread.sleep(delay);
4547
} catch (InterruptedException e) {
@@ -53,17 +55,17 @@ public void testInterrupt() throws Exception {
5355
thread.start();
5456
});
5557

56-
Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;
58+
Map<?, ?> registry = ((MatcherWatchdog.Default) watchdog).registry;
5759
assertThat(registry.size(), is(0));
5860
// need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted
5961
AtomicBoolean interrupted = new AtomicBoolean(false);
6062
Thread thread = new Thread(() -> {
61-
Thread currentThread = Thread.currentThread();
62-
watchdog.register();
63-
while (currentThread.isInterrupted() == false) {}
63+
Matcher matcher = mock(Matcher.class);
64+
watchdog.register(matcher);
65+
verify(matcher, timeout(9999).times(1)).interrupt();
6466
interrupted.set(true);
6567
while (run.get()) {} // wait here so that the size of the registry can be asserted
66-
watchdog.unregister();
68+
watchdog.unregister(matcher);
6769
});
6870
thread.start();
6971
assertBusy(() -> {
@@ -79,7 +81,7 @@ public void testInterrupt() throws Exception {
7981
public void testIdleIfNothingRegistered() throws Exception {
8082
long interval = 1L;
8183
ScheduledExecutorService threadPool = mock(ScheduledExecutorService.class);
82-
ThreadWatchdog watchdog = ThreadWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis,
84+
MatcherWatchdog watchdog = MatcherWatchdog.newInstance(interval, Long.MAX_VALUE, System::currentTimeMillis,
8385
(delay, command) -> threadPool.schedule(command, delay, TimeUnit.MILLISECONDS));
8486
// Periodic action is not scheduled because no thread is registered
8587
verifyZeroInteractions(threadPool);
@@ -91,16 +93,20 @@ public void testIdleIfNothingRegistered() throws Exception {
9193
}).when(threadPool).schedule(
9294
any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS)
9395
);
94-
watchdog.register();
96+
Matcher matcher = mock(Matcher.class);
97+
watchdog.register(matcher);
9598
// Registering the first thread should have caused the command to get scheduled again
9699
Runnable command = commandFuture.get(1L, TimeUnit.MILLISECONDS);
97100
Mockito.reset(threadPool);
98-
watchdog.unregister();
101+
watchdog.unregister(matcher);
99102
command.run();
100103
// Periodic action is not scheduled again because no thread is registered
101104
verifyZeroInteractions(threadPool);
102-
watchdog.register();
103-
Thread otherThread = new Thread(watchdog::register);
105+
watchdog.register(matcher);
106+
Thread otherThread = new Thread(() -> {
107+
Matcher otherMatcher = mock(Matcher.class);
108+
watchdog.register(otherMatcher);
109+
});
104110
try {
105111
verify(threadPool).schedule(any(Runnable.class), eq(interval), eq(TimeUnit.MILLISECONDS));
106112
// Registering a second thread does not cause the command to get scheduled twice

0 commit comments

Comments
 (0)