Skip to content

Commit e717aef

Browse files
author
Johannes Duesing
committed
Fixed timeout issue, removed 8 byte docker header from log lines
Added documentation
1 parent f503572 commit e717aef

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1+
akka.http.client.idle-timeout = infinite
2+
akka.http.host-connection-pool.idle-timeout = infinite
13
akka.http.server.websocket.periodic-keep-alive-max-idle = 10 seconds

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/ContainerCommands.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,28 +219,43 @@ class ContainerCommands(connection: DockerConnection) extends JsonSupport with C
219219

220220
def streamLogs(containerId: String, stdErrSelected: Boolean) (implicit ec: ExecutionContext) : Try[Publisher[Message]] = {
221221

222-
val queryParams = Query("stdout" -> (!stdErrSelected).toString, "stderr" -> stdErrSelected.toString, "follow" -> "true", "tail" -> "all", "timestamps" -> "true")
222+
// Select stdout / stderr in query params
223+
val queryParams = Query("stdout" -> (!stdErrSelected).toString, "stderr" -> stdErrSelected.toString, "follow" -> "true", "tail" -> "all", "timestamps" -> "false")
223224

225+
// Create actor-publisher pair, publisher will be returned
224226
val (streamActor, streamPublisher) = Source.actorRef[Message](bufferSize = 10, OverflowStrategy.dropNew)
225227
.toMat(Sink.asPublisher(fanout = true))(Keep.both)
226228
.run()
227229

230+
// Delimiter flow splits incoming traffic into lines based on dockers multiplex-protocol
231+
// Docker prepends an 8-byte header, where the last 4 byte encode line length in big endian
232+
// See https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach
228233
val delimiter: Flow[ByteString, ByteString, NotUsed] = Framing.lengthField(4, 4, 100000, ByteOrder.BIG_ENDIAN)
229234

235+
// Flow that removes header bytes from payload
236+
val removeHeaderFlow: Flow[ByteString, ByteString, NotUsed] = Flow.fromFunction(in => in.slice(8, in.size))
237+
238+
// Build request
230239
val request = Get(buildUri(containersPath / containerId.substring(0,11) / "logs", queryParams))
231240

241+
// Execute request
232242
val res = connection.sendRequest(request).flatMap { res =>
233-
val logLines = res.entity.dataBytes.via(delimiter).map(_.utf8String)
243+
// Extract payload ByteString from data stream using above flows. Map to string.
244+
val logLines = res.entity.dataBytes.via(delimiter).via(removeHeaderFlow).map(_.utf8String)
234245
logLines.runForeach { line =>
246+
// Send each log line to the stream actor, which will publish them
235247
log.debug(s"Streaming log message $line")
236248
streamActor ! TextMessage(line)
237249
}
238250
}
239251

252+
// Kill actor on completion
240253
res.onComplete{ _ =>
241254
log.info("Log stream finished successfully.")
242255
streamActor ! PoisonPill
243256
}
257+
258+
// Return publish so server can subscribe to it
244259
Success(streamPublisher)
245260
}
246261

0 commit comments

Comments
 (0)