diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java index e76a7530a7eca..ddaa1b37971d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Event.java @@ -32,4 +32,14 @@ public interface Event> { TYPE getType(); long getTimestamp(); String toString(); + + /** + * In case of parallel execution of events in the same dispatcher, + * the result of this method will be used as semaphore. + * If method returns null, then a default semaphore will be used. + * @return the semaphore + */ + default String getLockKey() { + return null; + }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java new file mode 100644 index 0000000000000..1d1f84b0268a0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.metrics.DispatcherEventMetrics; +import org.apache.hadoop.yarn.metrics.DispatcherEventMetricsImpl; +import org.apache.hadoop.yarn.metrics.DispatcherEventMetricsNoOps; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; + +/** + * Dispatches {@link Event}s in a parallel thread. + * The {@link Dispatcher#getEventHandler()} method can be used to post an event to the dispatcher. + * The posted event will be separated based on the hashcode of the {@link Event#getLockKey()}. + * If the getLockKey() method returns with null, + * then the first executor thread will be used as default worker + * The {@link MultiDispatcherConfig} contains the information, + * how many thread will be used, for parallel execution. + * The {@link MultiDispatcherExecutor} contains the worker threads, which handle the events. + * The {@link MultiDispatcherLibrary} contains the information, + * how to pair event types with {@link EventHandler}s + * The Dispatcher provides metric data using the {@link DispatcherEventMetricsImpl}. + */ +public class MultiDispatcher extends AbstractService implements Dispatcher { + + private final Logger log; + private final String dispatcherName; + private final MultiDispatcherLibrary library; + private final Clock clock = new MonotonicClock(); + + private MultiDispatcherExecutor workerExecutor; + private ScheduledThreadPoolExecutor monitorExecutor; + private DispatcherEventMetrics metrics; + + public MultiDispatcher(String dispatcherName) { + super("Dispatcher"); + this.dispatcherName = dispatcherName.replaceAll(" ", "-").toLowerCase(); + this.log = LoggerFactory.getLogger(MultiDispatcher.class.getCanonicalName() + "." + this.dispatcherName); + this.library = new MultiDispatcherLibrary(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception{ + super.serviceInit(conf); + MultiDispatcherConfig config = new MultiDispatcherConfig(getConfig(), dispatcherName); + workerExecutor = new MultiDispatcherExecutor(log, config, dispatcherName); + workerExecutor.start(); + createMonitorThread(config); + if (config.getMetricsEnabled()) { + metrics = new DispatcherEventMetricsImpl(dispatcherName); + DefaultMetricsSystem.instance().register( + "Event metrics for " + dispatcherName, + "Event metrics for " + dispatcherName, + metrics + ); + } else { + metrics = new DispatcherEventMetricsNoOps(log); + } + } + + @Override + protected void serviceStop() throws Exception { + if (monitorExecutor != null) { + monitorExecutor.shutdown(); + } + workerExecutor.stop(); + } + + @Override + public EventHandler getEventHandler() { + return event -> { + if (isInState(STATE.STOPPED)) { + log.warn("Discard event {} because stopped state", event); + } else { + EventHandler handler = library.getEventHandler(event); + Runnable runnable = createRunnable(event, handler); + workerExecutor.execute(event, runnable); + metrics.addEvent(event.getType()); + } + }; + } + + @Override + public void register(Class eventType, EventHandler handler) { + library.register(eventType, handler); + metrics.init(eventType); + } + + private Runnable createRunnable(Event event, EventHandler handler) { + return () -> { + long start = clock.getTime(); + try { + handler.handle(event); + } finally { + metrics.updateRate(event.getType(), clock.getTime() - start); + metrics.removeEvent(event.getType()); + } + }; + } + + private void createMonitorThread(MultiDispatcherConfig config) { + int interval = config.getMonitorSeconds(); + if (interval < 1) { + return; + } + this.monitorExecutor = new ScheduledThreadPoolExecutor( + 1, + new BasicThreadFactory.Builder() + .namingPattern(this.dispatcherName + "-monitor-%d") + .build()); + monitorExecutor.scheduleAtFixedRate(() -> { + List notEmptyQueues = workerExecutor.getQueuesSize().entrySet().stream() + .filter(e -> 0 < e.getValue()) + .map(e -> String.format("%s has queue size %d", e.getKey(), e.getValue())) + .sorted() + .collect(Collectors.toList()); + if (!notEmptyQueues.isEmpty()) { + log.info("Event queue sizes: {}", notEmptyQueues); + } + log.debug("Metrics: {}", metrics); + },10, interval, TimeUnit.SECONDS); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java new file mode 100644 index 0000000000000..f5cf2ada9c39d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherConfig.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import org.apache.hadoop.conf.Configuration; + +/** + * All the config what can be use in the {@link MultiDispatcher} + */ +class MultiDispatcherConfig extends Configuration { + + private final String prefix; + + public MultiDispatcherConfig(Configuration configuration, String dispatcherName) { + super(configuration); + this.prefix = String.format("yarn.dispatcher.multi-thread.%s.", dispatcherName); + } + + /** + * How many executor thread should be created to handle the incoming events + * @return configured value, or default 4 + */ + public int getDefaultPoolSize() { + return super.getInt(prefix + "default-pool-size", 4); + } + + /** + * Maximus size of the event queue of the executor threads. + * If limit is reached then the queue#add method will block. + * @return configured value, or default 1_000_000 + */ + public int getQueueSize() { + return super.getInt(prefix + "queue-size", 1_000_000); + } + + /** + * How frequently the monitor thread should write the state of the dispatcher to the LOG. + * If less than 1 this monitoring will be turned off. + * @return configured value, or default 0 + */ + public int getMonitorSeconds() { + return super.getInt(prefix + "monitor-seconds", 0); + } + + /** + * How long should the dispatcher wait to drain all event queue of workers, + * after stop signal is received. + * @return configured value, or default 60 + */ + public int getGracefulStopSeconds() { + return super.getInt(prefix + "graceful-stop-seconds", 60); + } + + /** + * Dispatcher metrics should be published to the metric system. + * @return configured value, or default false + */ + public boolean getMetricsEnabled() { + return super.getBoolean(prefix + "metrics-enabled", false); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java new file mode 100644 index 0000000000000..9725c25c226c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherExecutor.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +import org.slf4j.Logger; + +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; + +/** + * This class contains the thread which process the {@link MultiDispatcher}'s events. + */ +public class MultiDispatcherExecutor { + + private final Logger log; + private final MultiDispatcherConfig config; + private final MultiDispatcherExecutorThread[] threads; + private final Clock clock = new MonotonicClock(); + + public MultiDispatcherExecutor( + Logger log, + MultiDispatcherConfig config, + String dispatcherName + ) { + this.log = log; + this.config = config; + this.threads = new MultiDispatcherExecutorThread[config.getDefaultPoolSize()]; + ThreadGroup group = new ThreadGroup(dispatcherName); + for (int i = 0; i < threads.length; ++i) { + threads[i] = new MultiDispatcherExecutorThread(group, i, config.getQueueSize()); + } + } + + public void start() { + for(Thread t : threads) { + t.start(); + } + } + + public void execute(Event event, Runnable runnable) { + String lockKey = event.getLockKey(); + // abs of Integer.MIN_VALUE is Integer.MIN_VALUE + int threadIndex = lockKey == null || lockKey.hashCode() == Integer.MIN_VALUE ? + 0 : Math.abs(lockKey.hashCode() % threads.length); + MultiDispatcherExecutorThread thread = threads[threadIndex]; + thread.add(runnable); + log.trace("The {} with lock key {} will be handled by {}", + event.getType(), lockKey, thread.getName()); + } + + public void stop() throws InterruptedException { + long timeOut = clock.getTime() + config.getGracefulStopSeconds() * 1_000L; + // if not all queue is empty + if (Arrays.stream(threads).anyMatch(t -> 0 < t.queueSize()) + // and not timeout yet + && clock.getTime() < timeOut) { + log.debug("Not all event queue is empty, waiting to drain ..."); + Thread.sleep(1_000); + } + for (MultiDispatcherExecutorThread thread : threads) { + thread.interrupt(); + } + } + + public Map getQueuesSize() { + return Arrays.stream(threads).collect(Collectors.toMap( + MultiDispatcherExecutorThread::getName, + MultiDispatcherExecutorThread::queueSize + )); + } + + private final class MultiDispatcherExecutorThread extends Thread { + private final BlockingQueue queue; + + MultiDispatcherExecutorThread(ThreadGroup group, int index, int queueSize) { + super(group, String.format("%s-worker-%d", group.getName(), index)); + this.queue = new LinkedBlockingQueue<>(queueSize); + } + + void add(Runnable runnable) { + queue.add(runnable); + } + + long queueSize() { + return queue.size(); + } + + @Override + public void run() { + try { + while (true) { + queue.take().run(); + } + } catch (InterruptedException e) { + log.warn("{} get interrupted", getName()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java new file mode 100644 index 0000000000000..25126e6e788fa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcherLibrary.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; + +/** + * Stores {@link EventHandler} for {@link Event} in {@link MultiDispatcher} + */ +class MultiDispatcherLibrary { + + private final Map lib = new HashMap<>(); + + public EventHandler getEventHandler(Event e) { + EventHandler handler = lib.get(e.getType().getClass().getCanonicalName()); + if (handler == null) { + throw new Error("EventHandler for " + e.getType() + ", was not found in " + lib.keySet()); + } + return handler; + } + + public void register(Class eventType, EventHandler handler) { + lib.put(eventType.getCanonicalName(), handler); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java new file mode 100644 index 0000000000000..f8278e06a1f54 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetrics.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.metrics; + +import org.apache.hadoop.metrics2.MetricsSource; + +/** + * Interface for {@link org.apache.hadoop.yarn.event.Dispatcher} + * can be used to publish {@link org.apache.hadoop.yarn.event.Event} related metrics + */ +public interface DispatcherEventMetrics extends MetricsSource { + + /** + * Class of the event type what can be handled by the DispatcherEventMetrics + * @param typeClass the event type + */ + void init(Class typeClass); + + /** + * Call if Event added for dispatching + * @param type type of the event + */ + void addEvent(Object type); + + /** + * Call if Event handled + * @param type type of the event + */ + void removeEvent(Object type); + + /** + * Call with how much time was required to handle the event + * @param type type of the event + * @param millisecond time interval + */ + void updateRate(Object type, long millisecond); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java new file mode 100644 index 0000000000000..446afccb0df8d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.StringJoiner; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * Metric object for {@link org.apache.hadoop.yarn.event.multidispatcher.MultiDispatcher} + */ +@InterfaceAudience.Private +@Metrics(context="yarn") +public class DispatcherEventMetricsImpl implements DispatcherEventMetrics { + + private final Map currentEventCountMetrics; + private final Map processingTimeMetrics; + private final MetricsRegistry registry; + + public DispatcherEventMetricsImpl(String name) { + this.currentEventCountMetrics = new HashMap<>(); + this.processingTimeMetrics = new HashMap<>(); + this.registry = new MetricsRegistry(Interns.info( + "DispatcherEventMetrics for " + name, + "DispatcherEventMetrics for " + name + )); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + @Override + public void init(Class typeClass) { + for(Object constant : typeClass.getEnumConstants()) { + String key = createKey(constant); + currentEventCountMetrics.put(key, + registry.newGauge(key + "_Current", key + "_Current", 0L)); + processingTimeMetrics.put(key, + registry.newRate(key + "_", key + "_")); + } + } + + @Override + public void addEvent(Object type) { + currentEventCountMetrics.get(createKey(type)).incr(); + } + + @Override + public void removeEvent(Object type) { + currentEventCountMetrics.get(createKey(type)).decr(); + } + + @Override + public void updateRate(Object type, long millisecond) { + processingTimeMetrics.get(createKey(type)).add(millisecond); + } + + private String createKey(Object constant) { + return constant.getClass().getSimpleName() + "#" + constant; + } + + @Override + public String toString() { + return new StringJoiner(", ") + .add("currentEventCountMetrics=" + currentEventCountMetrics) + .add("processingTimeMetrics=" + processingTimeMetrics) + .toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java new file mode 100644 index 0000000000000..04209812e3510 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DispatcherEventMetricsNoOps.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.metrics; + +import org.slf4j.Logger; + +import org.apache.hadoop.metrics2.MetricsCollector; + +/** + * Used if metric publication should be disabled + */ +public class DispatcherEventMetricsNoOps implements DispatcherEventMetrics { + + private final Logger log; + + public DispatcherEventMetricsNoOps(Logger log) { + this.log = log; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + log.trace("called getMetrics"); + } + + @Override + public void init(Class typeClass) { + log.trace("called init"); + } + + @Override + public void addEvent(Object type) { + log.trace("called addEvent"); + } + + @Override + public void removeEvent(Object type) { + log.trace("called removeEvent"); + } + + @Override + public void updateRate(Object type, long millisecond) { + log.trace("called updateRate"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java new file mode 100644 index 0000000000000..8d899ea3f6f78 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEvent.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.yarn.event.Event; + +class MockEvent implements Event { + private static final Random RANDOM = new Random(); + + private final MockEventType type = Math.random() < 0.5 + ? MockEventType.TYPE_1 + : MockEventType.TYPE_2; + + private final long timestamp = System.currentTimeMillis(); + + private final String lockKey = Arrays.asList( + "APP1", + "APP2", + null + ).get(RANDOM.nextInt(3)); + + @Override + public MockEventType getType() { + return type; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public String getLockKey() { + return lockKey; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java new file mode 100644 index 0000000000000..f9c96968c47d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/MockEventType.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +enum MockEventType { + TYPE_1, + TYPE_2 +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java new file mode 100644 index 0000000000000..caf8ae846fd0e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/multidispatcher/TestMultiDispatcher.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event.multidispatcher; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; + +import static org.junit.Assert.assertEquals; + +public class TestMultiDispatcher { + + @Test(timeout = 5_000) + public void testHandle() { + MultiDispatcher dispatcher = new MultiDispatcher("Test"); + assertEquals(Service.STATE.NOTINITED, dispatcher.getServiceState()); + dispatcher.init(new Configuration()); + assertEquals(Service.STATE.INITED, dispatcher.getServiceState()); + dispatcher.start(); + assertEquals(Service.STATE.STARTED, dispatcher.getServiceState()); + AtomicInteger handledEvent = new AtomicInteger(); + dispatcher.register(MockEventType.class, event -> handledEvent.incrementAndGet()); + + IntStream.range(0, 1_000) + .forEach(value -> dispatcher.getEventHandler().handle(new MockEvent())); + + dispatcher.stop(); + assertEquals(Service.STATE.STOPPED, dispatcher.getServiceState()); + assertEquals(1_000, handledEvent.get()); + } + + @Test(timeout = 5_000, expected = Error.class) + public void testMissingHandler() { + MultiDispatcher dispatcher = new MultiDispatcher("Test"); + assertEquals(Service.STATE.NOTINITED, dispatcher.getServiceState()); + dispatcher.init(new Configuration()); + assertEquals(Service.STATE.INITED, dispatcher.getServiceState()); + dispatcher.start(); + assertEquals(Service.STATE.STARTED, dispatcher.getServiceState()); + + dispatcher.getEventHandler().handle(new MockEvent()); + } + + @Test(timeout = 5_000) + public void testBlocksNewOnStop() { + MultiDispatcher dispatcher = new MultiDispatcher("Test"); + assertEquals(Service.STATE.NOTINITED, dispatcher.getServiceState()); + dispatcher.init(new Configuration()); + assertEquals(Service.STATE.INITED, dispatcher.getServiceState()); + AtomicInteger handledEvent = new AtomicInteger(); + dispatcher.register(MockEventType.class, event -> handledEvent.incrementAndGet()); + dispatcher.start(); + assertEquals(Service.STATE.STARTED, dispatcher.getServiceState()); + dispatcher.stop(); + assertEquals(Service.STATE.STOPPED, dispatcher.getServiceState()); + dispatcher.getEventHandler().handle(new MockEvent()); + assertEquals(0, handledEvent.get()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 49e25eb8f1378..6952806a3c846 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -33,9 +33,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.crypto.SecretKey; @@ -59,9 +56,9 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.event.multidispatcher.MultiDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; @@ -118,8 +115,6 @@ public abstract class RMStateStore extends AbstractService { protected long baseEpoch; private long epochRange; protected ResourceManager resourceManager; - private final ReadLock readLock; - private final WriteLock writeLock; public static final Logger LOG = LoggerFactory.getLogger(RMStateStore.class); @@ -687,9 +682,6 @@ public RMStateStoreState transition(RMStateStore store, public RMStateStore() { super(RMStateStore.class.getName()); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - this.readLock = lock.readLock(); - this.writeLock = lock.writeLock(); stateMachine = stateMachineFactory.make(this); } @@ -799,20 +791,18 @@ public void setRMDispatcher(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; } - AsyncDispatcher dispatcher; + MultiDispatcher dispatcher; @SuppressWarnings("rawtypes") @VisibleForTesting protected EventHandler rmStateStoreEventHandler; @Override protected void serviceInit(Configuration conf) throws Exception{ - // create async handler - dispatcher = new AsyncDispatcher("RM StateStore dispatcher"); + dispatcher = new MultiDispatcher("RM State Store"); dispatcher.init(conf); rmStateStoreEventHandler = new ForwardingEventHandler(); dispatcher.register(RMStateStoreEventType.class, rmStateStoreEventHandler); - dispatcher.setDrainEventsOnStop(); // read the base epoch value from conf baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH, YarnConfiguration.DEFAULT_RM_EPOCH); @@ -1330,7 +1320,6 @@ protected boolean isFencedState() { // Dispatcher related code protected void handleStoreEvent(RMStateStoreEvent event) { - this.writeLock.lock(); try { LOG.debug("Processing event of type {}", event.getType()); @@ -1346,8 +1335,6 @@ protected void handleStoreEvent(RMStateStoreEvent event) { } catch (InvalidStateTransitionException e) { LOG.error("Can't handle this event at current state", e); - } finally { - this.writeLock.unlock(); } } @@ -1441,12 +1428,7 @@ public void setResourceManager(ResourceManager rm) { } public RMStateStoreState getRMStateStoreState() { - this.readLock.lock(); - try { - return this.stateMachine.getCurrentState(); - } finally { - this.readLock.unlock(); - } + return this.stateMachine.getCurrentState(); } @SuppressWarnings("rawtypes") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java index 3399431cf040f..c30036bdd75e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppAttemptEvent.java @@ -31,4 +31,9 @@ public RMStateStoreAppAttemptEvent(ApplicationAttemptStateData attemptState) { public ApplicationAttemptStateData getAppAttemptState() { return attemptState; } + + @Override + public String getLockKey() { + return attemptState.getAttemptId().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java index 50e59f72610be..9033a2357727a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java @@ -32,4 +32,9 @@ public RMStateStoreAppEvent(ApplicationStateData appState) { public ApplicationStateData getAppState() { return appState; } + + @Override + public String getLockKey() { + return appState.getApplicationSubmissionContext().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java index 7455c39999a8f..c5edeef8ddfd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java @@ -34,4 +34,9 @@ public class RMStateStoreRemoveAppAttemptEvent extends RMStateStoreEvent { public ApplicationAttemptId getApplicationAttemptId() { return applicationAttemptId; } + + @Override + public String getLockKey() { + return applicationAttemptId.getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java index fbba64c8783e1..d184d8d785ae0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppEvent.java @@ -31,4 +31,9 @@ public class RMStateStoreRemoveAppEvent extends RMStateStoreEvent { public ApplicationStateData getAppState() { return appState; } + + @Override + public String getLockKey() { + return appState.getApplicationSubmissionContext().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java index 14f8e9d47fcf6..62fd84baeef56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppAttemptEvent.java @@ -33,4 +33,9 @@ public RMStateUpdateAppAttemptEvent( public ApplicationAttemptStateData getAppAttemptState() { return attemptState; } + + @Override + public String getLockKey() { + return attemptState.getAttemptId().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index 1c156df0fcb14..4b1d73004d670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -60,4 +60,9 @@ public boolean isNotifyApplication() { public SettableFuture getResult() { return future; } + + @Override + public String getLockKey() { + return appState.getApplicationSubmissionContext().getApplicationId().toString(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 1773c7aaa88e7..7416e829b6f12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -33,6 +33,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.crypto.SecretKey; @@ -74,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -99,9 +102,8 @@ public class RMStateStoreTestBase { static class TestDispatcher implements Dispatcher, EventHandler { - ApplicationAttemptId attemptId; - - boolean notified = false; + private final Set handledAttempt = ConcurrentHashMap.newKeySet(); + private final Set handledApps = ConcurrentHashMap.newKeySet(); @SuppressWarnings("rawtypes") @Override @@ -113,11 +115,16 @@ public void register(Class eventType, public void handle(Event event) { if (event instanceof RMAppAttemptEvent) { RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event; - assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId()); - } - notified = true; - synchronized (this) { - notifyAll(); + synchronized (handledAttempt) { + handledAttempt.add(rmAppAttemptEvent.getApplicationAttemptId()); + handledAttempt.notifyAll(); + } + } else if (event instanceof RMAppEvent) { + RMAppEvent rmAppEvent = (RMAppEvent) event; + synchronized (handledApps) { + handledApps.add(rmAppEvent.getApplicationId()); + handledApps.notifyAll(); + } } } @@ -127,6 +134,38 @@ public EventHandler getEventHandler() { return this; } + void waitNotify(ApplicationAttemptId attemptId) { + long startTime = System.currentTimeMillis(); + while(!handledAttempt.contains(attemptId)) { + synchronized (handledAttempt) { + try { + handledAttempt.wait(100); + } catch (InterruptedException e) { + LOG.trace("Interrupted", e); + } + } + if(System.currentTimeMillis() - startTime > 1000*60) { + fail("Timed out attempt store notification"); + } + } + } + + void waitNotify(ApplicationId applicationId) { + long startTime = System.currentTimeMillis(); + while(!handledApps.contains(applicationId)) { + synchronized (handledApps) { + try { + handledApps.wait(100); + } catch (InterruptedException e) { + LOG.trace("Interrupted", e); + } + } + if(System.currentTimeMillis() - startTime > 1000*60) { + fail("Timed out attempt store notification"); + } + } + } + } public static class StoreStateVerifier { @@ -148,23 +187,6 @@ public long getEpochRange() { return epochRange; } - void waitNotify(TestDispatcher dispatcher) { - long startTime = System.currentTimeMillis(); - while(!dispatcher.notified) { - synchronized (dispatcher) { - try { - dispatcher.wait(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - if(System.currentTimeMillis() - startTime > 1000*60) { - fail("Timed out attempt store notification"); - } - } - dispatcher.notified = false; - } - protected RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = @@ -204,17 +226,15 @@ protected RMAppAttempt storeAttempt(RMStateStore store, .thenReturn(mockRmAppAttemptMetrics); when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage()) .thenReturn(new AggregateAppResourceUsage(new HashMap<>())); - dispatcher.attemptId = attemptId; store.storeNewApplicationAttempt(mockAttempt); - waitNotify(dispatcher); + dispatcher.waitNotify(attemptId); return mockAttempt; } protected void updateAttempt(RMStateStore store, TestDispatcher dispatcher, ApplicationAttemptStateData attemptState) { - dispatcher.attemptId = attemptState.getAttemptId(); store.updateApplicationAttemptState(attemptState); - waitNotify(dispatcher); + dispatcher.waitNotify(attemptState.getAttemptId()); } void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) @@ -642,6 +662,8 @@ private ArrayList createAndStoreApps( public void testDeleteStore(RMStateStoreHelper stateStoreHelper) throws Exception { RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); ArrayList appList = createAndStoreApps(stateStoreHelper, store, 5); store.deleteStore(); // verify apps deleted diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 17737e59c2b69..35e8fb66eb997 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -73,7 +73,6 @@ class TestFileSystemRMStore extends FileSystemRMStateStore { init(conf); Assert.assertNull(fs); assertTrue(workingDirPathURI.equals(fsWorkingPath)); - dispatcher.disableExitOnDispatchException(); start(); Assert.assertNotNull(fs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index e93599dd47ec4..1dc929cba6592 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -174,7 +174,6 @@ public RMStateStore getRMStateStore() throws Exception { stateStore = new LeveldbRMStateStore(); stateStore.init(conf); stateStore.start(); - stateStore.dispatcher.disableExitOnDispatchException(); return stateStore; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 404fae9d853f0..0f3512ccf30cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -148,7 +148,6 @@ class TestZKRMStateStoreInternal extends ZKRMStateStore { throws Exception { setResourceManager(new ResourceManager()); init(conf); - dispatcher.disableExitOnDispatchException(); start(); assertTrue(znodeWorkingPath.equals(workingZnode)); } @@ -537,6 +536,7 @@ public void testFencedState() throws Exception { RMApp mockApp = mock(RMApp.class); ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); + context.setApplicationId(mock(ApplicationId.class)); when(mockApp.getSubmitTime()).thenReturn(submitTime); when(mockApp.getStartTime()).thenReturn(startTime); when(mockApp.getApplicationSubmissionContext()).thenReturn(context); @@ -876,7 +876,7 @@ private void finishAppWithAttempts(RMState state, RMStateStore store, appStateNew.attempts.putAll(appState.attempts); } store.updateApplicationState(appStateNew); - waitNotify(dispatcher); + dispatcher.waitNotify(appId); Container container = new ContainerPBImpl(); container.setId(ContainerId.newContainerId(attemptId, 1)); ApplicationAttemptStateData newAttemptState = @@ -894,7 +894,7 @@ private void storeAppWithAttempts(RMStateStore store, private void storeApp(RMStateStore store, TestDispatcher dispatcher, ApplicationId appId, long submitTime, long startTime) throws Exception { storeApp(store, appId, submitTime, startTime); - waitNotify(dispatcher); + dispatcher.waitNotify(appId); } private void storeAppWithAttempts(RMStateStore store, @@ -1001,7 +1001,7 @@ public void testAppNodeSplit() throws Exception { // Store app2 with app id application_1352994193343_120213. ApplicationId appId21 = ApplicationId.newInstance(1352994193343L, 120213); storeApp(store, appId21, submitTime, startTime); - waitNotify(dispatcher); + dispatcher.waitNotify(appId21); // Store another app which will be removed. ApplicationId appIdRemoved = ApplicationId.newInstance(1352994193343L, 2); @@ -1171,7 +1171,7 @@ public void testAppNodeSplitChangeAcrossRestarts() throws Exception { ApplicationId appId71 = ApplicationId.newInstance(1442994195087L, 7); //storeApp(store, dispatcher, appId71, submitTime, startTime); storeApp(store, appId71, submitTime, startTime); - waitNotify(dispatcher); + dispatcher.waitNotify(appId71); ApplicationAttemptId attemptId71 = ApplicationAttemptId.newInstance(appId71, 1); storeAttempt(store, ApplicationAttemptId.newInstance(appId71, 1), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java index 3cb428c5c5960..5e8c5f99ecbb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java @@ -192,7 +192,7 @@ public int run(String[] args) { + e.getMessage()); return -1; } - waitNotify(dispatcher); + dispatcher.waitNotify(appId); rmApps.add(app); }