Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.raquo.laminar.DomApi
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml
import ru.d10xa.jsonlogviewer.decline.yaml.Feed
import ru.d10xa.jsonlogviewer.decline.Config
import ru.d10xa.jsonlogviewer.shell.ShellImpl

import scala.util.chaining.*

Expand Down Expand Up @@ -61,7 +62,14 @@ object ViewElement {
)
fs2.Stream
.eval(configYamlRefIO)
.flatMap(configYamlRef => LogViewerStream.stream(c, configYamlRef))
.flatMap(configYamlRef =>
LogViewerStream.stream(
c,
configYamlRef,
new StdInLinesStreamImpl,
new ShellImpl
)
)
.compile
.toList
.map(stringsToHtmlElement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import fs2.*
import ru.d10xa.jsonlogviewer.decline.ConfigInit
import ru.d10xa.jsonlogviewer.decline.ConfigInitImpl
import ru.d10xa.jsonlogviewer.decline.DeclineOpts
import ru.d10xa.jsonlogviewer.shell.ShellImpl

object Application
extends CommandIOApp(
Expand All @@ -21,14 +22,18 @@ object Application
Supervisor[IO].use { supervisor =>
configInit.initConfigYaml(config, supervisor).use { configRef =>
LogViewerStream
.stream(config, configRef)
.stream(
config = config,
configYamlRef = configRef,
stdinStream = new StdInLinesStreamImpl,
shell = new ShellImpl
)
.through(text.utf8.encode)
.through(fs2.io.stdout)
.compile
.drain
.as(ExitCode.Success)
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,20 @@ import ru.d10xa.jsonlogviewer.decline.Config.FormatIn
import ru.d10xa.jsonlogviewer.formatout.ColorLineFormatter
import ru.d10xa.jsonlogviewer.formatout.RawFormatter
import ru.d10xa.jsonlogviewer.logfmt.LogfmtLogLineParser
import ru.d10xa.jsonlogviewer.shell.Shell
import ru.d10xa.jsonlogviewer.shell.ShellImpl

import scala.util.matching.Regex
import scala.util.Failure
import scala.util.Success
import scala.util.Try

object LogViewerStream {

private var stdInLinesStreamImpl: StdInLinesStream =
new StdInLinesStreamImpl()

def getStdInLinesStreamImpl: StdInLinesStream = stdInLinesStreamImpl

def setStdInLinesStreamImpl(impl: StdInLinesStream): Unit =
stdInLinesStreamImpl = impl

private def stdinLinesStream: Stream[IO, String] =
stdInLinesStreamImpl.stdinLinesStream

def stream(
config: Config,
configYamlRef: Ref[IO, Option[ConfigYaml]]
configYamlRef: Ref[IO, Option[ConfigYaml]],
stdinStream: StdInLinesStream,
shell: Shell
): Stream[IO, String] = {
def processStreamWithConfig(
inputStream: Stream[IO, String],
Expand All @@ -58,7 +49,7 @@ object LogViewerStream {
Stream.empty
} else if (resolvedConfigs.length > 1) {
val feedStreams = resolvedConfigs.map { resolvedConfig =>
val feedStream = commandsAndInlineInputToStream(
val feedStream = shell.mergeCommandsAndInlineInput(
resolvedConfig.commands,
resolvedConfig.inlineInput
)
Expand All @@ -67,14 +58,17 @@ object LogViewerStream {
Stream.emits(feedStreams).parJoin(feedStreams.size)
} else {
val resolvedConfig = resolvedConfigs.head
val inputStream = if (resolvedConfig.inlineInput.isDefined) {
commandsAndInlineInputToStream(
resolvedConfig.commands,
resolvedConfig.inlineInput
)
} else {
stdinLinesStream
}
val inputStream =
if (
resolvedConfig.inlineInput.isDefined || resolvedConfig.commands.nonEmpty
) {
shell.mergeCommandsAndInlineInput(
resolvedConfig.commands,
resolvedConfig.inlineInput
)
} else {
stdinStream.stdinLinesStream
}
processStreamWithConfig(inputStream, resolvedConfig)
}

Expand Down Expand Up @@ -213,12 +207,6 @@ object LogViewerStream {
case Failure(_) => parseResult.raw
}

private def commandsAndInlineInputToStream(
commands: List[String],
inlineInput: Option[String]
): Stream[IO, String] =
new ShellImpl().mergeCommandsAndInlineInput(commands, inlineInput)

def makeNonCsvLogLineParser(
resolvedConfig: ResolvedConfig
): LogLineParser = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import ru.d10xa.jsonlogviewer.decline.Config
import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig
import ru.d10xa.jsonlogviewer.decline.TimestampConfig
import ru.d10xa.jsonlogviewer.query.QueryCompiler
import ru.d10xa.jsonlogviewer.shell.ShellImpl

import scala.concurrent.duration.*

Expand Down Expand Up @@ -90,13 +91,9 @@ class LogViewerStreamIntegrationTest extends CatsEffectSuite {
logInputChannel.stream
}

// Save original implementation and use test implementation
originalImpl = LogViewerStream.getStdInLinesStreamImpl
_ <- IO(LogViewerStream.setStdInLinesStreamImpl(testStreamImpl))

// Start stream processing in background
streamFiber <- LogViewerStream
.stream(baseConfig, configRef)
.stream(baseConfig, configRef, testStreamImpl, new ShellImpl)
.evalTap(result => IO(results.append(result)))
.compile
.drain
Expand Down Expand Up @@ -130,7 +127,6 @@ class LogViewerStreamIntegrationTest extends CatsEffectSuite {

// Cleanup
_ <- streamFiber.cancel
_ <- IO(LogViewerStream.setStdInLinesStreamImpl(originalImpl))

} yield {
// Verify initial INFO filter
Expand Down Expand Up @@ -215,13 +211,9 @@ class LogViewerStreamIntegrationTest extends CatsEffectSuite {
logInputChannel.stream
}

// Save original implementation and use test implementation
originalImpl = LogViewerStream.getStdInLinesStreamImpl
_ <- IO(LogViewerStream.setStdInLinesStreamImpl(testStreamImpl))

// Start stream processing in background
streamFiber <- LogViewerStream
.stream(errorFilterConfig, configRef)
.stream(errorFilterConfig, configRef, testStreamImpl, new ShellImpl)
.evalTap(result => IO(results.append(result)))
.compile
.drain
Expand Down Expand Up @@ -255,7 +247,6 @@ class LogViewerStreamIntegrationTest extends CatsEffectSuite {

// Cleanup
_ <- streamFiber.cancel
_ <- IO(LogViewerStream.setStdInLinesStreamImpl(originalImpl))

} yield {
// Verify initial field name mapping
Expand Down Expand Up @@ -287,4 +278,4 @@ class LogViewerStreamIntegrationTest extends CatsEffectSuite {
)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package ru.d10xa.jsonlogviewer

import cats.effect.IO
import cats.effect.Ref
import fs2.Stream
import munit.CatsEffectSuite
import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml
import ru.d10xa.jsonlogviewer.decline.yaml.Feed
import ru.d10xa.jsonlogviewer.decline.Config
import ru.d10xa.jsonlogviewer.decline.FieldNamesConfig
import ru.d10xa.jsonlogviewer.decline.TimestampConfig
import ru.d10xa.jsonlogviewer.shell.Shell

/** Tests to verify the proper command execution behavior based on YAML
* configuration. Ensures that commands from YAML are executed when present and
* stdin is used when no commands are available.
*/
class YamlCommandExecutionTest extends CatsEffectSuite {

private val basicConfig = Config(
configFile = None,
fieldNames = FieldNamesConfig(
timestampFieldName = "@timestamp",
levelFieldName = "level",
messageFieldName = "message",
stackTraceFieldName = "stack_trace",
loggerNameFieldName = "logger_name",
threadNameFieldName = "thread_name"
),
timestamp = TimestampConfig(None, None),
grep = List.empty,
filter = None,
formatIn = None,
formatOut = None,
showEmptyFields = false
)

test("should use commands from YAML when inlineInput is absent") {
val testStdinStream = new StdInLinesStream {
override def stdinLinesStream: Stream[IO, String] =
Stream.emit("FROM_STDIN")
}

val testShell = new Shell {
override def mergeCommandsAndInlineInput(
commands: List[String],
inlineInput: Option[String]
): Stream[IO, String] =
Stream.emit(s"FROM_COMMAND:${commands.mkString(",")}")
}

val configYaml = ConfigYaml(
fieldNames = None,
feeds = Some(
List(
Feed(
name = Some("test-feed"),
commands = List("cat test.log"),
inlineInput = None, // No inline input
filter = None,
formatIn = None,
fieldNames = None,
rawInclude = None,
rawExclude = None,
excludeFields = None,
showEmptyFields = None
)
)
),
showEmptyFields = None
)

for {
yamlRef <- Ref.of[IO, Option[ConfigYaml]](Some(configYaml))
output <- LogViewerStream
.stream(
basicConfig,
yamlRef,
testStdinStream,
testShell
)
.compile
.toList
_ <- IO {
assert(
output.exists(_.contains("FROM_COMMAND")),
"Should use output from commands in YAML"
)
assert(
!output.exists(_.contains("FROM_STDIN")),
"Should not use stdin"
)
}
} yield ()
}

test("should use stdin when no commands or inlineInput are present") {
val testStdinStream = new StdInLinesStream {
override def stdinLinesStream: Stream[IO, String] =
Stream.emit("FROM_STDIN")
}

val testShell = new Shell {
override def mergeCommandsAndInlineInput(
commands: List[String],
inlineInput: Option[String]
): Stream[IO, String] =
Stream.emit("FROM_COMMAND")
}

val configYaml = ConfigYaml(
fieldNames = None,
feeds = Some(
List(
Feed(
name = Some("test-feed"),
commands = List.empty, // Empty commands list
inlineInput = None, // No inline input
filter = None,
formatIn = None,
fieldNames = None,
rawInclude = None,
rawExclude = None,
excludeFields = None,
showEmptyFields = None
)
)
),
showEmptyFields = None
)

for {
yamlRef <- Ref.of[IO, Option[ConfigYaml]](Some(configYaml))
output <- LogViewerStream
.stream(
basicConfig,
yamlRef,
testStdinStream,
testShell
)
.compile
.toList
_ <- IO {
assert(
output.exists(_.contains("FROM_STDIN")),
"Should use stdin"
)
assert(
!output.exists(_.contains("FROM_COMMAND")),
"Should not use command output when no commands are present"
)
}
} yield ()
}
}
Loading
Loading