Skip to content

Commit 03e5026

Browse files
committed
SAMZA-1552: Host affinity improvements - Improve matching of hosts to allocated resources
Author: Jagadish <[email protected]> Reviewers: Prateek <[email protected]> Closes apache#401 from vjagadish1989/host-affinity-fix
1 parent 7e68e4b commit 03e5026

File tree

4 files changed

+234
-41
lines changed

4 files changed

+234
-41
lines changed

samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,9 @@ private void addToAllocatedResourceList(String host, SamzaResource samzaResource
171171
public void updateStateAfterAssignment(SamzaResourceRequest request, String assignedHost, SamzaResource samzaResource) {
172172
synchronized (lock) {
173173
requestsQueue.remove(request);
174+
// A reference for the resource could either be held in the preferred host buffer or in the ANY_HOST buffer.
174175
allocatedResources.get(assignedHost).remove(samzaResource);
176+
allocatedResources.get(ANY_HOST).remove(samzaResource);
175177
if (hostAffinityEnabled) {
176178
// assignedHost may not always be the preferred host.
177179
// Hence, we should safely decrement the counter for the preferredHost
@@ -266,18 +268,33 @@ private List<String> getAllocatedHosts() {
266268
/**
267269
* Retrieves, but does not remove, the first allocated resource on the specified host.
268270
*
269-
* @param host the host for which a resource is needed.
270-
* @return the first {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one.
271+
* @param host the host for which a resource is needed.
272+
* @return a {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one.
271273
*/
272-
273274
public SamzaResource peekResource(String host) {
274275
synchronized (lock) {
275-
List<SamzaResource> resourcesOnTheHost = this.allocatedResources.get(host);
276+
List<SamzaResource> resourcesOnPreferredHostBuffer = this.allocatedResources.get(host);
277+
List<SamzaResource> resourcesOnAnyHostBuffer = this.allocatedResources.get(ANY_HOST);
276278

277-
if (resourcesOnTheHost == null || resourcesOnTheHost.isEmpty()) {
279+
// First search for the preferred host buffers
280+
if (resourcesOnPreferredHostBuffer != null && !resourcesOnPreferredHostBuffer.isEmpty()) {
281+
SamzaResource resource = resourcesOnPreferredHostBuffer.get(0);
282+
log.info("Returning a buffered resource: {} for {} from preferred-host buffer.", resource.getResourceID(), host);
283+
return resource;
284+
} else if (resourcesOnAnyHostBuffer != null && !resourcesOnAnyHostBuffer.isEmpty()) {
285+
// If preferred host buffers are empty, scan the ANY_HOST buffer
286+
log.debug("No resources on preferred-host buffer. Scanning ANY_HOST buffer");
287+
SamzaResource resource = resourcesOnAnyHostBuffer.stream()
288+
.filter(resrc -> resrc.getHost().equals(host))
289+
.findAny().orElse(null);
290+
if (resource != null) {
291+
log.info("Returning a buffered resource: {} for {} from ANY_HOST buffer.", resource.getResourceID(), host);
292+
}
293+
return resource;
294+
} else {
295+
log.debug("Cannot find any resource in the ANY_HOST buffer for {} because both buffers are empty", host);
278296
return null;
279297
}
280-
return resourcesOnTheHost.get(0);
281298
}
282299
}
283300

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.clustermanager;
20+
21+
import org.apache.samza.config.Config;
22+
23+
import java.lang.reflect.Field;
24+
import java.util.Map;
25+
import java.util.concurrent.Semaphore;
26+
import java.util.concurrent.TimeUnit;
27+
28+
public class MockHostAwareContainerAllocator extends HostAwareContainerAllocator {
29+
private static final int ALLOCATOR_TIMEOUT_MS = 10000;
30+
private Semaphore semaphore = new Semaphore(0);
31+
32+
public MockHostAwareContainerAllocator(ClusterResourceManager manager,
33+
Config config, SamzaApplicationState state) {
34+
super(manager, ALLOCATOR_TIMEOUT_MS, config, state);
35+
}
36+
37+
/**
38+
* Causes the current thread to block until the expected number of containers have started.
39+
*
40+
* @param numExpectedContainers the number of containers expected to start
41+
* @param timeout the maximum time to wait
42+
* @param unit the time unit of the {@code timeout} argument
43+
*
44+
* @return a boolean that specifies whether containers started within the timeout.
45+
* @throws InterruptedException if the current thread is interrupted while waiting
46+
*/
47+
boolean awaitContainersStart(int numExpectedContainers, long timeout, TimeUnit unit) throws InterruptedException {
48+
return semaphore.tryAcquire(numExpectedContainers, timeout, unit);
49+
}
50+
51+
@Override
52+
public void requestResources(Map<String, String> containerToHostMappings) {
53+
super.requestResources(containerToHostMappings);
54+
}
55+
56+
public ResourceRequestState getContainerRequestState() throws Exception {
57+
Field field = AbstractContainerAllocator.class.getDeclaredField("resourceRequestState");
58+
field.setAccessible(true);
59+
60+
return (ResourceRequestState) field.get(this);
61+
}
62+
63+
@Override
64+
protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) {
65+
super.runStreamProcessor(request, preferredHost);
66+
semaphore.release();
67+
}
68+
}

0 commit comments

Comments
 (0)