From ad635dad3a761193ccc9232b98b2b6e85a0a2fcd Mon Sep 17 00:00:00 2001 From: zhaojizhuang <571130360@qq.com> Date: Wed, 11 Jan 2023 17:48:46 +0800 Subject: [PATCH] add support logs for namespace k8s.io Signed-off-by: zhaojizhuang <571130360@qq.com> --- cmd/nerdctl/logs.go | 32 ++- pkg/api/types/cri/metadata_types.go | 75 +++++++ pkg/labels/k8slabels/k8slabels.go | 3 + pkg/logging/cri_logger.go | 307 ++++++++++++++++++++++++++++ pkg/logging/cri_logger_test.go | 230 +++++++++++++++++++++ pkg/logging/log_viewer.go | 25 ++- pkg/logging/tail/tail.go | 75 +++++++ pkg/logging/tail/tail_test.go | 57 ++++++ 8 files changed, 794 insertions(+), 10 deletions(-) create mode 100644 pkg/api/types/cri/metadata_types.go create mode 100644 pkg/logging/cri_logger.go create mode 100644 pkg/logging/cri_logger_test.go create mode 100644 pkg/logging/tail/tail.go create mode 100644 pkg/logging/tail/tail_test.go diff --git a/cmd/nerdctl/logs.go b/cmd/nerdctl/logs.go index 83efe359a46..06c635985b4 100644 --- a/cmd/nerdctl/logs.go +++ b/cmd/nerdctl/logs.go @@ -25,9 +25,11 @@ import ( "syscall" "github.com/containerd/containerd" + "github.com/containerd/nerdctl/pkg/api/types/cri" "github.com/containerd/nerdctl/pkg/clientutil" "github.com/containerd/nerdctl/pkg/idutil/containerwalker" "github.com/containerd/nerdctl/pkg/labels" + "github.com/containerd/nerdctl/pkg/labels/k8slabels" "github.com/containerd/nerdctl/pkg/logging" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -62,8 +64,8 @@ func logsAction(cmd *cobra.Command, args []string) error { } switch globalOptions.Namespace { - case "moby", "k8s.io": - logrus.Warn("Currently, `nerdctl logs` only supports containers created with `nerdctl run -d`") + case "moby": + logrus.Warn("Currently, `nerdctl logs` only supports containers created with `nerdctl run -d` or CRI") } client, ctx, cancel, err := clientutil.NewClient(cmd.Context(), globalOptions.Namespace, globalOptions.Address) if err != nil { @@ -113,6 +115,12 @@ func logsAction(cmd *cobra.Command, args []string) error { if err != nil { return err } + + logPath, err := getLogPath(ctx, found.Container) + if err != nil { + return err + } + task, err := found.Container.Task(ctx, nil) if err != nil { return err @@ -143,13 +151,14 @@ func logsAction(cmd *cobra.Command, args []string) error { ContainerID: found.Container.ID(), Namespace: l[labels.Namespace], DatastoreRootPath: dataStore, + LogPath: logPath, Follow: follow, Timestamps: timestamps, Tail: tail, Since: since, Until: until, } - logViewer, err := logging.InitContainerLogViewer(logViewOpts, stopChannel) + logViewer, err := logging.InitContainerLogViewer(l, logViewOpts, stopChannel) if err != nil { return err } @@ -167,6 +176,23 @@ func logsAction(cmd *cobra.Command, args []string) error { return nil } +func getLogPath(ctx context.Context, container containerd.Container) (string, error) { + extensions, err := container.Extensions(ctx) + if err != nil { + return "", fmt.Errorf("get extensions for container %s,failed: %#v", container.ID(), err) + } + metaData := extensions[k8slabels.ContainerMetadataExtension] + var meta cri.ContainerMetadata + if metaData != nil { + err = meta.UnmarshalJSON(metaData.GetValue()) + if err != nil { + return "", fmt.Errorf("unmarshal extensions for container %s,failed: %#v", container.ID(), err) + } + } + + return meta.LogPath, nil +} + func logsShellComplete(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { // show container names (TODO: only show containers with logs) return shellCompleteContainerNames(cmd, nil) diff --git a/pkg/api/types/cri/metadata_types.go b/pkg/api/types/cri/metadata_types.go new file mode 100644 index 00000000000..58b14151ada --- /dev/null +++ b/pkg/api/types/cri/metadata_types.go @@ -0,0 +1,75 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Forked from https://github.com/containerd/containerd/blob/main/pkg/cri/store/container/metadata.go +// Copyright The containerd Authors. +// Licensed under the Apache License, Version 2.0 +// NOTE: we just want to get the key information of metadata( such as logpath), not all the metadata. + +package cri + +import ( + "encoding/json" + "fmt" +) + +// NOTE(random-liu): +// 1) Metadata is immutable after created. +// 2) Metadata is checkpointed as containerd container label. + +// metadataVersion is current version of container metadata. +const metadataVersion = "v1" // nolint + +// ContainerVersionedMetadata is the internal versioned container metadata. +// nolint +type ContainerVersionedMetadata struct { + // Version indicates the version of the versioned container metadata. + Version string + // Metadata's type is criContainerMetadataInternal. If not there will be a recursive call in MarshalJSON. + Metadata criContainerMetadataInternal +} + +// criContainerMetadataInternal is for internal use. +type criContainerMetadataInternal ContainerMetadata + +// ContainerMetadata is the unversioned container metadata. +type ContainerMetadata struct { + // LogPath is the container log path. + LogPath string +} + +// MarshalJSON encodes Metadata into bytes in json format. +func (c *ContainerMetadata) MarshalJSON() ([]byte, error) { + return json.Marshal(&ContainerVersionedMetadata{ + Version: metadataVersion, + Metadata: criContainerMetadataInternal(*c), + }) +} + +// UnmarshalJSON decodes Metadata from bytes. +func (c *ContainerMetadata) UnmarshalJSON(data []byte) error { + versioned := &ContainerVersionedMetadata{} + if err := json.Unmarshal(data, versioned); err != nil { + return err + } + // Handle old version after upgrade. + switch versioned.Version { + case metadataVersion: + *c = ContainerMetadata(versioned.Metadata) + return nil + } + return fmt.Errorf("unsupported version: %q", versioned.Version) +} diff --git a/pkg/labels/k8slabels/k8slabels.go b/pkg/labels/k8slabels/k8slabels.go index f44c17fa2f5..2c4588021d2 100644 --- a/pkg/labels/k8slabels/k8slabels.go +++ b/pkg/labels/k8slabels/k8slabels.go @@ -21,4 +21,7 @@ const ( PodNamespace = "io.kubernetes.pod.namespace" PodName = "io.kubernetes.pod.name" ContainerName = "io.kubernetes.container.name" + + ContainerMetadataExtension = "io.cri-containerd.container.metadata" + ContainerType = "io.cri-containerd.kind" ) diff --git a/pkg/logging/cri_logger.go b/pkg/logging/cri_logger.go new file mode 100644 index 00000000000..081d896668b --- /dev/null +++ b/pkg/logging/cri_logger.go @@ -0,0 +1,307 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* + Forked from https://github.com/kubernetes/kubernetes/blob/a66aad2d80dacc70025f95a8f97d2549ebd3208c/pkg/kubelet/kuberuntime/logs/logs.go + Copyright The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 +*/ + +package logging + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "math" + "os" + "path/filepath" + "time" + + "github.com/containerd/containerd/log" + "github.com/containerd/nerdctl/pkg/logging/tail" + "github.com/sirupsen/logrus" +) + +// LogStreamType is the type of the stream in CRI container log. +type LogStreamType string + +const ( + // Stdout is the stream type for stdout. + Stdout LogStreamType = "stdout" + // Stderr is the stream type for stderr. + Stderr LogStreamType = "stderr" +) + +// LogTag is the tag of a log line in CRI container log. +// Currently defined log tags: +// * First tag: Partial/Full - P/F. +// The field in the container log format can be extended to include multiple +// tags by using a delimiter, but changes should be rare. If it becomes clear +// that better extensibility is desired, a more extensible format (e.g., json) +// should be adopted as a replacement and/or addition. +type LogTag string + +const ( + // LogTagPartial means the line is part of multiple lines. + LogTagPartial LogTag = "P" + // LogTagFull means the line is a single full line or the end of multiple lines. + LogTagFull LogTag = "F" + // LogTagDelimiter is the delimiter for different log tags. + LogTagDelimiter = ":" +) + +// Loads log entries from logfiles produced by the Text-logger driver and forwards +// them to the provided io.Writers after applying the provided logging options. +func viewLogsCRI(lvopts LogViewOptions, stdout, stderr io.Writer, stopChannel chan os.Signal) error { + if lvopts.LogPath == "" { + return fmt.Errorf("logpath is nil ") + } + + return ReadLogs(context.Background(), &lvopts, stdout, stderr) +} + +// ReadLogs read the container log and redirect into stdout and stderr. +// Note that containerID is only needed when following the log, or else +// just pass in empty string "". +func ReadLogs(ctx context.Context, opts *LogViewOptions, stdout, stderr io.Writer) error { + var logPath = opts.LogPath + evaluated, err := filepath.EvalSymlinks(logPath) + if err != nil { + return fmt.Errorf("failed to try resolving symlinks in path %q: %v", logPath, err) + } + logPath = evaluated + f, err := os.Open(logPath) + if err != nil { + return fmt.Errorf("failed to open log file %q: %v", logPath, err) + } + defer f.Close() + + // Search start point based on tail line. + start, err := tail.FindTailLineStartIndex(f, opts.Tail) + if err != nil { + return fmt.Errorf("failed to tail %d lines of log file %q: %v", opts.Tail, logPath, err) + } + + if _, err := f.Seek(start, io.SeekStart); err != nil { + return fmt.Errorf("failed to seek in log file %q: %v", logPath, err) + } + + limitedMode := (opts.Tail > 0) && (!opts.Follow) + limitedNum := opts.Tail + // Start parsing the logs. + r := bufio.NewReader(f) + + var stop bool + isNewLine := true + writer := newLogWriter(stdout, stderr, opts) + msg := &logMessage{} + for { + if stop || (limitedMode && limitedNum == 0) { + logrus.Debugf("Finished parsing log file, path; %s", logPath) + return nil + } + l, err := r.ReadBytes(eol[0]) + if err != nil { + if err != io.EOF { // This is an real error + return fmt.Errorf("failed to read log file %q: %v", logPath, err) + } + if opts.Follow { + + // Reset seek so that if this is an incomplete line, + // it will be read again. + if _, err := f.Seek(-int64(len(l)), io.SeekCurrent); err != nil { + return fmt.Errorf("failed to reset seek in log file %q: %v", logPath, err) + } + + // If the container exited consume data until the next EOF + continue + } + // Should stop after writing the remaining content. + stop = true + if len(l) == 0 { + continue + } + logrus.Debugf("Incomplete line in log file, path: %s line: %s", logPath, l) + } + + // Parse the log line. + msg.reset() + if err := ParseCRILog(l, msg); err != nil { + logrus.WithError(err).Errorf("Failed when parsing line in log file, path: %s line: %s", logPath, l) + continue + } + // Write the log line into the stream. + if err := writer.write(msg, isNewLine); err != nil { + if err == errMaximumWrite { + logrus.Debugf("Finished parsing log file, hit bytes limit path: %s", logPath) + return nil + } + logrus.WithError(err).Errorf("Failed when writing line to log file, path: %s line: %s", logPath, l) + return err + } + if limitedMode { + limitedNum-- + } + if len(msg.log) > 0 { + isNewLine = msg.log[len(msg.log)-1] == eol[0] + } else { + isNewLine = true + } + } +} + +var ( + // eol is the end-of-line sign in the log. + eol = []byte{'\n'} + // delimiter is the delimiter for timestamp and stream type in log line. + delimiter = []byte{' '} + // tagDelimiter is the delimiter for log tags. + tagDelimiter = []byte(":") +) + +// logWriter controls the writing into the stream based on the log options. +type logWriter struct { + stdout io.Writer + stderr io.Writer + opts *LogViewOptions + remain int64 +} + +// errMaximumWrite is returned when all bytes have been written. +var errMaximumWrite = errors.New("maximum write") + +// errShortWrite is returned when the message is not fully written. +var errShortWrite = errors.New("short write") + +func newLogWriter(stdout io.Writer, stderr io.Writer, opts *LogViewOptions) *logWriter { + w := &logWriter{ + stdout: stdout, + stderr: stderr, + opts: opts, + remain: math.MaxInt64, // initialize it as infinity + } + //if opts.bytes >= 0 { + // w.remain = opts.bytes + //} + return w +} + +// writeLogs writes logs into stdout, stderr. +func (w *logWriter) write(msg *logMessage, addPrefix bool) error { + + //if msg.timestamp.Before(ts) { + // // Skip the line because it's older than since + // return nil + //} + line := msg.log + if w.opts.Timestamps && addPrefix { + prefix := append([]byte(msg.timestamp.Format(log.RFC3339NanoFixed)), delimiter[0]) + line = append(prefix, line...) + } + // If the line is longer than the remaining bytes, cut it. + if int64(len(line)) > w.remain { + line = line[:w.remain] + } + // Get the proper stream to write to. + var stream io.Writer + switch msg.stream { + case Stdout: + stream = w.stdout + case Stderr: + stream = w.stderr + default: + return fmt.Errorf("unexpected stream type %q", msg.stream) + } + n, err := stream.Write(line) + w.remain -= int64(n) + if err != nil { + return err + } + // If the line has not been fully written, return errShortWrite + if n < len(line) { + return errShortWrite + } + // If there are no more bytes left, return errMaximumWrite + if w.remain <= 0 { + return errMaximumWrite + } + return nil +} + +// logMessage is the CRI internal log type. +type logMessage struct { + timestamp time.Time + stream LogStreamType + log []byte +} + +// reset the log to nil. +func (l *logMessage) reset() { + l.timestamp = time.Time{} + l.stream = "" + l.log = nil +} + +// ParseCRILog parses logs in CRI log format. CRI Log format example: +// +// 2016-10-06T00:17:09.669794202Z stdout P log content 1 +// 2016-10-06T00:17:09.669794203Z stderr F log content 2 +func ParseCRILog(log []byte, msg *logMessage) error { + var err error + // Parse timestamp + idx := bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("timestamp is not found") + } + msg.timestamp, err = time.Parse(time.RFC3339Nano, string(log[:idx])) + if err != nil { + return fmt.Errorf("unexpected timestamp format %q: %v", time.RFC3339Nano, err) + } + + // Parse stream type + log = log[idx+1:] + idx = bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("stream type is not found") + } + msg.stream = LogStreamType(log[:idx]) + if msg.stream != Stdout && msg.stream != Stderr { + return fmt.Errorf("unexpected stream type %q", msg.stream) + } + + // Parse log tag + log = log[idx+1:] + idx = bytes.Index(log, delimiter) + if idx < 0 { + return fmt.Errorf("log tag is not found") + } + // Keep this forward compatible. + tags := bytes.Split(log[:idx], tagDelimiter) + partial := LogTag(tags[0]) == LogTagPartial + // Trim the tailing new line if this is a partial line. + if partial && len(log) > 0 && log[len(log)-1] == '\n' { + log = log[:len(log)-1] + } + + // Get log content + msg.log = log[idx+1:] + + return nil +} diff --git a/pkg/logging/cri_logger_test.go b/pkg/logging/cri_logger_test.go new file mode 100644 index 00000000000..daa469d1aab --- /dev/null +++ b/pkg/logging/cri_logger_test.go @@ -0,0 +1,230 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* + Forked from https://github.com/kubernetes/kubernetes/blob/a66aad2d80dacc70025f95a8f97d2549ebd3208c/pkg/kubelet/kuberuntime/logs/logs_test.go + Copyright The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 +*/ + +package logging + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "os" + "reflect" + "testing" + "time" +) + +func TestReadLogs(t *testing.T) { + file, err := os.CreateTemp("", "TestFollowLogs") + if err != nil { + t.Fatalf("unable to create temp file") + } + defer os.Remove(file.Name()) + file.WriteString(`2016-10-06T00:17:09.669794202Z stdout F line1` + "\n") + file.WriteString(`2016-10-06T00:17:10.669794202Z stdout F line2` + "\n") + file.WriteString(`2016-10-06T00:17:11.669794202Z stdout F line3` + "\n") + testCases := []struct { + name string + logViewOptions LogViewOptions + expected string + }{ + { + name: "default log options should output all lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 0, + }, + expected: "line1\nline2\nline3\n", + }, + { + name: "using Tail 2 should output last 2 lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 2, + }, + expected: "line2\nline3\n", + }, + { + name: "using Tail 4 should output all lines when the log has less than 4 lines", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 4, + }, + expected: "line1\nline2\nline3\n", + }, + { + name: "using Tail 0 should output all", + logViewOptions: LogViewOptions{ + LogPath: file.Name(), + Tail: 0, + }, + expected: "line1\nline2\nline3\n", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + stdoutBuf := bytes.NewBuffer(nil) + stderrBuf := bytes.NewBuffer(nil) + err = ReadLogs(context.TODO(), &tc.logViewOptions, stdoutBuf, stderrBuf) + + if err != nil { + t.Fatalf(err.Error()) + } + if stderrBuf.Len() > 0 { + t.Fatalf("Stderr: %v", stderrBuf.String()) + } + if actual := stdoutBuf.String(); tc.expected != actual { + t.Fatalf("Actual output does not match expected.\nActual: %v\nExpected: %v\n", actual, tc.expected) + } + }) + } +} + +func TestParseLog(t *testing.T) { + timestamp, err := time.Parse(time.RFC3339Nano, "2016-10-20T18:39:20.57606443Z") + + if err != nil { + t.Fatalf("Parse Time err %s", err.Error()) + } + logmsg := &logMessage{} + for c, test := range []struct { + line string + msg *logMessage + err bool + }{ + { // CRI log format stdout + line: "2016-10-20T18:39:20.57606443Z stdout F cri stdout test log\n", + msg: &logMessage{ + timestamp: timestamp, + stream: Stdout, + log: []byte("cri stdout test log\n"), + }, + }, + { // CRI log format stderr + line: "2016-10-20T18:39:20.57606443Z stderr F cri stderr test log\n", + msg: &logMessage{ + timestamp: timestamp, + stream: Stderr, + log: []byte("cri stderr test log\n"), + }, + }, + { // Unsupported Log format + line: "unsupported log format test log\n", + msg: &logMessage{}, + err: true, + }, + { // Partial CRI log line + line: "2016-10-20T18:39:20.57606443Z stdout P cri stdout partial test log\n", + msg: &logMessage{ + timestamp: timestamp, + stream: Stdout, + log: []byte("cri stdout partial test log"), + }, + }, + { // Partial CRI log line with multiple log tags. + line: "2016-10-20T18:39:20.57606443Z stdout P:TAG1:TAG2 cri stdout partial test log\n", + msg: &logMessage{ + timestamp: timestamp, + stream: Stdout, + log: []byte("cri stdout partial test log"), + }, + }, + } { + t.Logf("TestCase #%d: %+v", c, test) + + err = ParseCRILog([]byte(test.line), logmsg) + if err != nil { + if test.err { + continue + } else { + t.Errorf("ParseCRILog err %s ", err.Error()) + } + } + + if !reflect.DeepEqual(test.msg, logmsg) { + t.Errorf("ParseCRILog failed, msg is %#v,test.msg is %#v", logmsg, test.msg) + } + + } +} + +func TestReadLogsLimitsWithTimestamps(t *testing.T) { + logLineFmt := "2022-10-29T16:10:22.592603036-05:00 stdout P %v\n" + logLineNewLine := "2022-10-29T16:10:22.592603036-05:00 stdout F \n" + + tmpfile, err := os.CreateTemp("", "log.*.txt") + if err != nil { + t.Fatalf("unable to create temp file") + } + + count := 10000 + + for i := 0; i < count; i++ { + tmpfile.WriteString(fmt.Sprintf(logLineFmt, i)) + } + tmpfile.WriteString(logLineNewLine) + + for i := 0; i < count; i++ { + tmpfile.WriteString(fmt.Sprintf(logLineFmt, i)) + } + tmpfile.WriteString(logLineNewLine) + + // two lines are in the buffer + + defer os.Remove(tmpfile.Name()) // clean up + + tmpfile.Close() + + var buf bytes.Buffer + w := io.MultiWriter(&buf) + + err = ReadLogs(context.Background(), &LogViewOptions{LogPath: tmpfile.Name(), Tail: 0, Timestamps: true}, w, w) + if err != nil { + t.Errorf("ReadLogs file %s failed %s", tmpfile.Name(), err.Error()) + } + + lineCount := 0 + scanner := bufio.NewScanner(bytes.NewReader(buf.Bytes())) + for scanner.Scan() { + lineCount++ + + // Split the line + ts, logline, _ := bytes.Cut(scanner.Bytes(), []byte(" ")) + + // Verification + // 1. The timestamp should exist + // 2. The last item in the log should be 9999 + _, err = time.Parse(time.RFC3339, string(ts)) + if err != nil { + t.Errorf("timestamp not found, err: %s", err.Error()) + } + + if !bytes.HasSuffix(logline, []byte("9999")) { + t.Errorf("the complete log found, err: %s", err.Error()) + } + } + + if lineCount != 2 { + t.Errorf("should have two lines, lineCount= %d", lineCount) + } +} diff --git a/pkg/logging/log_viewer.go b/pkg/logging/log_viewer.go index f6bfb00d730..6a8c1610409 100644 --- a/pkg/logging/log_viewer.go +++ b/pkg/logging/log_viewer.go @@ -23,6 +23,7 @@ import ( "os/exec" "path/filepath" + "github.com/containerd/nerdctl/pkg/labels/k8slabels" "github.com/sirupsen/logrus" ) @@ -44,6 +45,7 @@ func RegisterLogViewer(driverName string, lvfn LogViewerFunc) { func init() { RegisterLogViewer("json-file", viewLogsJSONFile) RegisterLogViewer("journald", viewLogsJournald) + RegisterLogViewer("cri", viewLogsCRI) } // Returns a LogViewerFunc for the provided logging driver name. @@ -64,6 +66,9 @@ type LogViewOptions struct { // Absolute path to the nerdctl datastore's root. DatastoreRootPath string + // LogPath specify the log path for container created via CRI + LogPath string + // Whether or not to follow the output of the container logs. Follow bool @@ -110,15 +115,21 @@ type ContainerLogViewer struct { // Validates the given LogViewOptions, loads the logging config for the // given container and returns a ContainerLogViewer. -func InitContainerLogViewer(lvopts LogViewOptions, stopChannel chan os.Signal) (*ContainerLogViewer, error) { - if err := lvopts.Validate(); err != nil { - return nil, fmt.Errorf("invalid LogViewOptions provided (%#v): %s", lvopts, err) - } +func InitContainerLogViewer(containerLabels map[string]string, lvopts LogViewOptions, stopChannel chan os.Signal) (contlv *ContainerLogViewer, err error) { + var lcfg LogConfig + if _, ok := containerLabels[k8slabels.ContainerType]; ok { + lcfg.Driver = "cri" + } else { + if err := lvopts.Validate(); err != nil { + return nil, fmt.Errorf("invalid LogViewOptions provided (%#v): %s", lvopts, err) + } - lcfg, err := LoadLogConfig(lvopts.DatastoreRootPath, lvopts.Namespace, lvopts.ContainerID) - if err != nil { - return nil, fmt.Errorf("failed to load logging config: %s", err) + lcfg, err = LoadLogConfig(lvopts.DatastoreRootPath, lvopts.Namespace, lvopts.ContainerID) + if err != nil { + return nil, fmt.Errorf("failed to load logging config: %s", err) + } } + lv := &ContainerLogViewer{ loggingConfig: lcfg, logViewingOptions: lvopts, diff --git a/pkg/logging/tail/tail.go b/pkg/logging/tail/tail.go new file mode 100644 index 00000000000..e696841f7e6 --- /dev/null +++ b/pkg/logging/tail/tail.go @@ -0,0 +1,75 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* + Forked from https://github.com/kubernetes/kubernetes/blob/master/pkg/util/tail/tail.go + Copyright The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 +*/ + +package tail + +import ( + "bytes" + "io" +) + +const ( + // blockSize is the block size used in tail. + blockSize = 1024 +) + +var ( + // eol is the end-of-line sign in the log. + eol = []byte{'\n'} +) + +// FindTailLineStartIndex returns the start of last nth line. +// * If n <= 0, return the beginning of the file. +// * If n > 0, return the beginning of last nth line. +// Notice that if the last line is incomplete (no end-of-line), it will not be counted +// as one line. +func FindTailLineStartIndex(f io.ReadSeeker, n uint) (int64, error) { + if n <= 0 { + return 0, nil + } + size, err := f.Seek(0, io.SeekEnd) + if err != nil { + return 0, err + } + var left, cnt int64 + buf := make([]byte, blockSize) + for right := size; right > 0 && uint(cnt) <= n; right -= blockSize { + left = right - blockSize + if left < 0 { + left = 0 + buf = make([]byte, right) + } + if _, err := f.Seek(left, io.SeekStart); err != nil { + return 0, err + } + if _, err := f.Read(buf); err != nil { + return 0, err + } + cnt += int64(bytes.Count(buf, eol)) + } + for ; uint(cnt) > n; cnt-- { + idx := bytes.Index(buf, eol) + 1 + buf = buf[idx:] + left += int64(idx) + } + return left, nil +} diff --git a/pkg/logging/tail/tail_test.go b/pkg/logging/tail/tail_test.go new file mode 100644 index 00000000000..0895603f2ee --- /dev/null +++ b/pkg/logging/tail/tail_test.go @@ -0,0 +1,57 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* + Forked from https://github.com/kubernetes/kubernetes/blob/master/pkg/util/tail/tail_test.go + Copyright The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 +*/ + +package tail + +import ( + "bytes" + "strings" + "testing" +) + +func TestTail(t *testing.T) { + line := strings.Repeat("a", blockSize) + testBytes := []byte(line + "\n" + + line + "\n" + + line + "\n" + + line + "\n" + + line[blockSize/2:]) // incomplete line + + for c, test := range []struct { + n uint32 + start int64 + }{ + {n: 0, start: 0}, + {n: 1, start: int64(len(line)+1) * 3}, + {n: 9999, start: 0}, + } { + t.Logf("TestCase #%d: %+v", c, test) + r := bytes.NewReader(testBytes) + s, err := FindTailLineStartIndex(r, uint(test.n)) + if err != nil { + t.Error(err) + } + if s != test.start { + t.Errorf("%d != %d", s, test.start) + } + } +}