Skip to content

Commit efdec92

Browse files
HADOOP-18091. S3A auditing leaks memory through ThreadLocal references (#3930)
Adds a new map type WeakReferenceMap, which stores weak references to values, and a WeakReferenceThreadMap subclass to more closely resemble a thread local type, as it is a map of threadId to value. Construct it with a factory method and optional callback for notification on loss and regeneration. WeakReferenceThreadMap<WrappingAuditSpan> activeSpan = new WeakReferenceThreadMap<>( (k) -> getUnbondedSpan(), this::noteSpanReferenceLost); This is used in ActiveAuditManagerS3A for span tracking. Relates to * HADOOP-17511. Add an Audit plugin point for S3A * HADOOP-18094. Disable S3A auditing by default. Contributed by Steve Loughran.
1 parent 390967f commit efdec92

File tree

18 files changed

+1345
-38
lines changed

18 files changed

+1345
-38
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.function.Supplier;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
2730
import org.apache.hadoop.classification.InterfaceAudience;
2831
import org.apache.hadoop.classification.InterfaceStability;
2932

@@ -69,11 +72,16 @@
6972
* {@link #currentAuditContext()} to get the thread-local
7073
* context for the caller, which can then be manipulated.
7174
*
75+
* For further information, especially related to memory consumption,
76+
* read the document `auditing_architecture` in the `hadoop-aws` module.
7277
*/
7378
@InterfaceAudience.Public
7479
@InterfaceStability.Unstable
7580
public final class CommonAuditContext {
7681

82+
private static final Logger LOG = LoggerFactory.getLogger(
83+
CommonAuditContext.class);
84+
7785
/**
7886
* Process ID; currently built from UUID and timestamp.
7987
*/
@@ -92,7 +100,7 @@ public final class CommonAuditContext {
92100
* Supplier operations must themselves be thread safe.
93101
*/
94102
private final Map<String, Supplier<String>> evaluatedEntries =
95-
new ConcurrentHashMap<>();
103+
new ConcurrentHashMap<>(1);
96104

97105
static {
98106
// process ID is fixed.
@@ -108,7 +116,7 @@ public final class CommonAuditContext {
108116
* the span is finalized.
109117
*/
110118
private static final ThreadLocal<CommonAuditContext> ACTIVE_CONTEXT =
111-
ThreadLocal.withInitial(() -> createInstance());
119+
ThreadLocal.withInitial(CommonAuditContext::createInstance);
112120

113121
private CommonAuditContext() {
114122
}
@@ -125,11 +133,21 @@ public Supplier<String> put(String key, String value) {
125133

126134
/**
127135
* Put a context entry dynamically evaluated on demand.
136+
* Important: as these supplier methods are long-lived,
137+
* the supplier function <i>MUST NOT</i> be part of/refer to
138+
* any object instance of significant memory size.
139+
* Applications SHOULD remove references when they are
140+
* no longer needed.
141+
* When logged at TRACE, prints the key and stack trace of the caller,
142+
* to allow for debugging of any problems.
128143
* @param key key
129144
* @param value new value
130145
* @return old value or null
131146
*/
132147
public Supplier<String> put(String key, Supplier<String> value) {
148+
if (LOG.isTraceEnabled()) {
149+
LOG.trace("Adding context entry {}", key, new Exception(key));
150+
}
133151
return evaluatedEntries.put(key, value);
134152
}
135153

@@ -138,6 +156,9 @@ public Supplier<String> put(String key, Supplier<String> value) {
138156
* @param key key
139157
*/
140158
public void remove(String key) {
159+
if (LOG.isTraceEnabled()) {
160+
LOG.trace("Remove context entry {}", key);
161+
}
141162
evaluatedEntries.remove(key);
142163
}
143164

@@ -168,7 +189,7 @@ public void reset() {
168189
private void init() {
169190

170191
// thread 1 is dynamic
171-
put(PARAM_THREAD1, () -> currentThreadID());
192+
put(PARAM_THREAD1, CommonAuditContext::currentThreadID);
172193
}
173194

174195
/**
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.impl;
20+
21+
import java.util.function.Consumer;
22+
import java.util.function.Function;
23+
import javax.annotation.Nullable;
24+
25+
import org.apache.hadoop.util.WeakReferenceMap;
26+
27+
/**
28+
* A WeakReferenceMap for threads.
29+
* @param <V> value type of the map
30+
*/
31+
public class WeakReferenceThreadMap<V> extends WeakReferenceMap<Long, V> {
32+
33+
public WeakReferenceThreadMap(final Function<? super Long, ? extends V> factory,
34+
@Nullable final Consumer<? super Long> referenceLost) {
35+
super(factory, referenceLost);
36+
}
37+
38+
public V getForCurrentThread() {
39+
return get(currentThreadId());
40+
}
41+
42+
public V removeForCurrentThread() {
43+
return remove(currentThreadId());
44+
}
45+
46+
public long currentThreadId() {
47+
return Thread.currentThread().getId();
48+
}
49+
50+
public V setForCurrentThread(V newVal) {
51+
return put(currentThreadId(), newVal);
52+
}
53+
54+
}

0 commit comments

Comments
 (0)