Skip to content

Commit f503572

Browse files
author
Johannes Duesing
committed
Now successfully parsing log lines for streaming.
WIP: Timeouts seem to kill the stream after around a minute of no log activity. Documentation missing.
1 parent 063880e commit f503572

File tree

1 file changed

+5
-37
lines changed

1 file changed

+5
-37
lines changed

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/ContainerCommands.scala

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package de.upb.cs.swt.delphi.instanceregistry.Docker
22

33

4+
import java.nio.ByteOrder
5+
46
import akka.{Done, NotUsed}
57
import akka.actor.{ActorSystem, PoisonPill}
68
import akka.http.scaladsl.client.RequestBuilding._
@@ -223,54 +225,20 @@ class ContainerCommands(connection: DockerConnection) extends JsonSupport with C
223225
.toMat(Sink.asPublisher(fanout = true))(Keep.both)
224226
.run()
225227

226-
val sink = Sink.foreach[String] { msg =>
227-
println(s"Got log message: $msg")
228-
streamActor ! TextMessage(msg)
229-
}
230-
231-
val flow: Flow[String, Message, Future[Done]] = Flow.fromSinkAndSourceMat(sink, Source.empty[Message]) (Keep.left)
232-
233-
val delimiter: Flow[ByteString, ByteString, NotUsed] = Framing.delimiter(
234-
ByteString("\uFFFD"), //TODO: Understand and implement dockers MUX - protocol for log entries ...
235-
maximumFrameLength = 100000,
236-
allowTruncation = true
237-
)
228+
val delimiter: Flow[ByteString, ByteString, NotUsed] = Framing.lengthField(4, 4, 100000, ByteOrder.BIG_ENDIAN)
238229

239230
val request = Get(buildUri(containersPath / containerId.substring(0,11) / "logs", queryParams))
240231

241232
val res = connection.sendRequest(request).flatMap { res =>
242233
val logLines = res.entity.dataBytes.via(delimiter).map(_.utf8String)
243234
logLines.runForeach { line =>
244-
println(s"Got log message $line")
235+
log.debug(s"Streaming log message $line")
245236
streamActor ! TextMessage(line)
246237
}
247238
}
248239

249-
/*
250-
251-
val (upgradeResponseFuture, closed) = Http().singleWebSocketRequest(WebSocketRequest(buildUri(containersPath / containerId.substring(0,11) / "logs", queryParams).withScheme("http")), flow)
252-
253-
val connected = upgradeResponseFuture.map { upgrade =>
254-
255-
if(upgrade.response.status == StatusCodes.OK){
256-
println(s"Response: ${upgrade.response}")
257-
println(s"Response Headers: ${upgrade.response.headers}")
258-
println(s"Response Entity: ${Unmarshal(upgrade.response.entity).to[String]}")
259-
Done
260-
} else {
261-
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
262-
}
263-
264-
}
265-
266-
connected.onComplete(println)
267-
closed.onComplete { _ =>
268-
streamActor ! PoisonPill
269-
println("Closed completed.")
270-
}
271-
*/
272240
res.onComplete{ _ =>
273-
println("Closed")
241+
log.info("Log stream finished successfully.")
274242
streamActor ! PoisonPill
275243
}
276244
Success(streamPublisher)

0 commit comments

Comments
 (0)