Skip to content

Commit 91318fc

Browse files
authored
Merge pull request #82 from delphi-hub/feature/dockerLogs
Implemented endpoints to retrieve docker logs
2 parents 2323b0b + e717aef commit 91318fc

File tree

5 files changed

+198
-30
lines changed

5 files changed

+198
-30
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1+
akka.http.client.idle-timeout = infinite
2+
akka.http.host-connection-pool.idle-timeout = infinite
13
akka.http.server.websocket.periodic-keep-alive-max-idle = 10 seconds

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/ContainerCommands.scala

Lines changed: 73 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,28 @@
11
package de.upb.cs.swt.delphi.instanceregistry.Docker
22

33

4-
import akka.NotUsed
5-
import akka.actor.ActorSystem
4+
import java.nio.ByteOrder
5+
6+
import akka.{Done, NotUsed}
7+
import akka.actor.{ActorSystem, PoisonPill}
68
import akka.http.scaladsl.client.RequestBuilding._
79
import akka.http.scaladsl.model.MediaTypes.`application/json`
810
import akka.http.scaladsl.model.Uri.{Path, Query}
911
import akka.http.scaladsl.model.{HttpEntity, HttpResponse, StatusCodes}
1012
import akka.http.scaladsl.unmarshalling.Unmarshal
11-
import akka.stream.scaladsl.{Flow, Source}
13+
import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source}
1214
import de.upb.cs.swt.delphi.instanceregistry.{AppLogging, Registry}
1315
import spray.json._
1416
import PostDataFormatting.commandJsonRequest
17+
import akka.http.scaladsl.Http
18+
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
19+
import akka.stream.OverflowStrategy
20+
import akka.util.ByteString
21+
import org.reactivestreams.Publisher
1522

16-
import scala.concurrent.{ExecutionContext, Future}
23+
import scala.collection.mutable
24+
import scala.concurrent.{Await, ExecutionContext, Future}
25+
import scala.util.{Failure, Success, Try}
1726

1827

1928
class ContainerCommands(connection: DockerConnection) extends JsonSupport with Commands with AppLogging {
@@ -184,25 +193,70 @@ class ContainerCommands(connection: DockerConnection) extends JsonSupport with C
184193
}
185194
}
186195

187-
def logs(
188-
containerId: String
189-
)(implicit ec: ExecutionContext): Source[String, NotUsed] = {
190-
val query = Query("stdout" -> "true" )
196+
def retrieveLogs(
197+
containerId: String,
198+
stdErrSelected: Boolean
199+
)(implicit ec: ExecutionContext): Future[String] = {
200+
201+
val query = Query("stdout" -> (!stdErrSelected).toString, "stderr" -> stdErrSelected.toString, "follow" -> "false", "tail" -> "all", "timestamps" -> "true")
191202
val request = Get(buildUri(containersPath / containerId.substring(0,11) / "logs", query))
192203

193-
val flow =
194-
Flow[HttpResponse].map {
195-
case HttpResponse(StatusCodes.OK, _, HttpEntity.Chunked(_, chunks), _) =>
196-
chunks.map(_.data().utf8String)
197-
case HttpResponse(StatusCodes.NotFound, _, HttpEntity.Strict(_, data), _) =>
198-
log.warning(s"DOCKER LOGS FAILED: ${data.utf8String}")
204+
connection.sendRequest(request).flatMap {response =>
205+
response.status match {
206+
case StatusCodes.OK =>
207+
Unmarshal(response.entity).to[String]
208+
case StatusCodes.UpgradeRequired =>
209+
log.warning(s"Unexpected upgrade response while reading logs for container $containerId")
210+
log.warning(s"Got $response")
211+
unknownResponseFuture(response)
212+
case StatusCodes.NotFound =>
199213
throw ContainerNotFoundException(containerId)
200-
case response =>
201-
unknownResponse(response)
202-
}.flatMapConcat(identity)
214+
case _ =>
215+
unknownResponseFuture(response)
216+
}
217+
}
218+
}
219+
220+
def streamLogs(containerId: String, stdErrSelected: Boolean) (implicit ec: ExecutionContext) : Try[Publisher[Message]] = {
221+
222+
// Select stdout / stderr in query params
223+
val queryParams = Query("stdout" -> (!stdErrSelected).toString, "stderr" -> stdErrSelected.toString, "follow" -> "true", "tail" -> "all", "timestamps" -> "false")
224+
225+
// Create actor-publisher pair, publisher will be returned
226+
val (streamActor, streamPublisher) = Source.actorRef[Message](bufferSize = 10, OverflowStrategy.dropNew)
227+
.toMat(Sink.asPublisher(fanout = true))(Keep.both)
228+
.run()
229+
230+
// Delimiter flow splits incoming traffic into lines based on dockers multiplex-protocol
231+
// Docker prepends an 8-byte header, where the last 4 byte encode line length in big endian
232+
// See https://docs.docker.com/engine/api/v1.30/#operation/ContainerAttach
233+
val delimiter: Flow[ByteString, ByteString, NotUsed] = Framing.lengthField(4, 4, 100000, ByteOrder.BIG_ENDIAN)
234+
235+
// Flow that removes header bytes from payload
236+
val removeHeaderFlow: Flow[ByteString, ByteString, NotUsed] = Flow.fromFunction(in => in.slice(8, in.size))
237+
238+
// Build request
239+
val request = Get(buildUri(containersPath / containerId.substring(0,11) / "logs", queryParams))
240+
241+
// Execute request
242+
val res = connection.sendRequest(request).flatMap { res =>
243+
// Extract payload ByteString from data stream using above flows. Map to string.
244+
val logLines = res.entity.dataBytes.via(delimiter).via(removeHeaderFlow).map(_.utf8String)
245+
logLines.runForeach { line =>
246+
// Send each log line to the stream actor, which will publish them
247+
log.debug(s"Streaming log message $line")
248+
streamActor ! TextMessage(line)
249+
}
250+
}
251+
252+
// Kill actor on completion
253+
res.onComplete{ _ =>
254+
log.info("Log stream finished successfully.")
255+
streamActor ! PoisonPill
256+
}
203257

204-
Source.fromFuture(connection.sendRequest(request))
205-
.via(flow)
258+
// Return publish so server can subscribe to it
259+
Success(streamPublisher)
206260
}
207261

208262
def commandCreate(

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Docker/DockerActor.scala

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package de.upb.cs.swt.delphi.instanceregistry.Docker
22

33
import akka.actor.{Actor, ActorLogging, ActorSystem, Props, Status}
4+
import akka.http.scaladsl.model.ws.Message
45
import akka.stream.ActorMaterializer
56
import de.upb.cs.swt.delphi.instanceregistry.Docker.DockerActor._
67
import de.upb.cs.swt.delphi.instanceregistry.Registry
78
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.ComponentType
9+
import org.reactivestreams.Publisher
810

911
import scala.concurrent.duration._
1012
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -120,9 +122,23 @@ class DockerActor(connection: DockerConnection) extends Actor with ActorLogging
120122
}
121123
}
122124

123-
case logs(containerId: String) =>
124-
log.info(s"Fetching Container logs")
125-
sender ! container.logs(containerId)
125+
case logs(containerId: String, stdErrSelected: Boolean, stream: Boolean) =>
126+
127+
log.info(s"Fetching Container logs: stdErrSelected -> $stdErrSelected, stream -> $stream")
128+
129+
if(!stream){
130+
val logResult = Try(Await.result(container.retrieveLogs(containerId, stdErrSelected), Duration.Inf))
131+
logResult match {
132+
case Failure(ex) =>
133+
log.warning(s"Failed to get container logs with ${ex.getMessage}")
134+
sender ! Failure(ex)
135+
case Success(logContent) =>
136+
sender ! Success(logContent)
137+
}
138+
} else {
139+
sender ! container.streamLogs(containerId, stdErrSelected)
140+
}
141+
126142

127143
case x => log.warning("Received unknown message: [{}] ", x)
128144
}
@@ -146,7 +162,7 @@ object DockerActor {
146162

147163
case class restart(containerId: String)
148164

149-
case class logs(containerId: String)
165+
case class logs(containerId: String, stdErrSelected: Boolean, stream: Boolean)
150166

151167
case class runCommand(
152168
containerId: String,

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package de.upb.cs.swt.delphi.instanceregistry
22

3-
import akka.NotUsed
43
import akka.actor._
54
import akka.http.scaladsl.model.StatusCodes
5+
import akka.http.scaladsl.model.ws.Message
66
import akka.pattern.{AskTimeoutException, ask}
77
import akka.util.Timeout
88
import de.upb.cs.swt.delphi.instanceregistry.Docker.DockerActor._
@@ -14,6 +14,7 @@ import de.upb.cs.swt.delphi.instanceregistry.daos.InstanceDAO
1414
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState}
1515
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState
1616
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model._
17+
import org.reactivestreams.Publisher
1718

1819
import scala.concurrent.duration._
1920
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -28,7 +29,6 @@ class RequestHandler(configuration: Configuration, instanceDao: InstanceDAO, con
2829
implicit val materializer : Materializer = ActorMaterializer()
2930
implicit val ec: ExecutionContext = system.dispatcher
3031

31-
3232
val (eventActor, eventPublisher) = Source.actorRef[RegistryEvent](10, OverflowStrategy.dropNew)
3333
.toMat(Sink.asPublisher(fanout = true))(Keep.both)
3434
.run()
@@ -674,23 +674,59 @@ class RequestHandler(configuration: Configuration, instanceDao: InstanceDAO, con
674674
* @param id Id of the instance
675675
* @return Tuple of OperationResult and Option[Source[...] ]
676676
*/
677-
def handleGetLogs(id: Long) : (OperationResult.Value, Option[Source[String, NotUsed]]) = {
677+
def handleGetLogs(id: Long, stdErrSelected: Boolean) : (OperationResult.Value, Option[String]) = {
678678
if(!instanceDao.hasInstance(id)){
679679
(OperationResult.IdUnknown, None)
680680
} else if(!isInstanceDockerContainer(id)){
681681
(OperationResult.NoDockerContainer, None)
682682
} else {
683683
val instance = instanceDao.getInstance(id).get
684684

685-
val f : Future[(OperationResult.Value, Option[Source[String, NotUsed]])]= (dockerActor ? logs(instance.dockerId.get))(configuration.dockerOperationTimeout).map{
686-
source: Any =>
687-
(OperationResult.Ok, Option(source.asInstanceOf[Source[String, NotUsed]]))
685+
val f : Future[(OperationResult.Value, Option[String])] = (dockerActor ? logs(instance.dockerId.get, stdErrSelected, stream = false))(configuration.dockerOperationTimeout).map{
686+
logVal: Any =>
687+
val logResult = logVal.asInstanceOf[Try[String]]
688+
logResult match {
689+
case Success(logContent) =>
690+
(OperationResult.Ok, Some(logContent))
691+
case Failure(ex) =>
692+
log.warning(s"Failed to get logs from actor, exception: ${ex.getMessage}")
693+
(OperationResult.InternalError, None)
694+
}
695+
688696
}.recover{
689697
case ex: Exception =>
690698
fireDockerOperationErrorEvent(Some(instance), errorMessage = s"Failed to get logs with message: ${ex.getMessage}")
691699
(OperationResult.InternalError, None)
692700
}
693-
Await.result(f, Duration.Inf)
701+
Await.result(f, configuration.dockerOperationTimeout.duration)
702+
}
703+
}
704+
705+
def handleStreamLogs(id: Long, stdErrSelected: Boolean) : (OperationResult.Value, Option[Publisher[Message]]) = {
706+
if(!instanceDao.hasInstance(id)){
707+
(OperationResult.IdUnknown, None)
708+
} else if(!isInstanceDockerContainer(id)){
709+
(OperationResult.NoDockerContainer, None)
710+
} else {
711+
val instance = instanceDao.getInstance(id).get
712+
713+
val f : Future[(OperationResult.Value, Option[Publisher[Message]])] = (dockerActor ? logs(instance.dockerId.get, stdErrSelected, stream = true))(configuration.dockerOperationTimeout).map{
714+
publisherVal: Any =>
715+
val publisherResult = publisherVal.asInstanceOf[Try[Publisher[Message]]]
716+
publisherResult match {
717+
case Success(publisher) =>
718+
(OperationResult.Ok, Some(publisher))
719+
case Failure(ex) =>
720+
log.warning(s"Failed to stream logs from actor, exception: ${ex.getMessage}")
721+
(OperationResult.InternalError, None)
722+
}
723+
724+
}.recover{
725+
case ex: Exception =>
726+
fireDockerOperationErrorEvent(Some(instance), errorMessage = s"Failed to stream logs with message: ${ex.getMessage}")
727+
(OperationResult.InternalError, None)
728+
}
729+
Await.result(f, configuration.dockerOperationTimeout.duration)
694730
}
695731
}
696732

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class Server (handler: RequestHandler) extends HttpApp
6767
path("delete") { deleteContainer()} ~
6868
path("assignInstance") { assignInstance()} ~
6969
path("command") { runCommandInContainer()} ~
70+
path("logs") { retrieveLogs()} ~
71+
path("attach") { streamLogs()} ~
7072
/****************EVENT OPERATIONS****************/
7173
path("events") { streamEvents()}
7274

@@ -675,6 +677,64 @@ class Server (handler: RequestHandler) extends HttpApp
675677
}
676678
}
677679

680+
def retrieveLogs(): server.Route = parameters('Id.as[Long], 'StdErr.as[Boolean].?) { (id, stdErrOption) =>
681+
authenticateOAuth2[AccessToken]("Secure Site", AuthProvider.authenticateOAuthRequire(_, userType = UserType.Admin)){ token =>
682+
get {
683+
log.debug(s"GET /logs?Id=$id has been called")
684+
685+
val stdErrSelected = stdErrOption.isDefined && stdErrOption.get
686+
687+
handler.handleGetLogs(id, stdErrSelected) match {
688+
case (handler.OperationResult.IdUnknown, _) =>
689+
log.warning(s"Cannot get logs, id $id not found.")
690+
complete{HttpResponse(StatusCodes.NotFound, entity = s"Cannot get logs, id $id not found.")}
691+
case (handler.OperationResult.NoDockerContainer, _) =>
692+
log.warning(s"Cannot get logs, id $id is no docker container.")
693+
complete{HttpResponse(StatusCodes.BadRequest,entity = s"Cannot get logs, id $id is no docker container.")}
694+
case (handler.OperationResult.Ok, Some(logString)) =>
695+
complete{logString}
696+
case (handler.OperationResult.InternalError, _) =>
697+
complete{HttpResponse(StatusCodes.InternalServerError, entity = s"Internal server error")}
698+
case _ =>
699+
complete{HttpResponse(StatusCodes.InternalServerError, entity = s"Internal server error")}
700+
}
701+
}
702+
}
703+
}
704+
705+
def streamLogs(): server.Route = parameters('Id.as[Long], 'StdErr.as[Boolean].?) { (id, stdErrOption) =>
706+
707+
708+
val stdErrSelected = stdErrOption.isDefined && stdErrOption.get
709+
710+
handler.handleStreamLogs(id, stdErrSelected) match {
711+
case (handler.OperationResult.IdUnknown, _) =>
712+
complete{HttpResponse(StatusCodes.NotFound, entity = s"Cannot stream logs, id $id not found.")}
713+
case (handler.OperationResult.NoDockerContainer, _) =>
714+
complete{HttpResponse(StatusCodes.BadRequest, entity = s"Cannot stream logs, id $id is no docker container.")}
715+
case (handler.OperationResult.Ok, Some(publisher)) =>
716+
handleWebSocketMessages {
717+
Flow[Message]
718+
.via(
719+
Flow.fromSinkAndSource(Sink.ignore, Source.fromPublisher(publisher))
720+
)
721+
.watchTermination() {(_, done) =>
722+
done.onComplete {
723+
case Success(_) =>
724+
log.info("Log stream route completed successfully")
725+
case Failure(ex) =>
726+
log.error(s"Log stream route completed with failure : $ex")
727+
}
728+
}
729+
}
730+
case (handler.OperationResult.InternalError, _) =>
731+
complete{HttpResponse(StatusCodes.InternalServerError, entity = s"Internal server error")}
732+
case _ =>
733+
complete{HttpResponse(StatusCodes.InternalServerError, entity = s"Internal server error")}
734+
}
735+
736+
}
737+
678738
/**
679739
* Creates a WebSocketConnection that streams events that are issued by the registry to all connected clients.
680740
* @return Server route that maps to the WebSocketConnection

0 commit comments

Comments
 (0)