|
36 | 36 | import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; |
37 | 37 | import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; |
38 | 38 | import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; |
| 39 | +import org.elasticsearch.action.bulk.WriteMemoryLimits; |
39 | 40 | import org.elasticsearch.action.support.replication.TransportReplicationAction; |
40 | 41 | import org.elasticsearch.client.Client; |
41 | 42 | import org.elasticsearch.cluster.ClusterName; |
@@ -1154,10 +1155,34 @@ public void beforeIndexDeletion() throws Exception { |
1154 | 1155 | // and not all docs have been purged after the test) and inherit from |
1155 | 1156 | // ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures. |
1156 | 1157 | assertNoPendingIndexOperations(); |
| 1158 | + assertAllPendingWriteLimitsReleased(); |
1157 | 1159 | assertOpenTranslogReferences(); |
1158 | 1160 | assertNoSnapshottedIndexCommit(); |
1159 | 1161 | } |
1160 | 1162 |
|
| 1163 | + private void assertAllPendingWriteLimitsReleased() throws Exception { |
| 1164 | + assertBusy(() -> { |
| 1165 | + for (NodeAndClient nodeAndClient : nodes.values()) { |
| 1166 | + WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name); |
| 1167 | + final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes(); |
| 1168 | + if (coordinatingBytes > 0) { |
| 1169 | + throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node [" |
| 1170 | + + nodeAndClient.name + "]."); |
| 1171 | + } |
| 1172 | + final long primaryBytes = writeMemoryLimits.getPrimaryBytes(); |
| 1173 | + if (primaryBytes > 0) { |
| 1174 | + throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node [" |
| 1175 | + + nodeAndClient.name + "]."); |
| 1176 | + } |
| 1177 | + final long replicaBytes = writeMemoryLimits.getReplicaBytes(); |
| 1178 | + if (replicaBytes > 0) { |
| 1179 | + throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node [" |
| 1180 | + + nodeAndClient.name + "]."); |
| 1181 | + } |
| 1182 | + } |
| 1183 | + }, 60, TimeUnit.SECONDS); |
| 1184 | + } |
| 1185 | + |
1161 | 1186 | private void assertNoPendingIndexOperations() throws Exception { |
1162 | 1187 | assertBusy(() -> { |
1163 | 1188 | for (NodeAndClient nodeAndClient : nodes.values()) { |
|
0 commit comments