Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

Expand Down Expand Up @@ -69,11 +72,16 @@
* {@link #currentAuditContext()} to get the thread-local
* context for the caller, which can then be manipulated.
*
* For further information, especially related to memory consumption,
* read the document `auditing_architecture` in the `hadoop-aws` module.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class CommonAuditContext {

private static final Logger LOG = LoggerFactory.getLogger(
CommonAuditContext.class);

/**
* Process ID; currently built from UUID and timestamp.
*/
Expand All @@ -92,7 +100,7 @@ public final class CommonAuditContext {
* Supplier operations must themselves be thread safe.
*/
private final Map<String, Supplier<String>> evaluatedEntries =
new ConcurrentHashMap<>();
new ConcurrentHashMap<>(1);

static {
// process ID is fixed.
Expand All @@ -108,7 +116,7 @@ public final class CommonAuditContext {
* the span is finalized.
*/
private static final ThreadLocal<CommonAuditContext> ACTIVE_CONTEXT =
ThreadLocal.withInitial(() -> createInstance());
ThreadLocal.withInitial(CommonAuditContext::createInstance);

private CommonAuditContext() {
}
Expand All @@ -125,11 +133,21 @@ public Supplier<String> put(String key, String value) {

/**
* Put a context entry dynamically evaluated on demand.
* Important: as these supplier methods are long-lived,
* the supplier function <i>MUST NOT</i> be part of/refer to
* any object instance of significant memory size.
* Applications SHOULD remove references when they are
* no longer needed.
* When logged at TRACE, prints the key and stack trace of the caller,
* to allow for debugging of any problems.
* @param key key
* @param value new value
* @return old value or null
*/
public Supplier<String> put(String key, Supplier<String> value) {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding context entry {}", key, new Exception(key));
}
return evaluatedEntries.put(key, value);
}

Expand All @@ -138,6 +156,9 @@ public Supplier<String> put(String key, Supplier<String> value) {
* @param key key
*/
public void remove(String key) {
if (LOG.isTraceEnabled()) {
LOG.trace("Remove context entry {}", key);
}
evaluatedEntries.remove(key);
}

Expand Down Expand Up @@ -168,7 +189,7 @@ public void reset() {
private void init() {

// thread 1 is dynamic
put(PARAM_THREAD1, () -> currentThreadID());
put(PARAM_THREAD1, CommonAuditContext::currentThreadID);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;

import org.apache.hadoop.util.WeakReferenceMap;

/**
* A WeakReferenceMap for threads.
* @param <V> value type of the map
*/
public class WeakReferenceThreadMap<V> extends WeakReferenceMap<Long, V> {

public WeakReferenceThreadMap(final Function<? super Long, ? extends V> factory,
@Nullable final Consumer<? super Long> referenceLost) {
super(factory, referenceLost);
}

public V getForCurrentThread() {
return get(currentThreadId());
}

public V removeForCurrentThread() {
return remove(currentThreadId());
}

public long currentThreadId() {
return Thread.currentThread().getId();
}

public V setForCurrentThread(V newVal) {
return put(currentThreadId(), newVal);
}

}
Loading