|
141 | 141 | import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; |
142 | 142 | import org.elasticsearch.test.IndexSettingsModule; |
143 | 143 | import org.elasticsearch.test.VersionUtils; |
| 144 | +import org.elasticsearch.threadpool.ThreadPool; |
144 | 145 | import org.hamcrest.MatcherAssert; |
145 | 146 |
|
146 | 147 | import java.io.Closeable; |
@@ -5777,7 +5778,7 @@ public void testMaxSeqNoInCommitUserData() throws Exception { |
5777 | 5778 | assertMaxSeqNoInCommitUserData(engine); |
5778 | 5779 | } |
5779 | 5780 |
|
5780 | | - public void testRefreshAndFailEngineConcurrently() throws Exception { |
| 5781 | + public void testRefreshAndCloseEngineConcurrently() throws Exception { |
5781 | 5782 | AtomicBoolean stopped = new AtomicBoolean(); |
5782 | 5783 | Semaphore indexedDocs = new Semaphore(0); |
5783 | 5784 | Thread indexer = new Thread(() -> { |
@@ -5807,7 +5808,11 @@ public void testRefreshAndFailEngineConcurrently() throws Exception { |
5807 | 5808 | refresher.start(); |
5808 | 5809 | indexedDocs.acquire(randomIntBetween(1, 100)); |
5809 | 5810 | try { |
5810 | | - engine.failEngine("test", new IOException("simulated error")); |
| 5811 | + if (randomBoolean()) { |
| 5812 | + engine.failEngine("test", new IOException("simulated error")); |
| 5813 | + } else { |
| 5814 | + engine.close(); |
| 5815 | + } |
5811 | 5816 | } finally { |
5812 | 5817 | stopped.set(true); |
5813 | 5818 | indexer.join(); |
@@ -6149,4 +6154,41 @@ public void afterRefresh(boolean didRefresh) { |
6149 | 6154 | } |
6150 | 6155 | } |
6151 | 6156 | } |
| 6157 | + |
| 6158 | + public void testRefreshDoesNotBlockClosing() throws Exception { |
| 6159 | + final CountDownLatch refreshStarted = new CountDownLatch(1); |
| 6160 | + final CountDownLatch engineClosed = new CountDownLatch(1); |
| 6161 | + final ReferenceManager.RefreshListener refreshListener = new ReferenceManager.RefreshListener() { |
| 6162 | + |
| 6163 | + @Override |
| 6164 | + public void beforeRefresh() { |
| 6165 | + refreshStarted.countDown(); |
| 6166 | + try { |
| 6167 | + engineClosed.await(); |
| 6168 | + } catch (InterruptedException e) { |
| 6169 | + throw new AssertionError(e); |
| 6170 | + } |
| 6171 | + } |
| 6172 | + |
| 6173 | + @Override |
| 6174 | + public void afterRefresh(boolean didRefresh) { |
| 6175 | + assertFalse(didRefresh); |
| 6176 | + } |
| 6177 | + }; |
| 6178 | + try (Store store = createStore()) { |
| 6179 | + final EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, |
| 6180 | + refreshListener, null, null, engine.config().getCircuitBreakerService()); |
| 6181 | + try (InternalEngine engine = createEngine(config)) { |
| 6182 | + if (randomBoolean()) { |
| 6183 | + engine.index(indexForDoc(createParsedDoc("id", null))); |
| 6184 | + } |
| 6185 | + threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> |
| 6186 | + expectThrows(AlreadyClosedException.class, |
| 6187 | + () -> engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true))); |
| 6188 | + refreshStarted.await(); |
| 6189 | + engine.close(); |
| 6190 | + engineClosed.countDown(); |
| 6191 | + } |
| 6192 | + } |
| 6193 | + } |
6152 | 6194 | } |
0 commit comments