From b75a80b4683e59c192efc3ab1869010821b6e87e Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Wed, 21 Feb 2024 21:12:14 +0100 Subject: [PATCH 01/10] YARN-11656 RMStateStore event queue blocked TODO --- .../org/apache/hadoop/yarn/event/Event.java | 10 + .../multidispatcher/MultiDispatcher.java | 175 ++++++++++++++++++ .../MultiDispatcherConfig.java | 58 ++++++ .../MultiDispatcherLibrary.java | 43 +++++ .../multidispatcher/MultiDispatcherLocks.java | 68 +++++++ .../yarn/metrics/DispatcherEventMetrics.java | 88 +++++++++ .../yarn/event/multidispatcher/MockEvent.java | 37 ++++ .../event/multidispatcher/MockEventType.java | 6 + .../multidispatcher/TestMultiDispatcher.java | 79 ++++++++ .../recovery/RMStateStore.java | 26 +-- .../recovery/RMStateStoreAppAttemptEvent.java | 5 + .../recovery/RMStateStoreAppEvent.java | 5 + .../RMStateStoreRemoveAppAttemptEvent.java | 5 + .../recovery/RMStateStoreRemoveAppEvent.java | 5 + .../RMStateUpdateAppAttemptEvent.java | 5 + .../recovery/RMStateUpdateAppEvent.java | 5 + .../recovery/TestFSRMStateStore.java | 1 - .../recovery/TestLeveldbRMStateStore.java | 1 - .../recovery/TestZKRMStateStore.java | 1 - 19 files changed, 598 insertions(+), 25 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLocks.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java create mode 100644 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java index e76a7530a7eca..ddaa1b37971d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java @@ -32,4 +32,14 @@ public interface Event> { TYPE getType(); long getTimestamp(); String toString(); + + /** + * In case of parallel execution of events in the same dispatcher, + * the result of this method will be used as semaphore. + * If method returns null, then a default semaphore will be used. + * @return the semaphore + */ + default String getLockKey() { + return null; + }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java new file mode 100644 index 0000000000000..f07ac7496bf3c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.metrics.DispatcherEventMetrics; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; + +/** + * Dispatches {@link Event}s in a parallel thread. + * The {@link this#getEventHandler()} method can be used to post an event to the dispatcher. + * The posted event will be added to the event queue what is polled by many thread, + * based on the config values in the {@link MultiDispatcherConfig}. 
+ * The posted events can be parallel executed, + * if the result of the {@link Event#getLockKey()} is different between 2 event. + * If the method return with null, then a global semaphore will be used for these events. + * The method usually returns with the applicationId based on the concept + * parallel apps should not affect each others. + * The locking logic is implemented in {@link MultiDispatcherLocks}. + * The Dispatcher provides metric data using the {@link DispatcherEventMetrics} + */ +public class MultiDispatcher extends AbstractService implements Dispatcher { + + private final Logger LOG; + private final String dispatcherName; + private final DispatcherEventMetrics metrics; + private final MultiDispatcherLocks locks; + private final MultiDispatcherLibrary library; + private final Clock clock = new MonotonicClock(); + + private MultiDispatcherConfig config; + private BlockingQueue eventQueue; + private ThreadPoolExecutor threadPoolExecutor; + private ScheduledThreadPoolExecutor monitorExecutor; + + public MultiDispatcher(String dispatcherName) { + super("Dispatcher"); + this.dispatcherName = dispatcherName.replaceAll(" ", "-").toLowerCase(); + this.LOG = LoggerFactory.getLogger(MultiDispatcher.class.getCanonicalName() + "." 
+ this.dispatcherName); + this.metrics = new DispatcherEventMetrics(this.dispatcherName); + this.locks = new MultiDispatcherLocks(this.LOG); + this.library = new MultiDispatcherLibrary(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception{ + super.serviceInit(conf); + this.config = new MultiDispatcherConfig(getConfig(), this.dispatcherName); + this.eventQueue = new LinkedBlockingQueue<>(this.config.getQueueSize()); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + createWorkerPool(); + createMonitorThread(); + DefaultMetricsSystem.instance().register( + "Event metrics for " + dispatcherName, + "Event metrics for " + dispatcherName, + metrics + ); + } + + @Override + protected void serviceStop() throws Exception { + if (monitorExecutor != null) { + monitorExecutor.shutdownNow(); + } + threadPoolExecutor.shutdown(); + threadPoolExecutor.awaitTermination(config.getGracefulStopSeconds(), TimeUnit.SECONDS); + int terminatedSize = threadPoolExecutor.shutdownNow().size(); + if (0 < terminatedSize) { + LOG.error("{} tasks not finished in time, so they were terminated", terminatedSize); + } + } + + @Override + public EventHandler getEventHandler() { + return event -> { + if (isInState(STATE.STOPPED)) { + LOG.warn("Discard event {} because stopped state", event); + } else { + EventHandler handler = library.getEventHandler(event); + threadPoolExecutor.execute(createRunnable(event, handler)); + metrics.addEvent(event.getType()); + } + }; + } + + @Override + public void register(Class eventType, EventHandler handler) { + library.register(eventType, handler); + metrics.init(eventType); + } + + private Runnable createRunnable(Event event, EventHandler handler) { + return () -> { + LOG.debug("{} handle {}, queue size: {}", + Thread.currentThread().getName(), event.getClass().getSimpleName(), eventQueue.size()); + locks.lock(event); + long start = clock.getTime(); + try { + handler.handle(event); + } 
finally { + metrics.updateRate(event.getType(), clock.getTime() - start); + locks.unLock(event); + metrics.removeEvent(event.getType()); + } + }; + } + + private void createWorkerPool() { + this.threadPoolExecutor = new ThreadPoolExecutor( + config.getDefaultPoolSize(), + config.getMaxPoolSize(), + config.getKeepAliveSeconds(), + TimeUnit.SECONDS, + eventQueue, + new BasicThreadFactory.Builder() + .namingPattern(this.dispatcherName + "-worker-%d") + .build() + ); + } + + private void createMonitorThread() { + int interval = config.getMonitorSeconds(); + if (interval < 1) { + return; + } + this.monitorExecutor = new ScheduledThreadPoolExecutor( + 1, + new BasicThreadFactory.Builder() + .namingPattern(this.dispatcherName + "-monitor-%d") + .build()); + monitorExecutor.scheduleAtFixedRate(() -> { + int size = eventQueue.size(); + if (0 < size) { + LOG.info("Event queue size is {}", size); + LOG.debug("Metrics: {}", metrics); + } + },10, interval, TimeUnit.SECONDS); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java new file mode 100644 index 0000000000000..cedcb8b8d315e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import org.apache.hadoop.conf.Configuration; + +/** + * All the config what can be use in the {@link MultiDispatcher} + */ +class MultiDispatcherConfig extends Configuration { + + private final String prefix; + + public MultiDispatcherConfig(Configuration configuration, String dispatcherName) { + super(configuration); + this.prefix = String.format("yarn.dispatcher.multi-thread.%s.", dispatcherName); + } + + public int getDefaultPoolSize() { + return super.getInt(prefix + "default-pool-size", 4); + } + + public int getMaxPoolSize() { + return super.getInt(prefix + "max-pool-size", 8); + } + + public int getKeepAliveSeconds() { + return super.getInt(prefix + "keep-alive-seconds", 10); + } + + public int getQueueSize() { + return super.getInt(prefix + "queue-size", 1_000_000); + } + + public int getMonitorSeconds() { + return super.getInt(prefix + "monitor-seconds", 30); + } + + public int getGracefulStopSeconds() { + return super.getInt(prefix + "graceful-stop-seconds", 60); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java new file mode 100644 index 0000000000000..c2a309f3a83a2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java @@ -0,0 +1,43 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; + + +class MultiDispatcherLibrary { + + private final Map LIB = new HashMap<>(); + + public EventHandler getEventHandler(Event e) { + EventHandler handler = LIB.get(e.getType().getClass().getCanonicalName()); + if (handler == null) { + throw new Error("EventHandler for " + e.getType() + ", was not found in " + LIB.keySet()); + } + return handler; + } + + public void register(Class eventType, EventHandler handler) { + LIB.put(eventType.getCanonicalName(), handler); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLocks.java new file mode 100644 index 0000000000000..0181316b5c690 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLocks.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; + +import org.apache.hadoop.yarn.event.Event; + +/** + * The locking logic of the {@link MultiDispatcher} + */ +class MultiDispatcherLocks { + private final Logger LOG; + private final Set lockKeys= ConcurrentHashMap.newKeySet(); + private final Lock globalLock = new ReentrantLock(); + + public MultiDispatcherLocks(Logger LOG) { + this.LOG = LOG; + } + + void lock(Event event) { + String lockKey = event.getLockKey(); + debugLockLog(lockKey, "lock", event); + if (lockKey == null) { + globalLock.lock(); + } else { + lockKeys.add(lockKey); + } + debugLockLog(lockKey, "locked", event); + } + + void unLock(Event event) { + String lockKey = event.getLockKey(); + debugLockLog(lockKey, "unlock", event); + if (lockKey == null) { + globalLock.unlock(); + } else { + 
lockKeys.remove(lockKey); + } + debugLockLog(lockKey, "unlocked", event); + } + + private void debugLockLog(String lockKey, String command, Event event) { + LOG.debug("{} {} with thread {} for event {}", + lockKey, command, Thread.currentThread().getName(), event.getClass().getSimpleName()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java new file mode 100644 index 0000000000000..7c79eb5e5e699 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.StringJoiner; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class DispatcherEventMetrics implements MetricsSource { + + private final Map currentEventCountMetrics; + private final Map processingTimeMetrics; + private final MetricsRegistry registry; + + public DispatcherEventMetrics(String name) { + this.currentEventCountMetrics = new HashMap<>(); + this.processingTimeMetrics = new HashMap<>(); + this.registry = new MetricsRegistry(Interns.info( + "DispatcherEventMetrics for " + name, + "DispatcherEventMetrics for " + name + )); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public void init(Class typeClass) { + for(Object c : typeClass.getEnumConstants()) { + String key = createKey(c); + currentEventCountMetrics.put(key, this.registry.newGauge( + key + "_Current", key + "_Current", 0L)); + processingTimeMetrics.put(key, this.registry.newRate(key + "_", key+ "_")); + } + } + + public void addEvent(Object type) { + currentEventCountMetrics.get(createKey(type)).incr(); + } + + public void removeEvent(Object type) { + currentEventCountMetrics.get(createKey(type)).decr(); + } + + public void updateRate(Object type, long millisecond) { + processingTimeMetrics.get(createKey(type)).add(millisecond); + } + + private String createKey(Object constant) { + return constant.getClass().getSimpleName() + "#" + 
constant; + } + + @Override + public String toString() { + return new StringJoiner(", ") + .add("currentEventCountMetrics=" + currentEventCountMetrics) + .add("processingTimeMetrics=" + processingTimeMetrics) + .toString(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java new file mode 100644 index 0000000000000..22418f2b9a8a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java @@ -0,0 +1,37 @@ +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.yarn.event.Event; + +class MockEvent implements Event { + private static final Random RANDOM = new Random(); + + private final MockEventType type = Math.random() < 0.5 + ? 
MockEventType.TYPE_1 + : MockEventType.TYPE_2; + + private final long timestamp = System.currentTimeMillis(); + + private final String lockKey = Arrays.asList( + "APP1", + "APP2", + null + ).get(RANDOM.nextInt(3)); + + @Override + public MockEventType getType() { + return type; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public String getLockKey() { + return lockKey; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java new file mode 100644 index 0000000000000..e618f9c3d87c3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java @@ -0,0 +1,6 @@ +package org.apache.hadoop.yarn.event.multidispatcher; + +enum MockEventType { + TYPE_1, + TYPE_2 +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java new file mode 100644 index 0000000000000..1e1b6c10bd28c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; + +import static org.junit.Assert.assertEquals; + +public class TestMultiDispatcher { + + @Test(timeout = 5_000) + public void testHandle() throws Exception { + MultiDispatcher dispatcher = new MultiDispatcher("Test"); + assertEquals(Service.STATE.NOTINITED, dispatcher.getServiceState()); + dispatcher.init(new Configuration()); + assertEquals(Service.STATE.INITED, dispatcher.getServiceState()); + dispatcher.start(); + assertEquals(Service.STATE.STARTED, dispatcher.getServiceState()); + AtomicInteger handledEvent = new AtomicInteger(); + dispatcher.register(MockEventType.class, event -> handledEvent.incrementAndGet()); + + IntStream.range(0, 1_000) + .forEach(value -> dispatcher.getEventHandler().handle(new MockEvent())); + + dispatcher.stop(); + assertEquals(Service.STATE.STOPPED, dispatcher.getServiceState()); + assertEquals(1_000, handledEvent.get()); + } + + @Test(timeout = 5_000, expected = Error.class) + public void testMissingHandler() { + MultiDispatcher dispatcher = new MultiDispatcher("Test"); + assertEquals(Service.STATE.NOTINITED, dispatcher.getServiceState()); + dispatcher.init(new Configuration()); + assertEquals(Service.STATE.INITED, dispatcher.getServiceState()); + dispatcher.start(); + assertEquals(Service.STATE.STARTED, dispatcher.getServiceState()); + + 
dispatcher.getEventHandler().handle(new MockEvent()); + } + + @Test(timeout = 5_000) + public void testBlocksNewOnStop() { + MultiDispatcher dispatcher = new MultiDispatcher("Test"); + assertEquals(Service.STATE.NOTINITED, dispatcher.getServiceState()); + dispatcher.init(new Configuration()); + assertEquals(Service.STATE.INITED, dispatcher.getServiceState()); + AtomicInteger handledEvent = new AtomicInteger(); + dispatcher.register(MockEventType.class, event -> handledEvent.incrementAndGet()); + dispatcher.start(); + assertEquals(Service.STATE.STARTED, dispatcher.getServiceState()); + dispatcher.stop(); + assertEquals(Service.STATE.STOPPED, dispatcher.getServiceState()); + dispatcher.getEventHandler().handle(new MockEvent()); + assertEquals(0, handledEvent.get()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 49e25eb8f1378..6952806a3c846 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -33,9 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.crypto.SecretKey; @@ -59,9 +56,9 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; 
import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.multidispatcher.MultiDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -118,8 +115,6 @@ public abstract class RMStateStore extends AbstractService { protected long baseEpoch; private long epochRange; protected ResourceManager resourceManager; - private final ReadLock readLock; - private final WriteLock writeLock; public static final Logger LOG = LoggerFactory.getLogger(RMStateStore.class); @@ -687,9 +682,6 @@ public RMStateStoreState transition(RMStateStore store, public RMStateStore() { super(RMStateStore.class.getName()); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - this.readLock = lock.readLock(); - this.writeLock = lock.writeLock(); stateMachine = stateMachineFactory.make(this); } @@ -799,20 +791,18 @@ public void setRMDispatcher(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; } - AsyncDispatcher dispatcher; + MultiDispatcher dispatcher; @SuppressWarnings("rawtypes") @VisibleForTesting protected EventHandler rmStateStoreEventHandler; @Override protected void serviceInit(Configuration conf) throws Exception{ - // create async handler - dispatcher = new AsyncDispatcher("RM StateStore dispatcher"); + dispatcher = new MultiDispatcher("RM State Store"); dispatcher.init(conf); rmStateStoreEventHandler = new ForwardingEventHandler(); dispatcher.register(RMStateStoreEventType.class, rmStateStoreEventHandler); - dispatcher.setDrainEventsOnStop(); // read the base epoch value from conf baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH, YarnConfiguration.DEFAULT_RM_EPOCH); @@ -1330,7 +1320,6 @@ protected boolean isFencedState() { 
// Dispatcher related code protected void handleStoreEvent(RMStateStoreEvent event) { - this.writeLock.lock(); try { LOG.debug("Processing event of type {}", event.getType()); @@ -1346,8 +1335,6 @@ protected void handleStoreEvent(RMStateStoreEvent event) { } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); - } finally { - this.writeLock.unlock(); } } @@ -1441,12 +1428,7 @@ public void setResourceManager(ResourceManager rm) { } public RMStateStoreState getRMStateStoreState() { - this.readLock.lock(); - try { - return this.stateMachine.getCurrentState(); - } finally { - this.readLock.unlock(); - } + return this.stateMachine.getCurrentState(); } @SuppressWarnings("rawtypes") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java index 3399431cf040f..c30036bdd75e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java @@ -31,4 +31,9 @@ public RMStateStoreAppAttemptEvent(ApplicationAttemptStateData attemptState) { public ApplicationAttemptStateData getAppAttemptState() { return attemptState; } + + @Override + public String getLockKey() { + return attemptState.getAttemptId().getApplicationId().toString(); + } } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java index 50e59f72610be..9033a2357727a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java @@ -32,4 +32,9 @@ public RMStateStoreAppEvent(ApplicationStateData appState) { public ApplicationStateData getAppState() { return appState; } + + @Override + public String getLockKey() { + return appState.getApplicationSubmissionContext().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java index 7455c39999a8f..c5edeef8ddfd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java @@ -34,4 +34,9 @@ public class RMStateStoreRemoveAppAttemptEvent extends RMStateStoreEvent { 
public ApplicationAttemptId getApplicationAttemptId() { return applicationAttemptId; } + + @Override + public String getLockKey() { + return applicationAttemptId.getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java index fbba64c8783e1..d184d8d785ae0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java @@ -31,4 +31,9 @@ public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent { public ApplicationStateData getAppState() { return appState; } + + @Override + public String getLockKey() { + return appState.getApplicationSubmissionContext().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java index 14f8e9d47fcf6..62fd84baeef56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java @@ -33,4 +33,9 @@ public RMStateUpdateAppAttemptEvent( public ApplicationAttemptStateData getAppAttemptState() { return attemptState; } + + @Override + public String getLockKey() { + return attemptState.getAttemptId().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index 1c156df0fcb14..4b1d73004d670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -60,4 +60,9 @@ public boolean isNotifyApplication() { public SettableFuture getResult() { return future; } + + @Override + public String getLockKey() { + return appState.getApplicationSubmissionContext().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 17737e59c2b69..35e8fb66eb997 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -73,7 +73,6 @@ class TestFileSystemRMStore extends FileSystemRMStateStore { init(conf); Assert.assertNull(fs); assertTrue(workingDirPathURI.equals(fsWorkingPath)); - dispatcher.disableExitOnDispatchException(); start(); Assert.assertNotNull(fs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index e93599dd47ec4..1dc929cba6592 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -174,7 +174,6 @@ public RMStateStore getRMStateStore() throws Exception { stateStore = new LeveldbRMStateStore(); stateStore.init(conf); stateStore.start(); - stateStore.dispatcher.disableExitOnDispatchException(); return stateStore; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 404fae9d853f0..bcabafae150fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -148,7 +148,6 @@ class TestZKRMStateStoreInternal extends ZKRMStateStore { throws Exception { setResourceManager(new ResourceManager()); init(conf); - dispatcher.disableExitOnDispatchException(); start(); assertTrue(znodeWorkingPath.equals(workingZnode)); } From f3dbab4b2faa880ceb9ce1ca538e645822d5aef1 Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Thu, 22 Feb 2024 09:26:38 +0100 Subject: [PATCH 02/10] YARN-11656 RMStateStore event queue blocked - fix failing test - some test using RMStateStore without start it --- .../event/multidispatcher/MultiDispatcher.java | 11 +++-------- .../MultiDispatcherLibrary.java | 12 +++++++----- .../yarn/event/multidispatcher/MockEvent.java | 18 ++++++++++++++++++ .../event/multidispatcher/MockEventType.java | 18 ++++++++++++++++++ 4 files changed, 46 insertions(+), 13 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java index f07ac7496bf3c..4d7a6c06d0253 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java @@ -77,13 +77,8 @@ public MultiDispatcher(String dispatcherName) { @Override protected void serviceInit(Configuration conf) throws Exception{ super.serviceInit(conf); - this.config = new MultiDispatcherConfig(getConfig(), this.dispatcherName); - this.eventQueue = new LinkedBlockingQueue<>(this.config.getQueueSize()); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); + this.config = new MultiDispatcherConfig(getConfig(), dispatcherName); + this.eventQueue = new LinkedBlockingQueue<>(config.getQueueSize()); createWorkerPool(); createMonitorThread(); DefaultMetricsSystem.instance().register( @@ -134,8 +129,8 @@ private Runnable createRunnable(Event event, EventHandler handler) { try { handler.handle(event); } finally { - metrics.updateRate(event.getType(), clock.getTime() - start); locks.unLock(event); + metrics.updateRate(event.getType(), clock.getTime() - start); metrics.removeEvent(event.getType()); } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java index c2a309f3a83a2..25126e6e788fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java @@ -24,20 +24,22 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; - +/** + * Stores {@link EventHandler} for {@link Event} in {@link MultiDispatcher} + */ class MultiDispatcherLibrary { - private final Map LIB = new HashMap<>(); + 
private final Map lib = new HashMap<>(); public EventHandler getEventHandler(Event e) { - EventHandler handler = LIB.get(e.getType().getClass().getCanonicalName()); + EventHandler handler = lib.get(e.getType().getClass().getCanonicalName()); if (handler == null) { - throw new Error("EventHandler for " + e.getType() + ", was not found in " + LIB.keySet()); + throw new Error("EventHandler for " + e.getType() + ", was not found in " + lib.keySet()); } return handler; } public void register(Class eventType, EventHandler handler) { - LIB.put(eventType.getCanonicalName(), handler); + lib.put(eventType.getCanonicalName(), handler); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java index 22418f2b9a8a9..8d899ea3f6f78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.yarn.event.multidispatcher; import java.util.Arrays; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java index e618f9c3d87c3..f9c96968c47d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.yarn.event.multidispatcher; enum MockEventType { From 576c4fac8ee9ca3b7cfb23423dd8647b5ce10f07 Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Thu, 22 Feb 2024 14:50:42 +0100 Subject: [PATCH 03/10] YARN-11656 RMStateStore event queue blocked - fix javadoc --- .../hadoop/yarn/event/multidispatcher/MultiDispatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java index 4d7a6c06d0253..f882162dbbc38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java @@ -40,7 +40,7 @@ /** * Dispatches {@link Event}s in a parallel thread. - * The {@link this#getEventHandler()} method can be used to post an event to the dispatcher. + * The {@link Dispatcher#getEventHandler()} method can be used to post an event to the dispatcher. * The posted event will be added to the event queue what is polled by many thread, * based on the config values in the {@link MultiDispatcherConfig}. 
* The posted events can be parallel executed, From cf3fe1c315017521747817146edd745aa808d7e5 Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Sat, 24 Feb 2024 09:09:57 +0100 Subject: [PATCH 04/10] YARN-11656 RMStateStore event queue blocked - fix metric --- .../hadoop/yarn/event/multidispatcher/MultiDispatcher.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java index f882162dbbc38..e296355abe92c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java @@ -129,8 +129,9 @@ private Runnable createRunnable(Event event, EventHandler handler) { try { handler.handle(event); } finally { + long end = clock.getTime(); locks.unLock(event); - metrics.updateRate(event.getType(), clock.getTime() - start); + metrics.updateRate(event.getType(), end - start); metrics.removeEvent(event.getType()); } }; From 481125759af3c364d7c3a3e47b621ea9f033bb73 Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Thu, 29 Feb 2024 11:08:29 +0100 Subject: [PATCH 05/10] YARN-11656 RMStateStore event queue blocked - fix test - remove locks - add hash based dispatching --- .../multidispatcher/MultiDispatcher.java | 107 +++++++--------- .../MultiDispatcherConfig.java | 14 +-- .../MultiDispatcherExecutor.java | 115 ++++++++++++++++++ .../multidispatcher/MultiDispatcherLocks.java | 68 ----------- .../yarn/metrics/DispatcherEventMetrics.java | 67 +--------- .../metrics/DispatcherEventMetricsImpl.java | 91 ++++++++++++++ .../metrics/DispatcherEventMetricsNoOps.java | 56 +++++++++ 
.../multidispatcher/TestMultiDispatcher.java | 2 +- 8 files changed, 315 insertions(+), 205 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLocks.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java index e296355abe92c..40799df77b9c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java @@ -18,11 +18,10 @@ package org.apache.hadoop.yarn.event.multidispatcher; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,80 +34,73 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.metrics.DispatcherEventMetrics; +import org.apache.hadoop.yarn.metrics.DispatcherEventMetricsImpl; +import 
org.apache.hadoop.yarn.metrics.DispatcherEventMetricsNoOps; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.MonotonicClock; /** * Dispatches {@link Event}s in a parallel thread. * The {@link Dispatcher#getEventHandler()} method can be used to post an event to the dispatcher. - * The posted event will be added to the event queue what is polled by many thread, - * based on the config values in the {@link MultiDispatcherConfig}. - * The posted events can be parallel executed, - * if the result of the {@link Event#getLockKey()} is different between 2 event. - * If the method return with null, then a global semaphore will be used for these events. - * The method usually returns with the applicationId based on the concept - * parallel apps should not affect each others. - * The locking logic is implemented in {@link MultiDispatcherLocks}. - * The Dispatcher provides metric data using the {@link DispatcherEventMetrics} + * The posted event will be separated based on the hashcode of the {@link Event#getLockKey()}. + * The {@link MultiDispatcherConfig} contains the information, + * how many thread will be used, for parallel execution. + * The {@link MultiDispatcherExecutor} contains the worker threads, which handle the events. + * The {@link MultiDispatcherLibrary} contains the information, + * how to pair event types with {@link EventHandler}s + * The Dispatcher provides metric data using the {@link DispatcherEventMetricsImpl}. 
*/ public class MultiDispatcher extends AbstractService implements Dispatcher { - private final Logger LOG; + private final Logger log; private final String dispatcherName; - private final DispatcherEventMetrics metrics; - private final MultiDispatcherLocks locks; private final MultiDispatcherLibrary library; private final Clock clock = new MonotonicClock(); - private MultiDispatcherConfig config; - private BlockingQueue eventQueue; - private ThreadPoolExecutor threadPoolExecutor; + private MultiDispatcherExecutor workerExecutor; private ScheduledThreadPoolExecutor monitorExecutor; + private DispatcherEventMetrics metrics; public MultiDispatcher(String dispatcherName) { super("Dispatcher"); this.dispatcherName = dispatcherName.replaceAll(" ", "-").toLowerCase(); - this.LOG = LoggerFactory.getLogger(MultiDispatcher.class.getCanonicalName() + "." + this.dispatcherName); - this.metrics = new DispatcherEventMetrics(this.dispatcherName); - this.locks = new MultiDispatcherLocks(this.LOG); + this.log = LoggerFactory.getLogger(MultiDispatcher.class.getCanonicalName() + "." 
+ this.dispatcherName); + this.metrics = new DispatcherEventMetricsImpl(this.dispatcherName); this.library = new MultiDispatcherLibrary(); } @Override protected void serviceInit(Configuration conf) throws Exception{ super.serviceInit(conf); - this.config = new MultiDispatcherConfig(getConfig(), dispatcherName); - this.eventQueue = new LinkedBlockingQueue<>(config.getQueueSize()); - createWorkerPool(); - createMonitorThread(); - DefaultMetricsSystem.instance().register( - "Event metrics for " + dispatcherName, - "Event metrics for " + dispatcherName, - metrics - ); + MultiDispatcherConfig config = new MultiDispatcherConfig(getConfig(), dispatcherName); + workerExecutor = new MultiDispatcherExecutor(log, config, dispatcherName); + createMonitorThread(config); + if (config.getMetricsEnabled()) { + metrics = new DispatcherEventMetricsImpl(dispatcherName); + DefaultMetricsSystem.instance().register( + "Event metrics for " + dispatcherName, + "Event metrics for " + dispatcherName, + metrics + ); + } else { + metrics = new DispatcherEventMetricsNoOps(log); + } } @Override protected void serviceStop() throws Exception { - if (monitorExecutor != null) { - monitorExecutor.shutdownNow(); - } - threadPoolExecutor.shutdown(); - threadPoolExecutor.awaitTermination(config.getGracefulStopSeconds(), TimeUnit.SECONDS); - int terminatedSize = threadPoolExecutor.shutdownNow().size(); - if (0 < terminatedSize) { - LOG.error("{} tasks not finished in time, so they were terminated", terminatedSize); - } + workerExecutor.stop(); } @Override public EventHandler getEventHandler() { return event -> { if (isInState(STATE.STOPPED)) { - LOG.warn("Discard event {} because stopped state", event); + log.warn("Discard event {} because stopped state", event); } else { EventHandler handler = library.getEventHandler(event); - threadPoolExecutor.execute(createRunnable(event, handler)); + Runnable runnable = createRunnable(event, handler); + workerExecutor.execute(event, runnable); 
metrics.addEvent(event.getType()); } }; @@ -122,35 +114,17 @@ public void register(Class eventType, EventHandler handler) { private Runnable createRunnable(Event event, EventHandler handler) { return () -> { - LOG.debug("{} handle {}, queue size: {}", - Thread.currentThread().getName(), event.getClass().getSimpleName(), eventQueue.size()); - locks.lock(event); long start = clock.getTime(); try { handler.handle(event); } finally { - long end = clock.getTime(); - locks.unLock(event); - metrics.updateRate(event.getType(), end - start); + metrics.updateRate(event.getType(), clock.getTime() - start); metrics.removeEvent(event.getType()); } }; } - private void createWorkerPool() { - this.threadPoolExecutor = new ThreadPoolExecutor( - config.getDefaultPoolSize(), - config.getMaxPoolSize(), - config.getKeepAliveSeconds(), - TimeUnit.SECONDS, - eventQueue, - new BasicThreadFactory.Builder() - .namingPattern(this.dispatcherName + "-worker-%d") - .build() - ); - } - - private void createMonitorThread() { + private void createMonitorThread(MultiDispatcherConfig config) { int interval = config.getMonitorSeconds(); if (interval < 1) { return; @@ -161,11 +135,14 @@ private void createMonitorThread() { .namingPattern(this.dispatcherName + "-monitor-%d") .build()); monitorExecutor.scheduleAtFixedRate(() -> { - int size = eventQueue.size(); - if (0 < size) { - LOG.info("Event queue size is {}", size); - LOG.debug("Metrics: {}", metrics); + List notEmptyQueues = workerExecutor.getQueueSize().entrySet().stream() + .filter(e -> 0 < e.getValue()) + .map(e -> String.format("%s has queue size %d", e.getKey(), e.getValue())) + .collect(Collectors.toList()); + if (!notEmptyQueues.isEmpty()) { + log.info("Event queue sizes: {}", notEmptyQueues); } + log.debug("Metrics: {}", metrics); },10, interval, TimeUnit.SECONDS); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java index cedcb8b8d315e..00110343e1aa7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java @@ -36,16 +36,8 @@ public int getDefaultPoolSize() { return super.getInt(prefix + "default-pool-size", 4); } - public int getMaxPoolSize() { - return super.getInt(prefix + "max-pool-size", 8); - } - - public int getKeepAliveSeconds() { - return super.getInt(prefix + "keep-alive-seconds", 10); - } - public int getQueueSize() { - return super.getInt(prefix + "queue-size", 1_000_000); + return super.getInt(prefix + "queue-size", 10_000_000); } public int getMonitorSeconds() { @@ -55,4 +47,8 @@ public int getMonitorSeconds() { public int getGracefulStopSeconds() { return super.getInt(prefix + "graceful-stop-seconds", 60); } + + public boolean getMetricsEnabled() { + return super.getBoolean(prefix + "metrics-enabled", false); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java new file mode 100644 index 0000000000000..b0f74ea02c48d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import org.slf4j.Logger; + +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; + +/** + * This class contains the thread which process the {@link MultiDispatcher}'s events. + */ +public class MultiDispatcherExecutor { + + private final Logger LOG; + private final MultiDispatcherConfig config; + private final MultiDispatcherExecutorThread[] threads; + private final Clock clock = new MonotonicClock(); + + public MultiDispatcherExecutor( + Logger LOG, + MultiDispatcherConfig config, + String dispatcherName + ) { + this.LOG = LOG; + this.config = config; + this.threads = new MultiDispatcherExecutorThread[config.getDefaultPoolSize()]; + ThreadGroup group = new ThreadGroup(dispatcherName); + for (int i=0; i < threads.length; ++i) { + threads[i] = new MultiDispatcherExecutorThread(group, i, config.getQueueSize()); + threads[i].start(); + } + } + + public void execute(Event event, Runnable runnable) { + String lockKey = event.getLockKey(); + int threadIndex = lockKey == null ? 
0 : lockKey.hashCode() % threads.length; + MultiDispatcherExecutorThread thread = threads[threadIndex]; + thread.add(runnable); + LOG.debug("The {} with lock key {} will be handled by {}", + event.getType(), lockKey, thread.getName()); + } + + public void stop() throws InterruptedException { + long timeOut = clock.getTime() + config.getGracefulStopSeconds() * 1_000L; + // if not all queue is empty + if (!Arrays.stream(threads).allMatch(t -> t.queue.isEmpty()) + // and not timeout yet + && clock.getTime() < timeOut) { + LOG.debug("Not all event queue is empty, waiting to drain ..."); + Thread.sleep(1_000); + } + for (MultiDispatcherExecutorThread thread : threads) { + thread.interrupt(); + } + } + + public Map getQueueSize() { + return Arrays.stream(threads).collect(Collectors.toMap( + MultiDispatcherExecutorThread::getName, + MultiDispatcherExecutorThread::queueSize + )); + } + + private final class MultiDispatcherExecutorThread extends Thread { + private final BlockingQueue queue; + + MultiDispatcherExecutorThread(ThreadGroup group, int index, int queueSize) { + super(group, String.format("Thread-%d", index)); + this.queue = new LinkedBlockingQueue<>(queueSize); + } + + void add(Runnable runnable) { + queue.add(runnable); + } + + long queueSize() { + return queue.size(); + } + + @Override + public void run() { + try { + while (true) { + queue.take().run(); + } + } catch (InterruptedException e) { + LOG.warn("{} get interrupted", getName()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLocks.java deleted file mode 100644 index 0181316b5c690..0000000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLocks.java +++ /dev/null @@ -1,68 +0,0 
@@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.event.multidispatcher; - -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.slf4j.Logger; - -import org.apache.hadoop.yarn.event.Event; - -/** - * The locking logic of the {@link MultiDispatcher} - */ -class MultiDispatcherLocks { - private final Logger LOG; - private final Set lockKeys= ConcurrentHashMap.newKeySet(); - private final Lock globalLock = new ReentrantLock(); - - public MultiDispatcherLocks(Logger LOG) { - this.LOG = LOG; - } - - void lock(Event event) { - String lockKey = event.getLockKey(); - debugLockLog(lockKey, "lock", event); - if (lockKey == null) { - globalLock.lock(); - } else { - lockKeys.add(lockKey); - } - debugLockLog(lockKey, "locked", event); - } - - void unLock(Event event) { - String lockKey = event.getLockKey(); - debugLockLog(lockKey, "unlock", event); - if (lockKey == null) { - globalLock.unlock(); - } else { - lockKeys.remove(lockKey); - } - debugLockLog(lockKey, "unlocked", event); - } - - private void debugLockLog(String lockKey, String command, Event event) { - 
LOG.debug("{} {} with thread {} for event {}", - lockKey, command, Thread.currentThread().getName(), event.getClass().getSimpleName()); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java index 7c79eb5e5e699..e99fdfddc20e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java @@ -17,72 +17,15 @@ */ package org.apache.hadoop.yarn.metrics; -import java.util.HashMap; -import java.util.Map; -import java.util.StringJoiner; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.Interns; -import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableGaugeLong; -import org.apache.hadoop.metrics2.lib.MutableRate; - -@InterfaceAudience.Private -@Metrics(context="yarn") -public class DispatcherEventMetrics implements MetricsSource { - - private final Map currentEventCountMetrics; - private final Map processingTimeMetrics; - private final MetricsRegistry registry; - - public DispatcherEventMetrics(String name) { - this.currentEventCountMetrics = new HashMap<>(); - this.processingTimeMetrics = new HashMap<>(); - this.registry = new MetricsRegistry(Interns.info( - "DispatcherEventMetrics for " + name, - "DispatcherEventMetrics for " + name - )); - } - - @Override - public void getMetrics(MetricsCollector collector, boolean all) { - registry.snapshot(collector.addRecord(registry.info()), all); - } - - public void 
init(Class typeClass) { - for(Object c : typeClass.getEnumConstants()) { - String key = createKey(c); - currentEventCountMetrics.put(key, this.registry.newGauge( - key + "_Current", key + "_Current", 0L)); - processingTimeMetrics.put(key, this.registry.newRate(key + "_", key+ "_")); - } - } - - public void addEvent(Object type) { - currentEventCountMetrics.get(createKey(type)).incr(); - } - public void removeEvent(Object type) { - currentEventCountMetrics.get(createKey(type)).decr(); - } +public interface DispatcherEventMetrics extends MetricsSource { - public void updateRate(Object type, long millisecond) { - processingTimeMetrics.get(createKey(type)).add(millisecond); - } + void init(Class typeClass); - private String createKey(Object constant) { - return constant.getClass().getSimpleName() + "#" + constant; - } + void addEvent(Object type); - @Override - public String toString() { - return new StringJoiner(", ") - .add("currentEventCountMetrics=" + currentEventCountMetrics) - .add("processingTimeMetrics=" + processingTimeMetrics) - .toString(); - } + void removeEvent(Object type); + void updateRate(Object type, long millisecond); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java new file mode 100644 index 0000000000000..878d56c880c8e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.StringJoiner; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class DispatcherEventMetricsImpl implements DispatcherEventMetrics { + + private final Map currentEventCountMetrics; + private final Map processingTimeMetrics; + private final MetricsRegistry registry; + + public DispatcherEventMetricsImpl(String name) { + this.currentEventCountMetrics = new HashMap<>(); + this.processingTimeMetrics = new HashMap<>(); + this.registry = new MetricsRegistry(Interns.info( + "DispatcherEventMetrics for " + name, + "DispatcherEventMetrics for " + name + )); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + @Override + public void init(Class typeClass) { + for(Object c : typeClass.getEnumConstants()) { + String key = createKey(c); + currentEventCountMetrics.put(key, this.registry.newGauge( 
+ key + "_Current", key + "_Current", 0L)); + processingTimeMetrics.put(key, this.registry.newRate(key + "_", key+ "_")); + } + } + + @Override + public void addEvent(Object type) { + currentEventCountMetrics.get(createKey(type)).incr(); + } + + @Override + public void removeEvent(Object type) { + currentEventCountMetrics.get(createKey(type)).decr(); + } + + @Override + public void updateRate(Object type, long millisecond) { + processingTimeMetrics.get(createKey(type)).add(millisecond); + } + + private String createKey(Object constant) { + return constant.getClass().getSimpleName() + "#" + constant; + } + + @Override + public String toString() { + return new StringJoiner(", ") + .add("currentEventCountMetrics=" + currentEventCountMetrics) + .add("processingTimeMetrics=" + processingTimeMetrics) + .toString(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java new file mode 100644 index 0000000000000..6ec502eee8c76 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.metrics; + +import org.slf4j.Logger; + +import org.apache.hadoop.metrics2.MetricsCollector; + +public class DispatcherEventMetricsNoOps implements DispatcherEventMetrics { + + private final Logger log; + + public DispatcherEventMetricsNoOps(Logger log) { + this.log = log; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + log.debug("called getMetrics"); + } + + @Override + public void init(Class typeClass) { + log.debug("called init"); + } + + @Override + public void addEvent(Object type) { + log.debug("called addEvent"); + } + + @Override + public void removeEvent(Object type) { + log.debug("called removeEvent"); + } + + @Override + public void updateRate(Object type, long millisecond) { + log.debug("called updateRate"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java index 1e1b6c10bd28c..caf8ae846fd0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java @@ -31,7 +31,7 @@ public class TestMultiDispatcher { @Test(timeout = 5_000) - public void testHandle() throws Exception { + public void testHandle() { 
MultiDispatcher dispatcher = new MultiDispatcher("Test"); assertEquals(Service.STATE.NOTINITED, dispatcher.getServiceState()); dispatcher.init(new Configuration()); From f0788b5a11a17fa254c739702510509b0f121520 Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Thu, 29 Feb 2024 11:12:38 +0100 Subject: [PATCH 06/10] YARN-11656 RMStateStore event queue blocked - some style fix --- .../yarn/event/multidispatcher/MultiDispatcherExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java index b0f74ea02c48d..85ce29a0cf282 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java @@ -67,7 +67,7 @@ public void execute(Event event, Runnable runnable) { public void stop() throws InterruptedException { long timeOut = clock.getTime() + config.getGracefulStopSeconds() * 1_000L; // if not all queue is empty - if (!Arrays.stream(threads).allMatch(t -> t.queue.isEmpty()) + if (Arrays.stream(threads).anyMatch(t -> 0 < t.queueSize()) // and not timeout yet && clock.getTime() < timeOut) { LOG.debug("Not all event queue is empty, waiting to drain ..."); From 3b56adeff5d29c360e3a720c5ddc67d83275ef15 Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Thu, 29 Feb 2024 14:33:09 +0100 Subject: [PATCH 07/10] YARN-11656 RMStateStore event queue blocked - fix thread name - sort log records in monitor --- .../hadoop/yarn/event/multidispatcher/MultiDispatcher.java | 3 ++- .../yarn/event/multidispatcher/MultiDispatcherExecutor.java | 4 ++-- 2 files changed, 4 
insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java index 40799df77b9c9..2b3c40c46b3b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java @@ -135,9 +135,10 @@ private void createMonitorThread(MultiDispatcherConfig config) { .namingPattern(this.dispatcherName + "-monitor-%d") .build()); monitorExecutor.scheduleAtFixedRate(() -> { - List notEmptyQueues = workerExecutor.getQueueSize().entrySet().stream() + List notEmptyQueues = workerExecutor.getQueuesSize().entrySet().stream() .filter(e -> 0 < e.getValue()) .map(e -> String.format("%s has queue size %d", e.getKey(), e.getValue())) + .sorted() .collect(Collectors.toList()); if (!notEmptyQueues.isEmpty()) { log.info("Event queue sizes: {}", notEmptyQueues); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java index 85ce29a0cf282..a0c3f6f90bb1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java @@ -78,7 +78,7 @@ public void stop() throws InterruptedException { } } - public Map getQueueSize() { + public Map getQueuesSize() { return 
Arrays.stream(threads).collect(Collectors.toMap( MultiDispatcherExecutorThread::getName, MultiDispatcherExecutorThread::queueSize @@ -89,7 +89,7 @@ private final class MultiDispatcherExecutorThread extends Thread { private final BlockingQueue queue; MultiDispatcherExecutorThread(ThreadGroup group, int index, int queueSize) { - super(group, String.format("Thread-%d", index)); + super(group, String.format("%s-worker-%d", group.getName(), index)); this.queue = new LinkedBlockingQueue<>(queueSize); } From e4b7b53d9c5e4e73e53789dbb97702207e112be5 Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Thu, 29 Feb 2024 15:20:18 +0100 Subject: [PATCH 08/10] YARN-11656 RMStateStore event queue blocked - fix out of array exception --- .../yarn/event/multidispatcher/MultiDispatcherExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java index a0c3f6f90bb1a..29b33d1b93cc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java @@ -57,7 +57,7 @@ public MultiDispatcherExecutor( public void execute(Event event, Runnable runnable) { String lockKey = event.getLockKey(); - int threadIndex = lockKey == null ? 0 : lockKey.hashCode() % threads.length; + int threadIndex = lockKey == null ? 
0 : Math.abs(lockKey.hashCode()) % threads.length; MultiDispatcherExecutorThread thread = threads[threadIndex]; thread.add(runnable); LOG.debug("The {} with lock key {} will be handled by {}", From 06542118fd6c6cb9f43683720f86a263c86abcf0 Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Fri, 8 Mar 2024 19:05:57 +0100 Subject: [PATCH 09/10] YARN-11656 RMStateStore event queue blocked - fix tests --- .../recovery/RMStateStoreTestBase.java | 80 ++++++++++++------- .../recovery/TestZKRMStateStore.java | 9 ++- .../recovery/TestZKRMStateStorePerf.java | 2 +- 3 files changed, 57 insertions(+), 34 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 1773c7aaa88e7..7416e829b6f12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -33,6 +33,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.crypto.SecretKey; @@ -74,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -99,9 +102,8 @@ public class RMStateStoreTestBase { static class TestDispatcher implements Dispatcher, EventHandler { - ApplicationAttemptId attemptId; - - boolean notified = false; + private final Set handledAttempt = ConcurrentHashMap.newKeySet(); + private final Set handledApps = ConcurrentHashMap.newKeySet(); @SuppressWarnings("rawtypes") @Override @@ -113,11 +115,16 @@ public void register(Class eventType, public void handle(Event event) { if (event instanceof RMAppAttemptEvent) { RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event; - assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId()); - } - notified = true; - synchronized (this) { - notifyAll(); + synchronized (handledAttempt) { + handledAttempt.add(rmAppAttemptEvent.getApplicationAttemptId()); + handledAttempt.notifyAll(); + } + } else if (event instanceof RMAppEvent) { + RMAppEvent rmAppEvent = (RMAppEvent) event; + synchronized (handledApps) { + handledApps.add(rmAppEvent.getApplicationId()); + handledApps.notifyAll(); + } } } @@ -127,6 +134,38 @@ public EventHandler getEventHandler() { return this; } + void waitNotify(ApplicationAttemptId attemptId) { + long startTime = System.currentTimeMillis(); + while(!handledAttempt.contains(attemptId)) { + synchronized (handledAttempt) { + try { + handledAttempt.wait(100); + } catch (InterruptedException e) { + LOG.trace("Interrupted", e); + } + } + if(System.currentTimeMillis() - startTime > 1000*60) { + fail("Timed out attempt store notification"); + } + } + } + + void waitNotify(ApplicationId applicationId) { + long startTime = System.currentTimeMillis(); + while(!handledApps.contains(applicationId)) { + synchronized (handledApps) { + try { + handledApps.wait(100); + } catch (InterruptedException 
e) { + LOG.trace("Interrupted", e); + } + } + if(System.currentTimeMillis() - startTime > 1000*60) { + fail("Timed out attempt store notification"); + } + } + } + } public static class StoreStateVerifier { @@ -148,23 +187,6 @@ public long getEpochRange() { return epochRange; } - void waitNotify(TestDispatcher dispatcher) { - long startTime = System.currentTimeMillis(); - while(!dispatcher.notified) { - synchronized (dispatcher) { - try { - dispatcher.wait(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - if(System.currentTimeMillis() - startTime > 1000*60) { - fail("Timed out attempt store notification"); - } - } - dispatcher.notified = false; - } - protected RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = @@ -204,17 +226,15 @@ protected RMAppAttempt storeAttempt(RMStateStore store, .thenReturn(mockRmAppAttemptMetrics); when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage()) .thenReturn(new AggregateAppResourceUsage(new HashMap<>())); - dispatcher.attemptId = attemptId; store.storeNewApplicationAttempt(mockAttempt); - waitNotify(dispatcher); + dispatcher.waitNotify(attemptId); return mockAttempt; } protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher, ApplicationAttemptStateData attemptState) { - dispatcher.attemptId = attemptState.getAttemptId(); store.updateApplicationAttemptState(attemptState); - waitNotify(dispatcher); + dispatcher.waitNotify(attemptState.getAttemptId()); } void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) @@ -642,6 +662,8 @@ private ArrayList createAndStoreApps( public void testDeleteStore(RMStateStoreHelper stateStoreHelper) throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); ArrayList appList = createAndStoreApps(stateStoreHelper, store, 5); store.deleteStore(); // 
verify apps deleted diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index bcabafae150fb..0f3512ccf30cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -536,6 +536,7 @@ public void testFencedState() throws Exception { RMApp mockApp = mock(RMApp.class); ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(mock(ApplicationId.class)); when(mockApp.getSubmitTime()).thenReturn(submitTime); when(mockApp.getStartTime()).thenReturn(startTime); when(mockApp.getApplicationSubmissionContext()).thenReturn(context); @@ -875,7 +876,7 @@ private void finishAppWithAttempts(RMState state, RMStateStore store, appStateNew.attempts.putAll(appState.attempts); } store.updateApplicationState(appStateNew); - waitNotify(dispatcher); + dispatcher.waitNotify(appId); Container container = new ContainerPBImpl(); container.setId(ContainerId.newContainerId(attemptId, 1)); ApplicationAttemptStateData newAttemptState = @@ -893,7 +894,7 @@ private void storeAppWithAttempts(RMStateStore store, private void storeApp(RMStateStore store, TestDispatcher dispatcher, ApplicationId appId, long submitTime, long startTime) throws Exception { storeApp(store, appId, submitTime, startTime); - waitNotify(dispatcher); + dispatcher.waitNotify(appId); } private void storeAppWithAttempts(RMStateStore store, @@ -1000,7 
+1001,7 @@ public void testAppNodeSplit() throws Exception { // Store app2 with app id application_1352994193343_120213. ApplicationId appId21 = ApplicationId.newInstance(1352994193343L, 120213); storeApp(store, appId21, submitTime, startTime); - waitNotify(dispatcher); + dispatcher.waitNotify(appId21); // Store another app which will be removed. ApplicationId appIdRemoved = ApplicationId.newInstance(1352994193343L, 2); @@ -1170,7 +1171,7 @@ public void testAppNodeSplitChangeAcrossRestarts() throws Exception { ApplicationId appId71 = ApplicationId.newInstance(1442994195087L, 7); //storeApp(store, dispatcher, appId71, submitTime, startTime); storeApp(store, appId71, submitTime, startTime); - waitNotify(dispatcher); + dispatcher.waitNotify(appId71); ApplicationAttemptId attemptId71 = ApplicationAttemptId.newInstance(appId71, 1); storeAttempt(store, ApplicationAttemptId.newInstance(appId71, 1), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java index 3cb428c5c5960..5e8c5f99ecbb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java @@ -192,7 +192,7 @@ public int run(String[] args) { + e.getMessage()); return -1; } - waitNotify(dispatcher); + dispatcher.waitNotify(appId); rmApps.add(app); } From 465a0de4f481ee2dc7383e6fb40b93e32135ba3c Mon Sep 17 00:00:00 2001 From: Bence Kosztolnik Date: Sat, 9 Mar 2024 10:52:01 +0100 
Subject: [PATCH 10/10] YARN-11656 RMStateStore event queue blocked - fix style --- .../multidispatcher/MultiDispatcher.java | 7 ++++- .../MultiDispatcherConfig.java | 27 +++++++++++++++++-- .../MultiDispatcherExecutor.java | 25 ++++++++++------- .../yarn/metrics/DispatcherEventMetrics.java | 21 +++++++++++++++ .../metrics/DispatcherEventMetricsImpl.java | 15 ++++++----- .../metrics/DispatcherEventMetricsNoOps.java | 13 +++++---- 6 files changed, 85 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java index 2b3c40c46b3b1..1d1f84b0268a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java @@ -43,6 +43,8 @@ * Dispatches {@link Event}s in a parallel thread. * The {@link Dispatcher#getEventHandler()} method can be used to post an event to the dispatcher. * The posted event will be separated based on the hashcode of the {@link Event#getLockKey()}. + * If the getLockKey() method returns with null, + * then the first executor thread will be used as default worker * The {@link MultiDispatcherConfig} contains the information, * how many thread will be used, for parallel execution. * The {@link MultiDispatcherExecutor} contains the worker threads, which handle the events. @@ -65,7 +67,6 @@ public MultiDispatcher(String dispatcherName) { super("Dispatcher"); this.dispatcherName = dispatcherName.replaceAll(" ", "-").toLowerCase(); this.log = LoggerFactory.getLogger(MultiDispatcher.class.getCanonicalName() + "." 
+ this.dispatcherName); - this.metrics = new DispatcherEventMetricsImpl(this.dispatcherName); this.library = new MultiDispatcherLibrary(); } @@ -74,6 +75,7 @@ protected void serviceInit(Configuration conf) throws Exception{ super.serviceInit(conf); MultiDispatcherConfig config = new MultiDispatcherConfig(getConfig(), dispatcherName); workerExecutor = new MultiDispatcherExecutor(log, config, dispatcherName); + workerExecutor.start(); createMonitorThread(config); if (config.getMetricsEnabled()) { metrics = new DispatcherEventMetricsImpl(dispatcherName); @@ -89,6 +91,9 @@ protected void serviceInit(Configuration conf) throws Exception{ @Override protected void serviceStop() throws Exception { + if (monitorExecutor != null) { + monitorExecutor.shutdown(); + } workerExecutor.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java index 00110343e1aa7..f5cf2ada9c39d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java @@ -32,22 +32,45 @@ public MultiDispatcherConfig(Configuration configuration, String dispatcherName) this.prefix = String.format("yarn.dispatcher.multi-thread.%s.", dispatcherName); } + /** + * How many executor threads should be created to handle the incoming events + * @return configured value, or default 4 + */ public int getDefaultPoolSize() { return super.getInt(prefix + "default-pool-size", 4); } + /** + * Maximum size of the event queue of the executor threads. + * If limit is reached then the queue#add method will throw an IllegalStateException. 
+ * @return configured value, or default 1_000_000 + */ public int getQueueSize() { - return super.getInt(prefix + "queue-size", 10_000_000); + return super.getInt(prefix + "queue-size", 1_000_000); } + /** + * How frequently the monitor thread should write the state of the dispatcher to the LOG. + * If less than 1 this monitoring will be turned off. + * @return configured value, or default 0 + */ public int getMonitorSeconds() { - return super.getInt(prefix + "monitor-seconds", 30); + return super.getInt(prefix + "monitor-seconds", 0); } + /** + * How long should the dispatcher wait to drain all event queue of workers, + * after stop signal is received. + * @return configured value, or default 60 + */ public int getGracefulStopSeconds() { return super.getInt(prefix + "graceful-stop-seconds", 60); } + /** + * Dispatcher metrics should be published to the metric system. + * @return configured value, or default false + */ public boolean getMetricsEnabled() { return super.getBoolean(prefix + "metrics-enabled", false); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java index 29b33d1b93cc9..9725c25c226c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java @@ -35,32 +35,39 @@ */ public class MultiDispatcherExecutor { - private final Logger LOG; + private final Logger log; private final MultiDispatcherConfig config; private final MultiDispatcherExecutorThread[] threads; private final Clock clock = new MonotonicClock(); public MultiDispatcherExecutor( - Logger LOG, + Logger log, 
MultiDispatcherConfig config, String dispatcherName ) { - this.LOG = LOG; + this.log = log; this.config = config; this.threads = new MultiDispatcherExecutorThread[config.getDefaultPoolSize()]; ThreadGroup group = new ThreadGroup(dispatcherName); - for (int i=0; i < threads.length; ++i) { + for (int i = 0; i < threads.length; ++i) { threads[i] = new MultiDispatcherExecutorThread(group, i, config.getQueueSize()); - threads[i].start(); + } + } + + public void start() { + for(Thread t : threads) { + t.start(); } } public void execute(Event event, Runnable runnable) { String lockKey = event.getLockKey(); - int threadIndex = lockKey == null ? 0 : Math.abs(lockKey.hashCode()) % threads.length; + // abs of Integer.MIN_VALUE is Integer.MIN_VALUE + int threadIndex = lockKey == null || lockKey.hashCode() == Integer.MIN_VALUE ? + 0 : Math.abs(lockKey.hashCode() % threads.length); MultiDispatcherExecutorThread thread = threads[threadIndex]; thread.add(runnable); - LOG.debug("The {} with lock key {} will be handled by {}", + log.trace("The {} with lock key {} will be handled by {}", event.getType(), lockKey, thread.getName()); } @@ -70,7 +77,7 @@ public void stop() throws InterruptedException { if (Arrays.stream(threads).anyMatch(t -> 0 < t.queueSize()) // and not timeout yet && clock.getTime() < timeOut) { - LOG.debug("Not all event queue is empty, waiting to drain ..."); + log.debug("Not all event queue is empty, waiting to drain ..."); Thread.sleep(1_000); } for (MultiDispatcherExecutorThread thread : threads) { @@ -108,7 +115,7 @@ public void run() { queue.take().run(); } } catch (InterruptedException e) { - LOG.warn("{} get interrupted", getName()); + log.warn("{} get interrupted", getName()); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java index 
e99fdfddc20e6..f8278e06a1f54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java @@ -19,13 +19,34 @@ import org.apache.hadoop.metrics2.MetricsSource; +/** + * Interface that a {@link org.apache.hadoop.yarn.event.Dispatcher} + * can use to publish {@link org.apache.hadoop.yarn.event.Event} related metrics + */ public interface DispatcherEventMetrics extends MetricsSource { + /** + * Class of the event type that can be handled by the DispatcherEventMetrics + * @param typeClass the event type + */ void init(Class typeClass); + /** + * Call when an Event is added for dispatching + * @param type type of the event + */ void addEvent(Object type); + /** + * Call when an Event has been handled + * @param type type of the event + */ void removeEvent(Object type); + /** + * Call with how much time was required to handle the event + * @param type type of the event + * @param millisecond time interval in milliseconds + */ void updateRate(Object type, long millisecond); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java index 878d56c880c8e..446afccb0df8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java @@ -29,6 +29,9 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableRate; +/** + * Metric object for {@link org.apache.hadoop.yarn.event.multidispatcher.MultiDispatcher} + */ @InterfaceAudience.Private
@Metrics(context="yarn") public class DispatcherEventMetricsImpl implements DispatcherEventMetrics { @@ -53,11 +56,12 @@ public void getMetrics(MetricsCollector collector, boolean all) { @Override public void init(Class typeClass) { - for(Object c : typeClass.getEnumConstants()) { - String key = createKey(c); - currentEventCountMetrics.put(key, this.registry.newGauge( - key + "_Current", key + "_Current", 0L)); - processingTimeMetrics.put(key, this.registry.newRate(key + "_", key+ "_")); + for(Object constant : typeClass.getEnumConstants()) { + String key = createKey(constant); + currentEventCountMetrics.put(key, + registry.newGauge(key + "_Current", key + "_Current", 0L)); + processingTimeMetrics.put(key, + registry.newRate(key + "_", key + "_")); } } @@ -87,5 +91,4 @@ public String toString() { .add("processingTimeMetrics=" + processingTimeMetrics) .toString(); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java index 6ec502eee8c76..04209812e3510 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java @@ -21,6 +21,9 @@ import org.apache.hadoop.metrics2.MetricsCollector; +/** + * Used if metric publication should be disabled + */ public class DispatcherEventMetricsNoOps implements DispatcherEventMetrics { private final Logger log; @@ -31,26 +34,26 @@ public DispatcherEventMetricsNoOps(Logger log) { @Override public void getMetrics(MetricsCollector collector, boolean all) { - log.debug("called getMetrics"); + log.trace("called getMetrics"); } @Override public void init(Class typeClass) { - log.debug("called init"); + 
log.trace("called init"); } @Override public void addEvent(Object type) { - log.debug("called addEvent"); + log.trace("called addEvent"); } @Override public void removeEvent(Object type) { - log.debug("called removeEvent"); + log.trace("called removeEvent"); } @Override public void updateRate(Object type, long millisecond) { - log.debug("called updateRate"); + log.trace("called updateRate"); } }