Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/main/java/com/uber/rss/StreamServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ private void createServiceRegistry(StreamServerConfig serverConfig) {
}

private Pair<Channel, Integer> bindPort(ServerBootstrap bootstrap, int port) throws InterruptedException, BindException {
logger.info(String.format("Binding to specified port: %s", port));
logger.info("Binding to specified port: {}", port);
Channel channel = bootstrap.bind(port).sync().channel();
InetSocketAddress localAddress = (InetSocketAddress)channel.localAddress();
logger.info(String.format("Bound to local address: %s", localAddress));
logger.info("Bound to local address: {}", localAddress);
return Pair.of(channel, localAddress.getPort());
}

Expand All @@ -186,7 +186,7 @@ public void initChannel(final SocketChannel ch) {
}

public void run() throws InterruptedException, BindException {
logger.info(String.format("Number of opened files: %s", SystemUtils.getFileDescriptorCount()));
logger.info("Number of opened files: {}", SystemUtils.getFileDescriptorCount());

String serverId = getServerId();

Expand All @@ -213,12 +213,12 @@ public void run() throws InterruptedException, BindException {
Pair<Channel, Integer> channelAndPort = bindPort(streamServerBootstrap, serverConfig.getShufflePort());
channels.add(channelAndPort.getKey());
shufflePort = channelAndPort.getValue();
logger.info(String.format("ShuffleServer: %s:%s", hostName, shufflePort));
logger.info("ShuffleServer: {}:{}", hostName, shufflePort);

if (this.serviceRegistry == null && serverConfig.getServiceRegistryType().
equalsIgnoreCase(ServiceRegistry.TYPE_STANDALONE)) {
logger.info(String.format("Creating registry client connecting to local stream server: %s:%s",
hostName, shufflePort));
logger.info("Creating registry client connecting to local stream server: {}:{}",
hostName, shufflePort);
this.serviceRegistry = new StandaloneServiceRegistryClient(this.hostName, shufflePort,
serverConfig.getNetworkTimeout(), "streamServer");
}
Expand Down Expand Up @@ -299,7 +299,7 @@ public void shutdown(boolean wait) {
try {
c.close();
} catch (Throwable e) {
logger.warn(String.format("Unable to shutdown channel %s:", c), e);
logger.warn("Unable to shutdown channel {}", c, e);
exceptions.add(e);
}
}
Expand Down Expand Up @@ -365,8 +365,8 @@ private static Thread addShutdownHook(StreamServer server) {

public static void main(String[] args) throws Exception {
StreamServerConfig serverConfig = StreamServerConfig.buildFromArgs(args);
logger.info(String.format("Starting server (version: %s, revision: %s) with config: %s",
RssBuildInfo.Version, RssBuildInfo.Revision, serverConfig));
logger.info("Starting server (version: {}, revision: {}) with config: {}",
RssBuildInfo.Version, RssBuildInfo.Revision, serverConfig);
StreamServer server = new StreamServer(serverConfig);
server.run();
addShutdownHook(server);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public BlockingQueueReadClient(BlockingSingleServerReadClient delegate, int queu
this.delegate = delegate;

this.recordQueue = new ArrayBlockingQueue<>(queueSize);
logger.info(String.format("Created blocking queue with size: %s", queueSize));
logger.info("Created blocking queue with size: {}", queueSize);

this.maxBlockingMillis = maxBlockingMillis;
}
Expand All @@ -65,8 +65,7 @@ record = delegate.readRecord();
recordQueue.put(new EofRecordKeyValuePair());
} catch (Throwable ex) {
M3Stats.addException(ex, this.getClass().getSimpleName());
String logStr = String.format("Failed to read record, %s", delegate.toString());
logger.warn(logStr, ex);
logger.warn("Failed to read record, {}", delegate, ex);
recordQueue.clear();
recordQueue.add(new FailedFetchRecordKeyValuePair(ex));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public BusyStatusSocketClient(String host, int port, int timeoutMillis, String u

public GetBusyStatusResponse getBusyStatus() {
if (socket == null) {
logger.debug(String.format("Connecting to server to get busy status: %s", connectionInfo));
logger.debug("Connecting to server to get busy status: {}", connectionInfo);
connectSocket();

write(MessageConstants.UPLOAD_UPLINK_MAGIC_BYTE);
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/uber/rss/clients/ClientBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ClientBase(String host, int port, int timeoutMillis) {
this.port = port;
this.timeoutMillis = timeoutMillis;
this.connectionInfo = String.format("%s %s [%s -> %s:%s]", this.getClass().getSimpleName(), internalClientId, NetworkUtils.getLocalHostName(), host, port);
logger.debug(String.format("Created instance (timeout: %s millis): %s", timeoutMillis, this));
logger.debug("Created instance (timeout: {} millis): {}", timeoutMillis, this);
}

@Override
Expand Down Expand Up @@ -148,7 +148,7 @@ protected void connectSocket() {
while (System.currentTimeMillis() - startTime <= timeoutMillis) {
ClientConnectMetrics metrics = metricGroupContainer.getMetricGroup(getClientConnectMetricsKey());
if (triedTimes >= 1) {
logger.info(String.format("Retrying connect to %s:%s, total retrying times: %s, elapsed milliseconds: %s", host, port, triedTimes, System.currentTimeMillis() - startTime));
logger.info("Retrying connect to {}:{} total retrying times: {}, elapsed milliseconds: {}", host, port, triedTimes, System.currentTimeMillis() - startTime);
metrics.getSocketConnectRetries().update(triedTimes);
}
triedTimes++;
Expand All @@ -167,7 +167,7 @@ protected void connectSocket() {
M3Stats.addException(socketException, this.getClass().getSimpleName());
socket = null;
lastException = socketException;
logger.info(String.format("Failed to connect to %s:%s, %s", host, port, ExceptionUtils.getSimpleMessage(socketException)));
logger.info("Failed to connect to {}:{}", host, port, socketException);

long elapsedTime = System.currentTimeMillis() - startTime;
if (elapsedTime < timeoutMillis) {
Expand Down Expand Up @@ -236,7 +236,7 @@ protected void writeMessageLengthAndContent(BaseMessage msg) {
}

protected void writeControlMessageNotWaitResponseStatus(BaseMessage msg) {
logger.debug(String.format("Writing control message: %s, connection: %s", msg, connectionInfo));
logger.debug("Writing control message: {}, connection: {}", msg, connectionInfo);
try {
outputStream.write(ByteBufUtils.convertIntToBytes(msg.getMessageType()));
} catch (IOException e) {
Expand All @@ -252,7 +252,7 @@ protected void writeControlMessageAndWaitResponseStatus(BaseMessage msg) {
writeControlMessageNotWaitResponseStatus(msg);

readResponseStatus();
logger.debug(String.format("Got OK response for control message: %s, connection: %s", msg, connectionInfo));
logger.debug("Got OK response for control message: {}, connection: {}", msg, connectionInfo);
}

private int readStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private ConnectDownloadResponse connectImpl() {
throw new RssShuffleCorruptedException("Shuffle data corrupted for: " + appShufflePartitionId, ex);
} catch (RssMissingShuffleWriteConfigException | RssShuffleStageNotStartedException ex) {
exceptionWrapper.setException(ex);
logger.warn(String.format("Did not find data in server side, server may not run fast enough to get data from client or server hits some issue, %s", appShufflePartitionId), ex);
logger.warn("Did not find data in server side, server may not run fast enough to get data from client or server hits some issue, {}", appShufflePartitionId, ex);
return null;
}
});
Expand Down Expand Up @@ -203,7 +203,7 @@ public GetDataAvailabilityResponse waitDataAvailable() {
if (getDataAvailabilityRetryResult != null && getDataAvailabilityRetryResult.getMapTaskCommitStatus() != null) {
MapTaskCommitStatus mapTaskCommitStatus = getDataAvailabilityRetryResult.getMapTaskCommitStatus();
if (mapTaskCommitStatus.getTaskAttemptIds().isEmpty()) {
taskAttemptIdInfo = String.format("no task attempt committed");
taskAttemptIdInfo = "no task attempt committed";
} else {
List<Long> taskAttemptIds = mapTaskCommitStatus.getTaskAttemptIds().values().stream().collect(Collectors.toList());
Collections.sort(taskAttemptIds);
Expand Down Expand Up @@ -266,7 +266,7 @@ public void close() {
super.close();
closeMetrics();
} catch (Throwable ex) {
logger.warn(String.format("Failed to close read client %s", this), ex);
logger.warn("Failed to close read client {}", this, ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public HeartbeatSocketClient(String host, int port, int timeoutMillis, String us

public void sendHeartbeat() {
if (socket == null) {
logger.debug(String.format("Connecting to server for heartbeat: %s", connectionInfo));
logger.debug("Connecting to server for heartbeat: {}", connectionInfo);
connectSocket();

write(MessageConstants.UPLOAD_UPLINK_MAGIC_BYTE);
Expand Down
21 changes: 10 additions & 11 deletions src/main/java/com/uber/rss/clients/MultiServerAsyncWriteClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.uber.rss.metrics.M3Stats;
import com.uber.rss.metrics.WriteClientMetrics;
import com.uber.rss.metrics.WriteClientMetricsKey;
import com.uber.rss.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -119,7 +118,7 @@ public MultiServerAsyncWriteClient(Collection<ServerReplicationGroup> servers, i
this.servers.size()));
}

logger.info(String.format("Created %s, threads: %s, queue size: %s", this.getClass().getSimpleName(), numThreads, writeQueueSize));
logger.info("Created {}, threads: {}, queue size: {}", this.getClass().getSimpleName(), numThreads, writeQueueSize);
}

@Override
Expand All @@ -140,7 +139,7 @@ public void connect() {
for (int i = 0; i < threads.length; i++) {
final int threadIndex = i;
Thread thread = new Thread(() -> {
logger.info(String.format("Record Thread %s started", threadIndex));
logger.info("Record Thread {} started", threadIndex);
BlockingQueue<Record> recordQueue = recordQueues[threadIndex];
try {
// TODO optimize the max wait time for poll
Expand All @@ -163,7 +162,7 @@ public void connect() {
}
}
} catch (Throwable e) {
logger.warn(String.format("Record Thread %s got exception, %s", threadIndex, ExceptionUtils.getSimpleMessage(e)), e);
logger.warn("Record Thread {} got exception", threadIndex, e);
M3Stats.addException(e, this.getClass().getSimpleName());
exceptions.add(e);
}
Expand All @@ -172,7 +171,7 @@ public void connect() {
exceptions.add(new RssQueueNotReadyException(String.format("Record queue %s has %s remaining records not sent out", threadIndex, remainingRecords)));
}
recordQueue.clear();
logger.info(String.format("Record Thread %s finished, remaining records: %s", threadIndex, remainingRecords));
logger.info("Record Thread {} finished, remaining records: {}", threadIndex, remainingRecords);
});
thread.setName("Record Thread " + i);
threads[threadIndex] = thread;
Expand Down Expand Up @@ -219,7 +218,7 @@ public void writeDataBlock(int partition, ByteBuffer value) {
long currentTime = System.currentTimeMillis();
if (currentTime - lastLogTime > logInterval) {
for (int i = 0; i < recordQueues.length; i++) {
logger.info(String.format("Record queue %s size: %s", i, recordQueues[i].size()));
logger.info("Record queue {} size: {}", i, recordQueues[i].size());
}
lastLogTime = currentTime;
}
Expand All @@ -246,13 +245,13 @@ public void finishUpload() {

long underlyingClientFinishUploadTime = System.nanoTime() - underlyingClientFinishUploadStartTime;

logger.info(String.format("WriteClientTime (%s), queue insert seconds: %s, queue poll seconds: %s, socket seconds: %s, stop thread seconds: %s, finish upload seconds: %s",
logger.info("WriteClientTime ({}), queue insert seconds: {}, queue poll seconds: {}, socket seconds: {}, stop thread seconds: {}, finish upload seconds: {}",
currentAppTaskAttemptId,
TimeUnit.NANOSECONDS.toSeconds(queueInsertTime.get()),
TimeUnit.NANOSECONDS.toSeconds(queuePollTime.get()),
TimeUnit.NANOSECONDS.toSeconds(socketTime.get()),
TimeUnit.NANOSECONDS.toSeconds(stopThreadTime),
TimeUnit.NANOSECONDS.toSeconds(underlyingClientFinishUploadTime)));
TimeUnit.NANOSECONDS.toSeconds(underlyingClientFinishUploadTime));
} finally {
stopwatch.stop();
}
Expand Down Expand Up @@ -308,7 +307,7 @@ private void connectSingleClient(ServerConnectionInfo server) {
private void closeClient(ReplicatedWriteClient client) {
try {
if (client != null) {
logger.debug(String.format("Closing client: %s", client));
logger.debug("Closing client: {}", client);
client.close();
}
} catch (Throwable ex) {
Expand All @@ -327,7 +326,7 @@ private void stopThreads() {
if (!inserted) {
throw new RssQueueNotReadyException(String.format("stopThreads: Record queue has no space available after waiting %s millis", networkTimeoutMillis));
}
logger.debug(String.format("Inserted stop marker to record queue %s", i));
logger.debug("Inserted stop marker to record queue {}", i);
} catch (InterruptedException e) {
throw new RssException("Interrupted when inserting stop marker to record queue", e);
}
Expand Down Expand Up @@ -370,7 +369,7 @@ private void closeMetrics() {
metrics.close();
} catch (Throwable e) {
M3Stats.addException(e, this.getClass().getSimpleName());
logger.warn(String.format("Failed to close metrics: %s", this), e);
logger.warn("Failed to close metrics: {}", this, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void sendHeartbeats() {
try {
sendHeartbeat(serverDetail);
} catch (Throwable ex) {
logger.warn(String.format("Failed to send RSS heartbeat to %s", serverDetail), ex);
logger.warn("Failed to send RSS heartbeat to {}", serverDetail, ex);
}
}
}
Expand All @@ -138,7 +138,7 @@ private void sendHeartbeat(ServerDetail serverDetail) {
client.sendHeartbeat();
logger.info("Sent RSS heartbeat to {}, duration millis: {}", serverDetail, System.currentTimeMillis() - startTime);
} catch (Throwable ex) {
logger.warn(String.format("Failed to send RSS heartbeat to %s", serverDetail), ex);
logger.warn("Failed to send RSS heartbeat to {}", serverDetail, ex);
if (serverConnectionRefresher != null) {
ServerDetail refreshedServerDetail = serverConnectionRefresher.refreshConnection(serverDetail);
if (refreshedServerDetail != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private void connectAndInitializeClient() {
}

ServerReplicationGroup serverReplicationGroup = servers.get(nextClientIndex);
logger.info(String.format("Fetching data from server: %s (%s out of %s), partition: %s", serverReplicationGroup, nextClientIndex + 1, servers.size(), appShufflePartitionId));
logger.info("Fetching data from server: {} ({} out of {}), partition: {}", serverReplicationGroup, nextClientIndex + 1, servers.size(), appShufflePartitionId);

ExceptionWrapper<Throwable> exceptionWrapper = new ExceptionWrapper<>();
String failMsg = String.format("Failed to connect to server: %s, partition: %s", serverReplicationGroup, appShufflePartitionId);
Expand Down Expand Up @@ -184,7 +184,7 @@ private void closeClient(ReplicatedReadClient client) {
client.close();
}
} catch (Throwable ex) {
logger.warn(String.format("Failed to close client %s", client));
logger.warn("Failed to close client {}", client, ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public MultiServerSyncWriteClient(Collection<ServerReplicationGroup> servers, in
this.servers.size()));
}

logger.info(String.format("Created %s", this.getClass().getSimpleName()));
logger.info("Created {}", this.getClass().getSimpleName());
}

@Override
Expand Down Expand Up @@ -174,7 +174,7 @@ private void connectSingleClient(ServerConnectionInfo server) {
private void closeClient(ReplicatedWriteClient client) {
try {
if (client != null) {
logger.debug(String.format("Closing client: %s", client));
logger.debug("Closing client: {}", client);
client.close();
}
} catch (Throwable ex) {
Expand All @@ -187,7 +187,7 @@ private void closeMetrics() {
metrics.close();
} catch (Throwable e) {
M3Stats.addException(e, this.getClass().getSimpleName());
logger.warn(String.format("Failed to close metrics: %s", this), e);
logger.warn("Failed to close metrics: {}", this, e);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/uber/rss/clients/NotifyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public ConnectNotifyResponse connect() {
throw new RssInvalidStateException(String.format("Already connected to server, cannot connect again: %s", connectionInfo));
}

logger.debug(String.format("Connecting to server: %s", connectionInfo));
logger.debug("Connecting to server: {}", connectionInfo);

connectSocket();

Expand All @@ -63,7 +63,7 @@ public ConnectNotifyResponse connect() {

ConnectNotifyResponse connectResponse = readResponseMessage(MessageConstants.MESSAGE_ConnectNotifyResponse, ConnectNotifyResponse::deserialize);

logger.info(String.format("Connected to server: %s, response: %s", connectionInfo, connectResponse));
logger.info("Connected to server: {}, response: {}", connectionInfo, connectResponse);

return connectResponse;
}
Expand Down Expand Up @@ -94,7 +94,7 @@ private void closeMetrics() {
}
} catch (Throwable e) {
M3Stats.addException(e, this.getClass().getSimpleName());
logger.warn(String.format("Failed to close metrics: %s", connectionInfo), e);
logger.warn("Failed to close metrics: {}", connectionInfo, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ public boolean isReusable() {
}

public void closeWithoutReuse() {
logger.info(String.format("Closing connection %s without reuse", this));
logger.info("Closing connection {} without reuse", this);
reusable = false;
try {
delegate.close();
} catch (Exception e) {
logger.warn(String.format("Failed to close underlying client %s", delegate), e);
logger.warn("Failed to close underlying client {}", delegate, e);
}
}

Expand Down
Loading