diff --git a/.gitignore b/.gitignore index b791028c2..eaee00a37 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ *.orig *.app +.history Instruments/ActorInstruments/ActorInstruments.xcodeproj/xcuserdata Instruments/ActorInstruments/build/ diff --git a/Package.swift b/Package.swift index 6b29667ec..a4def51a9 100644 --- a/Package.swift +++ b/Package.swift @@ -39,11 +39,13 @@ var targets: [PackageDescription.Target] = [ .product(name: "NIOSSL", package: "swift-nio-ssl"), .product(name: "NIOExtras", package: "swift-nio-extras"), .product(name: "SwiftProtobuf", package: "swift-protobuf"), - .product(name: "Logging", package: "swift-log"), - .product(name: "Metrics", package: "swift-metrics"), .product(name: "ServiceDiscovery", package: "swift-service-discovery"), .product(name: "Backtrace", package: "swift-backtrace"), .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), + // Observability + .product(name: "Logging", package: "swift-log"), + .product(name: "Metrics", package: "swift-metrics"), + .product(name: "Tracing", package: "swift-distributed-tracing"), ] ), @@ -182,7 +184,8 @@ var dependencies: [Package.Dependency] = [ .package(url: "https://github.com/apple/swift-collections", from: "1.0.1"), // ~~~ Observability ~~~ - .package(url: "https://github.com/apple/swift-log", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-log", from: "1.4.0"), + .package(url: "https://github.com/apple/swift-distributed-tracing", from: "0.3.0"), // swift-metrics 1.x and 2.x are almost API compatible, so most clients should use .package(url: "https://github.com/apple/swift-metrics", "1.0.0" ..< "3.0.0"), .package(url: "https://github.com/apple/swift-service-discovery", from: "1.0.0"), diff --git a/Samples/Package.swift b/Samples/Package.swift index 3b8decbc1..c2e9b3073 100644 --- a/Samples/Package.swift +++ b/Samples/Package.swift @@ -21,6 +21,7 @@ var targets: [PackageDescription.Target] = [ name: "SampleDiningPhilosophers", dependencies: [ .product(name: "DistributedCluster", package: "swift-distributed-actors"), + "_PrettyLogHandler", ], path: "Sources/SampleDiningPhilosophers", exclude: [ @@ -29,6 +30,33 @@ var targets: [PackageDescription.Target] = [ ] ), + .executableTarget( + name: "SampleClusterTracing", + dependencies: [ + .product(name: "DistributedCluster", package: "swift-distributed-actors"), + .product(name: "OpenTelemetry", package: "opentelemetry-swift"), + .product(name: "OtlpGRPCSpanExporting", package: "opentelemetry-swift"), + "_PrettyLogHandler", + ] + ), + + .executableTarget( + name: "SampleClusterBuilds", + dependencies: [ + .product(name: "DistributedCluster", package: "swift-distributed-actors"), + .product(name: "OpenTelemetry", package: "opentelemetry-swift"), + .product(name: "OtlpGRPCSpanExporting", package: "opentelemetry-swift"), + "_PrettyLogHandler", + ] + ), + + .target( + name: "_PrettyLogHandler", + dependencies: [ + .product(name: "DistributedCluster", package: "swift-distributed-actors"), + ] + ), + /* --- tests --- */ // no-tests placeholder project to not have `swift test` fail on Samples/ @@ -45,6 +73,7 @@ var dependencies: [Package.Dependency] = [ .package(name: "swift-distributed-actors", path: "../"), // ~~~~~~~ only for samples ~~~~~~~ + .package(url: "https://github.com/slashmo/opentelemetry-swift", from: "0.3.0"), ] let package = Package( @@ -58,11 +87,14 @@ let package = Package( ], products: [ /* --- samples --- */ - .executable( name: "SampleDiningPhilosophers", targets: ["SampleDiningPhilosophers"] ), + .executable( + 
name: "SampleClusterTracing", + targets: ["SampleClusterTracing"] + ), ], dependencies: dependencies, diff --git a/Samples/Sources/SampleClusterBuilds/ClusterBuilds.swift b/Samples/Sources/SampleClusterBuilds/ClusterBuilds.swift new file mode 100644 index 000000000..3d24f63be --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/ClusterBuilds.swift @@ -0,0 +1,69 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +struct ClusterBuilds { + let system: ClusterSystem + var workers: [ActorID: BuildWorker] = [:] + + init(name: String, port: Int) async { + self.system = await ClusterSystem(name) { settings in + settings.bindPort = port + + settings.plugins.install(plugin: ClusterSingletonPlugin()) + + // We are purposefully making allowing long calls: + settings.remoteCall.defaultTimeout = .seconds(20) + + // Try joining this seed node automatically; once we have joined at least once node, we'll learn about others. + settings.discovery = ServiceDiscoverySettings(static: [ + Main.Config.seedEndpoint, + ]) + } + self.system.cluster.join(endpoint: Main.Config.seedEndpoint) + } + + mutating func run(tasks: Int) async throws { + var singletonSettings = ClusterSingletonSettings() + singletonSettings.allocationStrategy = .byLeadership + + // Pretend we have some work to do: + let buildTasks: [BuildTask] = (0 ..< tasks).map { _ in BuildTask() } + + // anyone can host the singleton, but by default, it'll be on the build leader (7330) various strategies are possible. + try await system.singleton.host(name: BuildLeader.singletonName) { actorSystem in + await BuildLeader(buildTasks: buildTasks, actorSystem: actorSystem) + } + + // all nodes, except the build-leader node contain a few workers: + if self.system.isBuildWorker { + for _ in 0 ..< Main.Config.workersPerNode { + await makeWorker() + } + } + } + + private mutating func makeWorker() async { + let worker = await BuildWorker(actorSystem: self.system) + self.workers[worker.id] = worker + } +} diff --git a/Samples/Sources/SampleClusterBuilds/Convenience/Clock+Extensions.swift b/Samples/Sources/SampleClusterBuilds/Convenience/Clock+Extensions.swift new file mode 100644 index 000000000..d1f1434fb --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/Convenience/Clock+Extensions.swift @@ -0,0 +1,28 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +// Sleep, with adding a little bit of noise (additional delay) to the duration. +func noisySleep(for duration: ContinuousClock.Duration) async { + var duration = duration + .milliseconds(Int.random(in: 200 ..< 500)) + try? await Task.sleep(until: ContinuousClock.now + duration, clock: .continuous) +} diff --git a/Samples/Sources/SampleClusterBuilds/Convenience/Roles.swift b/Samples/Sources/SampleClusterBuilds/Convenience/Roles.swift new file mode 100644 index 000000000..a7389d0aa --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/Convenience/Roles.swift @@ -0,0 +1,33 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +// TODO: we want to support "member roles" in membership, this would be then a built in query, and not hardcoded by port either. +extension ClusterSystem { + var isBuildLeader: Bool { + self.cluster.endpoint.port == 7330 + } + + var isBuildWorker: Bool { + self.cluster.endpoint.port > 7330 + } +} diff --git a/Samples/Sources/SampleClusterBuilds/Convenience/Weak.swift b/Samples/Sources/SampleClusterBuilds/Convenience/Weak.swift new file mode 100644 index 000000000..91b199a9e --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/Convenience/Weak.swift @@ -0,0 +1,28 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedCluster + +final class Weak { + weak var actor: Act? 
+ + init(_ actor: Act) { + self.actor = actor + } + + init(idForRemoval id: ClusterSystem.ActorID) { + self.actor = nil + } +} diff --git a/Samples/Sources/SampleClusterBuilds/DefaultActorSystem.swift b/Samples/Sources/SampleClusterBuilds/DefaultActorSystem.swift new file mode 100644 index 000000000..9e16d5067 --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/DefaultActorSystem.swift @@ -0,0 +1,18 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedCluster + +typealias DefaultDistributedActorSystem = ClusterSystem diff --git a/Samples/Sources/SampleClusterBuilds/Model.swift b/Samples/Sources/SampleClusterBuilds/Model.swift new file mode 100644 index 000000000..305696a8e --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/Model.swift @@ -0,0 +1,55 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import struct Foundation.UUID +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +struct BuildTask: Sendable, Codable, Hashable { + let id: BuildTaskID + + // this would have some state about "what to build" + + init() { + self.id = .init() + } + + init(id: BuildTaskID) { + self.id = id + } +} + +/// A trivial "build result" representation, not carrying additional information, just for demonstration purposes. +enum BuildResult: Sendable, Codable, Equatable { + case successful + case failed + case rejected +} + +struct BuildTaskID: Sendable, Codable, Hashable, CustomStringConvertible { + let id: UUID + init() { + self.id = UUID() + } + + var description: String { + "\(id)" + } +} diff --git a/Samples/Sources/SampleClusterBuilds/Roles/BuildLeader.swift b/Samples/Sources/SampleClusterBuilds/Roles/BuildLeader.swift new file mode 100644 index 000000000..56693217c --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/Roles/BuildLeader.swift @@ -0,0 +1,186 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DequeModule +import DistributedCluster +import Logging +import Tracing + +distributed actor BuildLeader: ClusterSingleton, LifecycleWatch { + static let singletonName = "BuildLeader" + + lazy var log = Logger(actor: self) + + var initialTasks: Int + var remainingTasks: Deque + + var totalWorkers: [ActorID: Weak] = [:] + var workerAssignment: [ActorID: BuildTask] = [:] + + var availableWorkers: AsyncStream! + var availableWorkersCC: AsyncStream.Continuation! + + var membership: Cluster.Membership = .empty + + init(buildTasks: [BuildTask], actorSystem: ActorSystem) async { + self.actorSystem = actorSystem + self.initialTasks = buildTasks.count + self.remainingTasks = Deque(buildTasks) + + self.availableWorkers = AsyncStream(BuildWorker.self) { cc in + self.availableWorkersCC = cc + } + + log.notice("\(Self.self) initialized on [\(actorSystem.cluster.node)]") + Task { + await self.discoverBuildWorkers() + } + Task { + await self.subscribeMembership() + } + Task { + try await self.processAllBuildTasks() + } + } + + func processAllBuildTasks() async throws -> BuildStats { + log.notice("Process all build tasks \(self.remainingTasks.count)", metadata: [ + "tasks/remaining": "\(self.remainingTasks.count)", + "workers/count": "\(totalWorkers.count)", + ]) + + let span = InstrumentationSystem.tracer.startSpan("all-\(self.remainingTasks.count)-builds", baggage: .current ?? .topLevel) + defer { span.end() } + + /// Keep searching for available workers until we have processed all BuildTasks. + for try await availableWorker in availableWorkers { + guard let buildTask = self.remainingTasks.popFirst() else { + break // no more tasks! + } + + self.workerAssignment[availableWorker.id] = buildTask + log.notice("Schedule work [\(buildTask.id)] on [\(availableWorker)]!", metadata: [ + "tasks/remaining": "\(self.remainingTasks.count)", + "workers/total": "\(self.totalWorkers.count)", + "workers/available": "\(self.totalWorkers.count - self.workerAssignment.count)", + "workers/assigned": "\(self.workerAssignment.count)", + "workers/nodes": "\(self.membership.count(atLeast: .up))", + ]) + + Task { + await Baggage.$current.withValue(span.baggage) { + do { + let result = try await availableWorker.work(on: buildTask, reportLogs: nil) + self.builtTaskCompleted(result, by: availableWorker) + } catch { + self.builtTaskCompleted(.failed, by: availableWorker) + } + } + } + } + + return self.stats() + } + + private func builtTaskCompleted(_ result: BuildResult, by worker: BuildWorker) { + guard let assignedTask = self.workerAssignment.removeValue(forKey: worker.id) else { + log.warning("Worker [\(worker)] completed task but wasn't assigned any!") + return + } + + log.notice("Task [\(assignedTask.id)] was completed [\(result)] by [\(worker)]!") + switch result { + case .successful: + break + case .rejected, .failed: + log.notice("Task [\(assignedTask.id)] was [\(result)], so we must schedule it again...") + self.remainingTasks.append(assignedTask) + } + + if self.totalWorkers[worker.id] != nil { + log.notice("Worker \(worker.id) is available for other work again!") + // the worker is not terminated, good. 
+ self.availableWorkersCC.yield(worker) + } else { + log.notice("Worker \(worker.id) seems to have failed, not available for new work...") + } + } + + func workerAvailable(_ worker: BuildWorker) { + self.totalWorkers[worker.id] = .init(worker) + } + + func stats() -> BuildStats { + .init(total: self.initialTasks, processed: self.initialTasks - remainingTasks.count) + } +} + +struct BuildStats: Sendable, Codable, CustomStringConvertible { + let total: Int + let processed: Int + + var complete: Bool { + self.total == self.processed + } + + init(total: Int, processed: Int) { + self.total = total + self.processed = processed + } + + var description: String { + "BuildStats(total: \(self.total), processed: \(self.processed), complete: \(self.complete))" + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Maintaining worker statuses + +extension BuildLeader { + func discoverBuildWorkers() async { + log.notice("Discovering \(BuildWorker.self) actors...") + for await worker in await self.actorSystem.receptionist.listing(of: BuildWorker.self) { + self.totalWorkers[worker.id] = .init(worker) + log.notice("Discovered new \(BuildWorker.self)", metadata: [ + "discovered/id": "\(worker.id)", + "workers/count": "\(self.totalWorkers.count)", + ]) + + self.availableWorkersCC.yield(watchTermination(of: worker)) + } + } + + func subscribeMembership() async { + for await event in self.actorSystem.cluster.events { + try? self.membership.apply(event: event) + } + } + + func terminated(actor id: ClusterSystem.ActorID) async { + log.warning("Worker actor \(id) terminated, removed from available workers", metadata: [ + "terminated/id": "\(id)", + "workers/total": "\(self.totalWorkers.count)", + "workers/available": "\(self.totalWorkers.count - self.workerAssignment.count)", + "workers/assigned": "\(self.workerAssignment.count)", + "workers/nodes": "\(self.membership.count(atLeast: .up))", + ]) + + _ = self.totalWorkers.removeValue(forKey: id) + + if let interruptedTask = self.workerAssignment.removeValue(forKey: id) { + self.log.warning("Worker [\(id)] was working on [\(interruptedTask)], re-scheduling this task...") + self.remainingTasks.append(interruptedTask) + } + } +} diff --git a/Samples/Sources/SampleClusterBuilds/Roles/BuildWorker.swift b/Samples/Sources/SampleClusterBuilds/Roles/BuildWorker.swift new file mode 100644 index 000000000..79cf8452f --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/Roles/BuildWorker.swift @@ -0,0 +1,70 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster +import Logging +import Tracing + +distributed actor BuildWorker: CustomStringConvertible { + lazy var log = Logger(actor: self) + var activeBuildTask: BuildTask? 
+ + @ActorID.Metadata(\.receptionID) + var receptionID: String + + init(actorSystem: ActorSystem) async { + self.actorSystem = actorSystem + + self.receptionID = "*" // default key for "all of this type" + await actorSystem.receptionist.checkIn(self) + self.log.notice("Build worker initialized on \(actorSystem.cluster.node)") + } + + distributed func work(on task: BuildTask, reportLogs log: LogCollector? = nil) async -> BuildResult { + if let activeBuildTask = self.activeBuildTask { + self.log.warning("Reject task [\(task)], already working on [\(activeBuildTask)]") + return .rejected + } + + self.activeBuildTask = task + defer { self.activeBuildTask = nil } + + await InstrumentationSystem.tracer.withSpan("build") { _ in + log?.log(line: "Starting build \(task)...") + await noisySleep(for: .seconds(1)) + + for i in 1 ... 5 { + await InstrumentationSystem.tracer.withSpan("build-step-\(i)") { _ in + log?.log(line: "Building file \(i)/5") + await noisySleep(for: .seconds(1)) + } + } + } + + await InstrumentationSystem.tracer.withSpan("all-tests") { _ in + for i in 1 ... 5 { + await InstrumentationSystem.tracer.withSpan("test-step-\(i)") { _ in + log?.log(line: "Testing \(i)/5") + await noisySleep(for: .seconds(1)) + } + } + } + + return .successful + } + + public nonisolated var description: String { + "\(Self.self)(\(self.id))" + } +} diff --git a/Samples/Sources/SampleClusterBuilds/Roles/LogCollector.swift b/Samples/Sources/SampleClusterBuilds/Roles/LogCollector.swift new file mode 100644 index 000000000..a24c74b1c --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/Roles/LogCollector.swift @@ -0,0 +1,19 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import DistributedCluster + +distributed actor LogCollector { + nonisolated func log(line: String) {} +} diff --git a/Samples/Sources/SampleClusterBuilds/boot.swift b/Samples/Sources/SampleClusterBuilds/boot.swift new file mode 100644 index 000000000..1a789af87 --- /dev/null +++ b/Samples/Sources/SampleClusterBuilds/boot.swift @@ -0,0 +1,78 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +@main +enum Main { + enum Config { + // "seed" node that all others will join (and learn about other nodes in the cluster automatically) + static let seedEndpoint = Cluster.Endpoint(host: "127.0.0.1", port: 7330) // just a convention, leader can be decided in various ways + + // How many workers per node + static let workersPerNode = 2 + // How many tasks should the leader try to process, scaling out computation to workers. + static let totalBuildTasks = 100 + } + + static func main() async throws { + print("===---------------------------------------------------------===") + print("| Sample Cluster Builds |") + print("| |") + print("| USAGE: |") + print("| swift run SampleClusterBuilds 7330 (becomes leader) |") + print("| swift run SampleClusterBuilds 7331 worker |") + print("| swift run SampleClusterBuilds ... worker |") + print("===---------------------------------------------------------===") + + let port = Int(CommandLine.arguments.dropFirst().first ?? "7330")! + let role: ClusterNodeRole + if port == 7330 { + role = .leader + } else if CommandLine.arguments.dropFirst(2).first == "worker" { + role = .worker + } else { + fatalError("Undefined role for node: \(port)! Available roles: \(ClusterNodeRole.allCases)") + } + let nodeName = "ClusterBuilds-\(port)-\(role)" + + // Bootstrap logging: + LoggingSystem.bootstrap(SamplePrettyLogHandler.init) + + // Bootstrap OpenTelemetry tracing: + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let exporter = OtlpGRPCSpanExporter(config: OtlpGRPCSpanExporter.Config(eventLoopGroup: group)) + let processor = OTel.SimpleSpanProcessor(exportingTo: exporter) + + let otel = OTel(serviceName: nodeName, eventLoopGroup: group, processor: processor) + InstrumentationSystem.bootstrap(otel.tracer()) + + var app = await ClusterBuilds(name: nodeName, port: port) + try! await app.run(tasks: Main.Config.totalBuildTasks) + + try await app.system.terminated + } +} + +enum ClusterNodeRole: CaseIterable, Hashable { + case leader + case worker +} diff --git a/Samples/Sources/SampleClusterTracing/Clock+Extensions.swift b/Samples/Sources/SampleClusterTracing/Clock+Extensions.swift new file mode 100644 index 000000000..d7fee503b --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Clock+Extensions.swift @@ -0,0 +1,28 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +// Sleep, with adding a little bit of noise (additional delay) to the duration. +func noisySleep(for duration: ContinuousClock.Duration) async { + var duration = duration + .milliseconds(Int.random(in: 100 ..< 300)) + try? await Task.sleep(until: ContinuousClock.now + duration, clock: .continuous) +} diff --git a/Samples/Sources/SampleClusterTracing/Cooking/Chopping.swift b/Samples/Sources/SampleClusterTracing/Cooking/Chopping.swift new file mode 100644 index 000000000..d26b72d72 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Cooking/Chopping.swift @@ -0,0 +1,44 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import Tracing + +protocol Chopping { + func chop(_ vegetable: Vegetable) async throws -> Vegetable +} + +distributed actor VegetableChopper: Chopping { + @ActorID.Metadata(\.receptionID) + var receptionID: String + + init(actorSystem: ActorSystem) async { + self.actorSystem = actorSystem + + self.receptionID = "*" // default key for "all of this type" + await actorSystem.receptionist.checkIn(self) + } + + distributed func chop(_ vegetable: Vegetable) async throws -> Vegetable { + await InstrumentationSystem.tracer.withSpan(#function) { _ in + await noisySleep(for: .seconds(5)) + + return vegetable.asChopped + } + } +} diff --git a/Samples/Sources/SampleClusterTracing/Cooking/PrimaryCook.swift b/Samples/Sources/SampleClusterTracing/Cooking/PrimaryCook.swift new file mode 100644 index 000000000..42d696b2f --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Cooking/PrimaryCook.swift @@ -0,0 +1,143 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import _PrettyLogHandler
+import Distributed
+import DistributedCluster
+import Logging
+import NIO
+import Tracing
+
+distributed actor PrimaryCook: LifecycleWatch {
+    lazy var log = Logger(actor: self)
+
+    var choppers: [ClusterSystem.ActorID: VegetableChopper] = [:]
+    var waitingForChoppers: (Int, CheckedContinuation)?
+
+    init(actorSystem: ActorSystem) async {
+        self.actorSystem = actorSystem
+
+        _ = self.startChopperListingTask()
+    }
+
+    func startChopperListingTask() -> Task {
+        Task {
+            for await chopper in await actorSystem.receptionist.listing(of: VegetableChopper.self) {
+                log.notice("Discovered vegetable chopper: \(chopper.id)")
+                self.choppers[chopper.id] = chopper
+
+                /// We implement a simple "if we're waiting for N choppers... let's notify the continuation once that is reached"
+                /// This would be nice to provide as a fun "active" collection type that can be `.waitFor(...)`-ed.
+                if let waitingForChoppersCount = self.waitingForChoppers?.0,
+                   choppers.count >= waitingForChoppersCount
+                {
+                    self.waitingForChoppers?.1.resume()
+                }
+            }
+        }
+    }
+
+    distributed func makeDinner() async throws -> Meal {
+        try await InstrumentationSystem.tracer.withSpan(#function) { _ in
+            await noisySleep(for: .milliseconds(200))
+
+            log.notice("Cooking dinner, but we need [2] vegetable choppers...! Suspend waiting for nodes to join.")
+            let (first, second) = try await getChoppers()
+            async let veggies = try chopVegetables(firstChopper: first, secondChopper: second)
+            async let meat = marinateMeat()
+            async let oven = preheatOven(temperature: 350)
+            // ...
+            return try await cook(veggies, meat, oven)
+        }
+    }
+
+    private func getChoppers() async throws -> (some Chopping, some Chopping) {
+        await withCheckedContinuation { cc in
+            self.waitingForChoppers = (2, cc)
+        }
+
+        var chopperIDs = self.choppers.keys.makeIterator()
+        guard let id1 = chopperIDs.next(),
+              let first = choppers[id1]
+        else {
+            throw NotEnoughChoppersError()
+        }
+        guard let id2 = chopperIDs.next(),
+              let second = choppers[id2]
+        else {
+            throw NotEnoughChoppersError()
+        }
+
+        return (first, second)
+    }
+
+    // Called by lifecycle watch when a watched actor terminates.
+    func terminated(actor id: DistributedCluster.ActorID) async {
+        self.choppers.removeValue(forKey: id)
+    }
+}
+
+func chopVegetables(firstChopper: some Chopping,
+                    secondChopper: some Chopping) async throws -> [Vegetable]
+{
+    try await InstrumentationSystem.tracer.withSpan("chopVegetables") { _ in
+        // Chop the vegetables...!
+        //
+        // However, since chopping is a very difficult operation,
+        // only one chopping task can be performed at a time on a single service!
+        // (Imagine that... we cannot parallelize these two tasks, and need to involve another service).
+        async let carrot = try firstChopper.chop(.carrot(chopped: false))
+        async let potato = try secondChopper.chop(.potato(chopped: false))
+        return try await [carrot, potato]
+    }
+}
+
+// func chop(_ vegetable: Vegetable, tracer: any Tracer) async throws -> Vegetable {
+//     await tracer.withSpan("chop-\(vegetable)") { _ in
+//         await sleep(for: .seconds(5))
+//         // ...
+// return vegetable // "chopped" +// } +// } + +func marinateMeat() async -> Meat { + await noisySleep(for: .milliseconds(620)) + + return await InstrumentationSystem.tracer.withSpan("marinateMeat") { _ in + await noisySleep(for: .seconds(3)) + // ... + return Meat() + } +} + +func preheatOven(temperature: Int) async -> Oven { + await InstrumentationSystem.tracer.withSpan("preheatOven") { _ in + // ... + await noisySleep(for: .seconds(6)) + return Oven() + } +} + +func cook(_: Any, _: Any, _: Any) async -> Meal { + await InstrumentationSystem.tracer.withSpan("cook") { span in + span.addEvent("children-asking-if-done-already") + await noisySleep(for: .seconds(3)) + span.addEvent("children-asking-if-done-already-again") + await noisySleep(for: .seconds(2)) + // ... + return Meal() + } +} + +struct NotEnoughChoppersError: Error {} diff --git a/Samples/Sources/SampleClusterTracing/DefaultActorSystem.swift b/Samples/Sources/SampleClusterTracing/DefaultActorSystem.swift new file mode 100644 index 000000000..9e16d5067 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/DefaultActorSystem.swift @@ -0,0 +1,18 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Distributed +import DistributedCluster + +typealias DefaultDistributedActorSystem = ClusterSystem diff --git a/Samples/Sources/SampleClusterTracing/Model.swift b/Samples/Sources/SampleClusterTracing/Model.swift new file mode 100644 index 000000000..366eb3175 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Model.swift @@ -0,0 +1,40 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +struct Meal: Sendable, Codable {} + +struct Meat: Sendable, Codable {} + +struct Oven: Sendable, Codable {} + +enum Vegetable: Sendable, Codable { + case potato(chopped: Bool) + case carrot(chopped: Bool) + + var asChopped: Self { + switch self { + case .carrot: return .carrot(chopped: true) + case .potato: return .potato(chopped: true) + } + } +} diff --git a/Samples/Sources/SampleClusterTracing/Roles/ChoppingNode.swift b/Samples/Sources/SampleClusterTracing/Roles/ChoppingNode.swift new file mode 100644 index 000000000..7a3438b86 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Roles/ChoppingNode.swift @@ -0,0 +1,54 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import Tracing + +struct ChoppingNode { + let system: ClusterSystem + + var chopper: VegetableChopper? + + init(name: String, port: Int) async { + self.system = await ClusterSystem(name) { settings in + settings.bindPort = port + + // We are purposefully making very slow calls, so they show up nicely in tracing: + settings.remoteCall.defaultTimeout = .seconds(20) + } + } + + mutating func run() async throws { + monitorMembership(on: self.system) + + let leaderEndpoint = Cluster.Endpoint(host: self.system.cluster.endpoint.host, port: 7330) + self.system.log.notice("Joining: \(leaderEndpoint)") + self.system.cluster.join(endpoint: leaderEndpoint) + + try await self.system.cluster.up(within: .seconds(30)) + self.system.log.notice("Joined!") + + let chopper = await VegetableChopper(actorSystem: system) + self.chopper = chopper + self.system.log.notice("Vegetable chopper \(chopper) started!") + + for await chopper in await self.system.receptionist.listing(of: VegetableChopper.self) { + self.system.log.warning("GOT: \(chopper.id)") + } + } +} diff --git a/Samples/Sources/SampleClusterTracing/Roles/LeaderNode.swift b/Samples/Sources/SampleClusterTracing/Roles/LeaderNode.swift new file mode 100644 index 000000000..1a6844cdd --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/Roles/LeaderNode.swift @@ -0,0 +1,42 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import Tracing + +struct LeaderNode { + let system: ClusterSystem + + init(name: String, port: Int) async { + self.system = await ClusterSystem(name) { settings in + settings.bindPort = port + + // We are purposefully making very slow calls, so they show up nicely in tracing: + settings.remoteCall.defaultTimeout = .seconds(20) + } + } + + func run() async throws { + monitorMembership(on: self.system) + + let cook = await PrimaryCook(actorSystem: system) + let meal = try await cook.makeDinner() + + self.system.log.notice("Made dinner successfully!") + } +} diff --git a/Samples/Sources/SampleClusterTracing/boot.swift b/Samples/Sources/SampleClusterTracing/boot.swift new file mode 100644 index 000000000..c5e838363 --- /dev/null +++ b/Samples/Sources/SampleClusterTracing/boot.swift @@ -0,0 +1,98 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import _PrettyLogHandler +import Distributed +import DistributedCluster +import Logging +import NIO +import OpenTelemetry +import OtlpGRPCSpanExporting +import Tracing + +/* + * Sample showcasing a long traced interaction. + */ +@main enum Main { + static func main() async throws { + print("===-----------------------------------------------------===") + print("| Cluster Tracing Sample App |") + print("| |") + print("| USAGE: |") + print("| swift run SampleClusterTracing # leader |") + print("| swift run SampleClusterTracing 7331 chopping |") + print("| swift run SampleClusterTracing 7332 chopping |") + print("===-----------------------------------------------------===") + + let port = Int(CommandLine.arguments.dropFirst().first ?? "7330")! + let role: ClusterNodeRole + if port == 7330 { + role = .leader + } else if CommandLine.arguments.dropFirst(2).first == "chopping" { + role = .chopping + } else { + fatalError("Undefined role for node: \(port)! Available roles: \(ClusterNodeRole.allCases)") + } + let nodeName = "SampleNode-\(port)-\(role)" + + // Bootstrap logging: + LoggingSystem.bootstrap(SamplePrettyLogHandler.init) + + // Bootstrap OpenTelemetry tracing: + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let exporter = OtlpGRPCSpanExporter(config: OtlpGRPCSpanExporter.Config(eventLoopGroup: group)) + let processor = OTel.SimpleSpanProcessor(exportingTo: exporter) + + let otel = OTel(serviceName: nodeName, eventLoopGroup: group, processor: processor) + InstrumentationSystem.bootstrap(otel.tracer()) + + // Start the sample app node. + // (All nodes attempt to join the leader at 7330, forming a cluster with it). 
+ let system: ClusterSystem + + if role == .leader { + let node = await LeaderNode(name: nodeName, port: port) + system = node.system + try! await node.run() + } else { + var node = await ChoppingNode(name: nodeName, port: port) + system = node.system + try! await node.run() + } + + try await system.terminated + } +} + +func monitorMembership(on system: ClusterSystem) { + Task { + for await event in system.cluster.events { + system.log.debug("Membership change: \(event)") + + let membership = await system.cluster.membershipSnapshot + if membership.members(withStatus: .down).count == membership.count { + system.log.notice("Membership: \(membership.count)", metadata: [ + "cluster/membership": Logger.MetadataValue.array(membership.members(atMost: .down).map { + "\($0)" + }), + ]) + } + } + } +} + +enum ClusterNodeRole: CaseIterable, Hashable { + case leader + case chopping +} diff --git a/Samples/Sources/SampleDiningPhilosophers/boot.swift b/Samples/Sources/SampleDiningPhilosophers/boot.swift index a7c91d2de..79b23a833 100644 --- a/Samples/Sources/SampleDiningPhilosophers/boot.swift +++ b/Samples/Sources/SampleDiningPhilosophers/boot.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import _PrettyLogHandler import Distributed import DistributedCluster import Logging diff --git a/Samples/Sources/SampleDiningPhilosophers/SamplePrettyLogHandler.swift b/Samples/Sources/_PrettyLogHandler/SamplePrettyLogHandler.swift similarity index 90% rename from Samples/Sources/SampleDiningPhilosophers/SamplePrettyLogHandler.swift rename to Samples/Sources/_PrettyLogHandler/SamplePrettyLogHandler.swift index fc9ce8142..89606497d 100644 --- a/Samples/Sources/SampleDiningPhilosophers/SamplePrettyLogHandler.swift +++ b/Samples/Sources/_PrettyLogHandler/SamplePrettyLogHandler.swift @@ -29,7 +29,7 @@ import WASILibc #endif /// Logger that prints "pretty" for showcasing the cluster nicely in sample applications. -struct SamplePrettyLogHandler: LogHandler { +public struct SamplePrettyLogHandler: LogHandler { static let CONSOLE_RESET = "\u{001B}[0;0m" static let CONSOLE_BOLD = "\u{001B}[1m" @@ -52,8 +52,7 @@ struct SamplePrettyLogHandler: LogHandler { } } - // internal for testing only - internal init(label: String) { + public init(label: String) { self.label = label } @@ -75,7 +74,14 @@ struct SamplePrettyLogHandler: LogHandler { nodeInfo += "\(node)" } let label: String - if let path = effectiveMetadata.removeValue(forKey: "actor/path")?.description { + if let id = effectiveMetadata.removeValue(forKey: "actor/id")?.description { + if id.contains("[$wellKnown") { + label = String(id[id.firstIndex(of: "[")! 
..< id.endIndex]) + } else { + label = id + } + effectiveMetadata.removeValue(forKey: "actor/path") + } else if let path = effectiveMetadata.removeValue(forKey: "actor/path")?.description { label = path } else { label = "" diff --git a/Samples/docker/collector-config.yaml b/Samples/docker/collector-config.yaml new file mode 100644 index 000000000..1a9f4b8a6 --- /dev/null +++ b/Samples/docker/collector-config.yaml @@ -0,0 +1,24 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: otel-collector:4317 + +exporters: + logging: + logLevel: debug + + jaeger: + endpoint: "jaeger:14250" + tls: + insecure: true + + zipkin: + endpoint: "http://zipkin:9411/api/v2/spans" + + +service: + pipelines: + traces: + receivers: otlp + exporters: [logging, jaeger, zipkin] diff --git a/Samples/docker/docker-compose.yaml b/Samples/docker/docker-compose.yaml new file mode 100644 index 000000000..b4522b41a --- /dev/null +++ b/Samples/docker/docker-compose.yaml @@ -0,0 +1,26 @@ +version: '3' +services: + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + command: ["--config=/etc/config.yaml"] + volumes: + - ./collector-config.yaml:/etc/config.yaml + ports: + - "4317:4317" + networks: [exporter] + depends_on: [zipkin, jaeger] + + zipkin: + image: openzipkin/zipkin:latest + ports: + - "9411:9411" + networks: [exporter] + + jaeger: + image: jaegertracing/all-in-one + ports: + - "16686:16686" + networks: [exporter] + +networks: + exporter: diff --git a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift index c3abba3ba..d696bc0e1 100644 --- a/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift +++ b/Sources/DistributedActorsTestKit/Cluster/ClusteredActorSystemsXCTestCase.swift @@ -300,7 +300,11 @@ extension ClusteredActorSystemsXCTestCase { } return self.lock.withLock { - self._logCaptures[index] + if _logCaptures.count > index { + return self._logCaptures[index] + } else { + fatalError("Attempt to get logs but no logs captured for index \(index)!") + } } } diff --git a/Sources/DistributedCluster/Cluster/Cluster+Membership.swift b/Sources/DistributedCluster/Cluster/Cluster+Membership.swift index 1b621f186..0e8a36ea1 100644 --- a/Sources/DistributedCluster/Cluster/Cluster+Membership.swift +++ b/Sources/DistributedCluster/Cluster/Cluster+Membership.swift @@ -703,6 +703,7 @@ extension Cluster { case notFoundAny(Cluster.Endpoint, in: Cluster.Membership) case atLeastStatusRequirementNotMet(expectedAtLeast: Cluster.MemberStatus, found: Cluster.Member) case statusRequirementNotMet(expected: Cluster.MemberStatus, found: Cluster.Member) + case countRequirementNotMet(expected: Int, expectedStatus: Cluster.MemberStatus) case awaitStatusTimedOut(Duration, Error?) 
        var prettyDescription: String {
@@ -721,6 +722,8 @@ extension Cluster {
                 return "Expected \(reflecting: foundMember.node) to be seen as at-least [\(expectedAtLeastStatus)] but was [\(foundMember.status)]"
             case .statusRequirementNotMet(let expectedStatus, let foundMember):
                 return "Expected \(reflecting: foundMember.node) to be seen as [\(expectedStatus)] but was [\(foundMember.status)]"
+            case .countRequirementNotMet(let count, let expectedStatus):
+                return "Expected \(count) nodes to be seen as [\(expectedStatus)], but did not find enough"
             case .awaitStatusTimedOut(let duration, let lastError):
                 let lastErrorMessage: String
                 if let error = lastError {
diff --git a/Sources/DistributedCluster/Cluster/ClusterControl.swift b/Sources/DistributedCluster/Cluster/ClusterControl.swift
index 81da6cb71..e4302e471 100644
--- a/Sources/DistributedCluster/Cluster/ClusterControl.swift
+++ b/Sources/DistributedCluster/Cluster/ClusterControl.swift
@@ -162,10 +162,22 @@ public struct ClusterControl {
     ///
     /// - Returns `Cluster.Member` for the joined node.
     @discardableResult
+    @available(*, deprecated, renamed: "up(within:)")
     public func joined(within: Duration) async throws -> Cluster.Member {
         try await self.waitFor(self.node, .up, within: within)
     }
 
+    /// Wait, within the given duration, until this actor system has joined the cluster and become ``Cluster/MemberStatus/up``.
+    ///
+    /// - Parameters
+    ///   - within: Duration to wait for.
+    ///
+    /// - Returns `Cluster.Member` for the joined node.
+    @discardableResult
+    public func up(within: Duration) async throws -> Cluster.Member {
+        try await self.waitFor(self.node, .up, within: within)
+    }
+
     /// Wait, within the given duration, until the passed in node has joined the cluster and become ``Cluster/MemberStatus/up``.
     ///
     /// - Parameters
@@ -208,6 +221,22 @@ public struct ClusterControl {
         }
     }
 
+    /// Wait, within the given duration, for the cluster to have at least `countAtLeast` members of the specified status.
+    ///
+    /// - Parameters
+    ///   - countAtLeast: The minimum (inclusive) number of nodes, including this node, that must be part of the cluster membership
+    ///   - status: The expected member status.
+    ///   - within: Duration to wait for.
+    public func waitFor(countAtLeast: Int, _ status: Cluster.MemberStatus, within: Duration) async throws {
+        try await self.waitForMembershipEventually(within: within) { membership in
+            if membership.count(withStatus: status) >= countAtLeast {
+                return membership
+            } else {
+                throw Cluster.MembershipError(.countRequirementNotMet(expected: countAtLeast, expectedStatus: status))
+            }
+        }
+    }
+
     /// Wait, within the given duration, for this actor system to be a member of all the nodes' respective cluster and have **at least** the specified status.
     ///
     /// - Parameters
diff --git a/Sources/DistributedCluster/Cluster/ClusterShell.swift b/Sources/DistributedCluster/Cluster/ClusterShell.swift
index 7137b277b..0b3a2bdf0 100644
--- a/Sources/DistributedCluster/Cluster/ClusterShell.swift
+++ b/Sources/DistributedCluster/Cluster/ClusterShell.swift
@@ -148,12 +148,12 @@ internal class ClusterShell {
             return
         }
 
-        system.log.warning("Terminate existing association [\(reflecting: remoteNode)].")
+        system.log.info("Terminate existing association [\(reflecting: remoteNode)].")
 
         // notify the failure detector, that we shall assume this node as dead from here on.
// it's gossip will also propagate the information through the cluster traceLog_Remote(system.cluster.node, "Finish terminate association [\(remoteNode)]: Notifying SWIM, .confirmDead") - system.log.warning("Confirm .dead to underlying SWIM, node: \(reflecting: remoteNode)") + system.log.debug("Confirm .dead to underlying SWIM, node: \(reflecting: remoteNode)") self._swimShell.confirmDead(node: remoteNode) // it is important that we first check the contains; as otherwise we'd re-add a .down member for what was already removed (!) @@ -1059,7 +1059,7 @@ extension ClusterShell { // the change was a replacement and thus we need to down the old member (same host:port as the new one), // and terminate its association. - state.log.info("Accepted handshake from [\(reflecting: directive.handshake.remoteNode)] which replaces the previously known: [\(reflecting: replacedMember)].") + state.log.debug("Accepted handshake from [\(reflecting: directive.handshake.remoteNode)] which replaces the previously known: [\(reflecting: replacedMember)].") // We MUST be careful to first terminate the association and then store the new one in 2) self.terminateAssociation(context.system, state: &state, replacedMember.node) diff --git a/Sources/DistributedCluster/Cluster/Leadership.swift b/Sources/DistributedCluster/Cluster/Leadership.swift index 12db930ec..0feacceca 100644 --- a/Sources/DistributedCluster/Cluster/Leadership.swift +++ b/Sources/DistributedCluster/Cluster/Leadership.swift @@ -243,7 +243,7 @@ extension Leadership { if enoughMembers { return self.selectByLowestAddress(context: context, membership: &membership, membersToSelectAmong: membersToSelectAmong) } else { - context.log.info("Not enough members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)] to run election, members: \(membersToSelectAmong)") + context.log.debug("Not enough members [\(membersToSelectAmong.count)/\(self.minimumNumberOfMembersToDecide)] to run election, members: \(membersToSelectAmong)") if self.loseLeadershipIfBelowMinNrOfMembers { return self.notEnoughMembers(context: context, membership: &membership, membersToSelectAmong: membersToSelectAmong) } else { @@ -300,7 +300,7 @@ extension Leadership { .first if let change = try! membership.applyLeadershipChange(to: leader) { // try! 
safe, as we KNOW this member is part of membership - context.log.debug( + context.log.info( "Selected new leader: [\(oldLeader, orElse: "nil") -> \(leader, orElse: "nil")]", metadata: [ "membership": "\(membership)", diff --git a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift index 9d3b2dc2d..e26887f57 100644 --- a/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift +++ b/Sources/DistributedCluster/Cluster/Reception/OperationLogDistributedReceptionist.swift @@ -525,9 +525,9 @@ extension OpLogDistributedReceptionist { for registration in registrations { if subscription.tryOffer(registration: registration) { - self.log.notice("OFFERED \(registration.actorID) TO \(subscription)") + self.log.trace("Offered \(registration.actorID) to \(subscription)") } else { - self.log.notice("DROPPED \(registration.actorID) TO \(subscription)") + self.log.trace("Dropped \(registration.actorID) to \(subscription)") } } } @@ -744,11 +744,7 @@ extension OpLogDistributedReceptionist { for subscription in subscriptions { for registration in registrations { - if subscription.tryOffer(registration: registration) { - self.log.notice("OFFERED \(registration.actorID) TO \(subscription)") - } else { - self.log.notice("DROPPED \(registration.actorID) TO \(subscription)") - } + subscription.tryOffer(registration: registration) } } } diff --git a/Sources/DistributedCluster/Cluster/SWIM/SWIMActor.swift b/Sources/DistributedCluster/Cluster/SWIM/SWIMActor.swift index 885118dc0..f63f95064 100644 --- a/Sources/DistributedCluster/Cluster/SWIM/SWIMActor.swift +++ b/Sources/DistributedCluster/Cluster/SWIM/SWIMActor.swift @@ -338,7 +338,7 @@ internal distributed actor SWIMActor: SWIMPeer, SWIMAddressablePeer, CustomStrin // Log the transition switch change.status { case .unreachable: - self.log.info( + self.log.debug( """ Node \(change.member.node) determined [.unreachable]! \ The node is not yet marked [.down], a downing strategy or other Cluster.Event subscriber may act upon this information. @@ -347,7 +347,7 @@ internal distributed actor SWIMActor: SWIMPeer, SWIMAddressablePeer, CustomStrin ] ) default: - self.log.info( + self.log.debug( "Node \(change.member.node) determined [.\(change.status)] (was \(optional: change.previousStatus)).", metadata: [ "swim/member": "\(change.member)", diff --git a/Sources/DistributedCluster/ClusterSystem.swift b/Sources/DistributedCluster/ClusterSystem.swift index a2ff3bb6d..eb057233e 100644 --- a/Sources/DistributedCluster/ClusterSystem.swift +++ b/Sources/DistributedCluster/ClusterSystem.swift @@ -21,6 +21,7 @@ import DistributedActorsConcurrencyHelpers import Foundation // for UUID import Logging import NIO +import Tracing /// A `ClusterSystem` is a confined space which runs and manages Actors. /// @@ -1148,28 +1149,36 @@ extension ClusterSystem { let recipient = _RemoteClusterActorPersonality(shell: clusterShell, id: actor.id._asRemote, system: self) let arguments = invocation.arguments - let reply: RemoteCallReply = try await self.withCallID(on: actor.id, target: target) { callID in - let invocation = InvocationMessage( - callID: callID, - targetIdentifier: target.identifier, - genericSubstitutions: invocation.genericSubstitutions, - arguments: arguments - ) + // -- Pick up the distributed-tracing baggage; It will be injected into the message inside the withCallID body. + let baggage = Baggage.current ?? 
.topLevel + // TODO: we can enrich this with actor and system information here if not already present. + + return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { _ in + let reply: RemoteCallReply = try await self.withCallID(on: actor.id, target: target) { callID in + var invocation = InvocationMessage( + callID: callID, + targetIdentifier: target.identifier, + genericSubstitutions: invocation.genericSubstitutions, + arguments: arguments + ) - recipient.sendInvocation(invocation) - } + InstrumentationSystem.instrument.inject(baggage, into: &invocation, using: .invocationMessage) - if let error = reply.thrownError { - throw error - } - guard let value = reply.value else { - throw RemoteCallError( - .invalidReply(reply.callID), - on: actor.id, - target: target - ) + recipient.sendInvocation(invocation) + } + + if let error = reply.thrownError { + throw error + } + guard let value = reply.value else { + throw RemoteCallError( + .invalidReply(reply.callID), + on: actor.id, + target: target + ) + } + return value } - return value } public func remoteCallVoid( @@ -1211,18 +1220,27 @@ extension ClusterSystem { let recipient = _RemoteClusterActorPersonality(shell: clusterShell, id: actor.id._asRemote, system: self) let arguments = invocation.arguments - let reply: RemoteCallReply<_Done> = try await self.withCallID(on: actor.id, target: target) { callID in - let invocation = InvocationMessage( - callID: callID, - targetIdentifier: target.identifier, - genericSubstitutions: invocation.genericSubstitutions, - arguments: arguments - ) - recipient.sendInvocation(invocation) - } + // -- Pick up the distributed-tracing baggage; It will be injected into the message inside the withCallID body. + let baggage = Baggage.current ?? .topLevel + // TODO: we can enrich this with actor and system information here if not already present. + + return try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .client) { _ in + let reply: RemoteCallReply<_Done> = try await self.withCallID(on: actor.id, target: target) { callID in + var invocation = InvocationMessage( + callID: callID, + targetIdentifier: target.identifier, + genericSubstitutions: invocation.genericSubstitutions, + arguments: arguments + ) - if let error = reply.thrownError { - throw error + InstrumentationSystem.instrument.inject(baggage, into: &invocation, using: .invocationMessage) + + recipient.sendInvocation(invocation) + } + + if let error = reply.thrownError { + throw error + } } } @@ -1233,8 +1251,10 @@ extension ClusterSystem { ) async throws -> Reply where Reply: AnyRemoteCallReply { + // Make an UUID for the remote call (so we can accept a reply for it) let callID = UUID() + // Prepare timeout handling let timeout = RemoteCall.timeout ?? self.settings.remoteCall.defaultTimeout let timeoutTask: Task = Task.detached { try await Task.sleep(nanoseconds: UInt64(timeout.nanoseconds)) @@ -1273,6 +1293,7 @@ extension ClusterSystem { timeoutTask.cancel() } + /// Call the body which should perform the actual call! 
        let reply: any AnyRemoteCallReply = try await withCheckedThrowingContinuation { continuation in
            self.inFlightCallLock.withLock {
                self._inFlightCalls[callID] = continuation // this is to be resumed from an incoming reply or timeout
@@ -1403,6 +1424,9 @@ extension ClusterSystem {
            return
        }

+        var baggage: Baggage = .topLevel
+        InstrumentationSystem.instrument.extract(invocation, into: &baggage, using: .invocationMessage)
+
        Task {
            var decoder = ClusterInvocationDecoder(system: self, message: invocation)
@@ -1420,12 +1444,14 @@ extension ClusterSystem {
                    throw DeadLetterError(recipient: recipient)
                }

-                try await executeDistributedTarget(
-                    on: actor,
-                    target: target,
-                    invocationDecoder: &decoder,
-                    handler: resultHandler
-                )
+                try await InstrumentationSystem.tracer.withSpan("\(target)", baggage: baggage, ofKind: .server) { _ in
+                    try await executeDistributedTarget(
+                        on: actor,
+                        target: target,
+                        invocationDecoder: &decoder,
+                        handler: resultHandler
+                    )
+                }
            } catch {
                // FIXME(distributed): is this right?
                do {
diff --git a/Sources/DistributedCluster/DeadLetters.swift b/Sources/DistributedCluster/DeadLetters.swift
index 282952ae6..66517f048 100644
--- a/Sources/DistributedCluster/DeadLetters.swift
+++ b/Sources/DistributedCluster/DeadLetters.swift
@@ -204,7 +204,7 @@ public final class DeadLetterOffice {
        }

        // in all other cases, we want to log the dead letter:
-        self.log.notice(
+        self.log.debug(
            "Dead letter was not delivered \(recipientString)",
            metadata: { () -> Logger.Metadata in
                // TODO: more metadata (from Envelope) (e.g. sender)
diff --git a/Sources/DistributedCluster/DistributedProgress/DistributedProgress.swift b/Sources/DistributedCluster/DistributedProgress/DistributedProgress.swift
new file mode 100644
index 000000000..c9266d749
--- /dev/null
+++ b/Sources/DistributedCluster/DistributedProgress/DistributedProgress.swift
@@ -0,0 +1,247 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the Swift Distributed Actors open source project
+//
+// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Distributed
+import DistributedActorsConcurrencyHelpers
+import Logging
+
+class TEST {
+    class Thing {
+        func change() {}
+    }
+
+    class State {
+        let mutable: Thing
+        init(mutable: Thing) {
+            self.mutable = mutable
+        }
+    }
+
+    func test(state: State) {
+        Task {
+            state.mutable.change() // data-race (!)
+        }
+        Task {
+            state.mutable.change() // data-race (!)
+        }
+    }
+}
+
+public distributed actor DistributedProgress<Steps: DistributedProgressSteps> {
+    public typealias ActorSystem = ClusterSystem
+    lazy var log = Logger(actor: self)
+
+    var step: Steps?
+    var subscribers: Set<ProgressSubscriber> = []
+
+    public init(actorSystem: ActorSystem, steps: Steps.Type = Steps.self) {
+        self.actorSystem = actorSystem
+    }
+
+    func to(step: Steps) async throws {
+        // TODO: checks that we don't move backwards...
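+        // Record the new step and notify all currently known subscribers; once the final step is
+        // reached the subscriber set is cleared, as no further updates will follow.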
+        self.log.notice("Move to step: \(step)")
+        self.step = step
+
+        for sub in self.subscribers {
+            try await sub.currentStep(step)
+        }
+
+        if step == Steps.allCases.reversed().first {
+            self.log.notice("Progress completed, clear subscribers.")
+            self.subscribers = []
+            return
+        }
+    }
+
+    distributed func subscribe(subscriber: ProgressSubscriber) async throws {
+        self.log.notice("Subscribed \(subscriber.id)...")
+        self.subscribers.insert(subscriber)
+
+        if let step {
+            try await subscriber.currentStep(step)
+        }
+    }
+
+    distributed actor ProgressSubscriber {
+        typealias ActorSystem = ClusterSystem
+
+        /// Mutable box that we update as the progress proceeds remotely...
+        let box: Box
+
+        init(box: Box, actorSystem: ActorSystem) {
+            self.actorSystem = actorSystem
+            self.box = box
+        }
+
+        distributed func currentStep(_ step: Steps) {
+            self.box.updateStep(step)
+        }
+    }
+
+    public final class Box: Codable {
+        public typealias Element = Steps
+
+        let lock: Lock
+        private var currentStep: Steps?
+
+        let source: DistributedProgress
+        let actorSystem: ClusterSystem
+        private var _sub: ProgressSubscriber?
+
+        private var _nextCC: CheckedContinuation<Steps, Never>?
+        private var _completedCC: CheckedContinuation<Void, Never>?
+
+        public // FIXME: not public
+        init(source: DistributedProgress) {
+            self.source = source
+            self.actorSystem = source.actorSystem
+            self.lock = Lock()
+            self.currentStep = nil
+        }
+
+        public init(from decoder: Decoder) throws {
+            let container = try decoder.singleValueContainer()
+            self.lock = Lock()
+            self.currentStep = nil
+            self.actorSystem = decoder.userInfo[.actorSystemKey] as! ClusterSystem
+            self.source = try container.decode(DistributedProgress.self)
+        }
+
+        public func encode(to encoder: Encoder) throws {
+            var container = encoder.singleValueContainer()
+            try container.encode(self.source)
+        }
+
+        /// Suspend until this ``DistributedProgress`` has reached its last and final "step".
+        public func completed() async throws {
+            if self.currentStep == Steps.last {
+                return
+            }
+
+            try await self.ensureSubscription()
+
+            await withCheckedContinuation { (cc: CheckedContinuation<Void, Never>) in
+                self._completedCC = cc
+            }
+        }
+
+        /// Suspend until this ``DistributedProgress`` receives a next "step".
+        public func nextStep() async throws -> Steps? {
+            if self.currentStep == Steps.last {
+                return nil // last step was already emitted
+            }
+
+            try await self.ensureSubscription()
+
+            return await withCheckedContinuation { (cc: CheckedContinuation<Steps, Never>) in
+                self._nextCC = cc
+            }
+        }
+
+        func updateStep(_ step: Steps) {
+            self.lock.lock()
+            defer { self.lock.unlock() }
+
+            self.currentStep = step
+
+            if let onNext = _nextCC {
+                onNext.resume(returning: step)
+            }
+
+            if step == Steps.last {
+                if let completed = _completedCC {
+                    completed.resume()
+                }
+            }
+        }
+
+        @discardableResult
+        private func ensureSubscription() async throws -> ProgressSubscriber {
+            self.lock.lock()
+
+            if let sub = self._sub {
+                self.lock.unlock()
+                return sub
+            } else {
+                let sub = ProgressSubscriber(box: self, actorSystem: self.actorSystem)
+                self._sub = sub
+                self.lock.unlock()
+
+                try await self.source.subscribe(subscriber: sub)
+                return sub
+            }
+        }
+    }
+}
+
+// ==== ----------------------------------------------------------------------------------------------------------------
+// MARK: Progress AsyncSequence
+
+extension DistributedProgress.Box {
+    public func steps(file: String = #file, line: UInt = #line) async throws -> DistributedProgressAsyncSequence<Steps> {
+        try await self.ensureSubscription()
+
+        return DistributedProgressAsyncSequence(box: self)
+    }
+}
+
+public struct DistributedProgressAsyncSequence<Steps: DistributedProgressSteps>: AsyncSequence {
+    public typealias Element = Steps
+
+    private let box: DistributedProgress<Steps>.Box
+
+    public init(box: DistributedProgress<Steps>.Box) {
+        self.box = box
+    }
+
+    public func makeAsyncIterator() -> AsyncIterator {
+        return AsyncIterator(box: self.box)
+    }
+
+    public struct AsyncIterator: AsyncIteratorProtocol {
+        public typealias Element = Steps
+        let box: DistributedProgress<Steps>.Box
+
+        init(box: DistributedProgress<Steps>.Box) {
+            self.box = box
+        }
+
+        public func next() async throws -> Steps? {
+            try await self.box.nextStep()
+        }
+    }
+}
+
+// ==== ----------------------------------------------------------------------------------------------------------------
+// MARK: Progress Steps protocol
+
+public protocol DistributedProgressSteps: Codable, Sendable, Equatable, CaseIterable {
+    static var count: Int { get }
+    static var last: Self { get }
+}
+
+extension DistributedProgressSteps {
+    public static var count: Int {
+        let count = Self.allCases.count
+        precondition(count > 0, "\(Self.self) cannot have zero steps (cases)!")
+        return count
+    }
+
+    public static var last: Self {
+        guard let last = Self.allCases.reversed().first else {
+            fatalError("\(Self.self) cannot have zero steps (cases)!")
+        }
+        return last
+    }
+}
diff --git a/Sources/DistributedCluster/InvocationBehavior.swift b/Sources/DistributedCluster/InvocationBehavior.swift
index f641f65d2..328967b5c 100644
--- a/Sources/DistributedCluster/InvocationBehavior.swift
+++ b/Sources/DistributedCluster/InvocationBehavior.swift
@@ -14,6 +14,7 @@
 import Distributed
 import struct Foundation.Data
+import InstrumentationBaggage

 /// Representation of the distributed invocation in the Behavior APIs.
 /// This needs to be removed eventually as we remove behaviors.
@@ -23,12 +24,19 @@ public struct InvocationMessage: Sendable, Codable, CustomStringConvertible {
     let genericSubstitutions: [String]
     let arguments: [Data]

+    /// Tracing metadata, injected/extracted by distributed-tracing.
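+    /// Serves as the carrier for the `InvocationMessageInjector`/`InvocationMessageExtractor` pair defined in `TracingSupport.swift`.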
+ var metadata: [String: String] = [:] + + var hasMetadata: Bool { + !self.metadata.isEmpty + } + var target: RemoteCallTarget { RemoteCallTarget(targetIdentifier) } public var description: String { - "InvocationMessage(callID: \(callID), target: \(target), genericSubstitutions: \(genericSubstitutions), arguments: \(arguments.count))" + "InvocationMessage(callID: \(callID), target: \(target), genericSubstitutions: \(genericSubstitutions), arguments: \(arguments.count), metadata: \(metadata))" } } diff --git a/Sources/DistributedCluster/TracingSupport.swift b/Sources/DistributedCluster/TracingSupport.swift new file mode 100644 index 000000000..1259b81bc --- /dev/null +++ b/Sources/DistributedCluster/TracingSupport.swift @@ -0,0 +1,49 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. and the Swift Distributed Actors project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Tracing + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Injector + +struct InvocationMessageInjector: Tracing.Injector { + typealias Carrier = InvocationMessage + + func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { + carrier.metadata[key] = value + } +} + +extension Tracing.Injector where Self == InvocationMessageInjector { + static var invocationMessage: InvocationMessageInjector { + InvocationMessageInjector() + } +} + +// ==== ---------------------------------------------------------------------------------------------------------------- +// MARK: Extractor + +struct InvocationMessageExtractor: Tracing.Extractor { + typealias Carrier = InvocationMessage + + func extract(key: String, from carrier: Carrier) -> String? { + carrier.metadata[key] + } +} + +extension Tracing.Extractor where Self == InvocationMessageExtractor { + static var invocationMessage: InvocationMessageExtractor { + InvocationMessageExtractor() + } +} diff --git a/Tests/DistributedClusterTests/DistributedProgressTests.swift b/Tests/DistributedClusterTests/DistributedProgressTests.swift new file mode 100644 index 000000000..f3b03a4e7 --- /dev/null +++ b/Tests/DistributedClusterTests/DistributedProgressTests.swift @@ -0,0 +1,175 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Actors open source project +// +// Copyright (c) 2018-2022 Apple Inc. 
and the Swift Distributed Actors project authors
+// Licensed under Apache License v2.0
+//
+// See LICENSE.txt for license information
+// See CONTRIBUTORS.md for the list of Swift Distributed Actors project authors
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+//===----------------------------------------------------------------------===//
+
+import Distributed
+import DistributedActorsTestKit
+@testable import DistributedCluster
+import Foundation
+import Logging
+import XCTest
+
+// ==== ----------------------------------------------------------------------------------------------------------------
+// MARK: Builder
+
+distributed actor Builder: CustomStringConvertible {
+    let probe: ActorTestProbe<String>
+    lazy var log = Logger(actor: self)
+
+    init(probe: ActorTestProbe<String>, actorSystem: ActorSystem) {
+        self.actorSystem = actorSystem
+        self.probe = probe
+        probe.tell("Builder init")
+    }
+
+    public distributed func build() -> DistributedProgress<BuildSteps>.Box {
+        let progress = DistributedProgress(actorSystem: actorSystem, steps: BuildSteps.self)
+
+        Task {
+            try await progress.whenLocal { progress in
+                try await progress.to(step: .prepare)
+                try await Task.sleep(until: .now + .milliseconds(100), clock: .continuous)
+                try await progress.to(step: .compile)
+                try await Task.sleep(until: .now + .milliseconds(200), clock: .continuous)
+                try await progress.to(step: .test)
+                try await Task.sleep(until: .now + .milliseconds(200), clock: .continuous)
+                try await progress.to(step: .complete)
+            }
+        }
+
+        return DistributedProgress<BuildSteps>.Box(source: progress)
+    }
+
+    nonisolated var description: String {
+        "\(Self.self)(\(id))"
+    }
+}
+
+enum BuildSteps: String, DistributedProgressSteps {
+    case prepare
+    case compile
+    case test
+    case complete
+}
+
+// ==== ----------------------------------------------------------------------------------------------------------------
+// MARK: BuildWatcher
+
+distributed actor BuildWatcher: CustomStringConvertible {
+    lazy var log = Logger(actor: self)
+
+    let probe: ActorTestProbe<String>
+    let builder: Builder
+
+    init(probe: ActorTestProbe<String>, builder: Builder, actorSystem: ActorSystem) {
+        self.actorSystem = actorSystem
+        self.probe = probe
+        self.builder = builder
+    }
+
+    distributed func runBuild_waitCompleted() async throws {
+        let progress = try await self.builder.build()
+
+        try await progress.completed()
+        self.probe.tell("completed")
+    }
+
+    distributed func runBuild_streamSteps() async throws {
+        let progress = try await self.builder.build()
+
+        for try await step in try await progress.steps() {
+            self.probe.tell("received-step:\(step)")
+        }
+    }
+
+    nonisolated var description: String {
+        "\(Self.self)(\(id))"
+    }
+}
+
+// ==== ----------------------------------------------------------------------------------------------------------------
+// MARK: Tests
+
+final class DistributedProgressTests: ClusteredActorSystemsXCTestCase, @unchecked Sendable {
+    override var captureLogs: Bool {
+        false
+    }
+
+    override func configureLogCapture(settings: inout LogCapture.Settings) {
+        settings.excludeActorPaths = [
+            "/system/cluster",
+            "/system/gossip",
+            "/system/cluster/gossip",
+            "/system/receptionist",
+            "/system/receptionist-ref",
+            "/system/cluster/swim",
+            "/system/clusterEvents",
+        ]
+    }
+
+    func test_progress_happyPath() async throws {
+        let system = await self.setUpNode("single")
+
+        let pb = self.testKit(system).makeTestProbe(expecting: String.self)
+        let pw = self.testKit(system).makeTestProbe(expecting: String.self)
+        let p = self.testKit(system).makeTestProbe(expecting: String.self)
+
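+        // The watcher drives the build, awaits completion via the progress box, and then reports "completed" to its probe.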
+ let builder = Builder(probe: pb, actorSystem: system) + let watcher = BuildWatcher(probe: pw, builder: builder, actorSystem: system) + + Task { + try await watcher.runBuild_waitCompleted() + p.tell("done") + } + + try pw.expectMessage("completed") + try p.expectMessage("done") + } + + func test_progress_stream_local() async throws { + let system = await self.setUpNode("single") + try await self.impl_progress_stream_cluster(first: system, second: system) + } + + func test_progress_stream_cluster() async throws { + let (first, second) = await self.setUpPair() + try await joinNodes(node: first, with: second) + + try await self.impl_progress_stream_cluster(first: first, second: second) + } + + func impl_progress_stream_cluster(first: ClusterSystem, second: ClusterSystem) async throws { + let pb = self.testKit(first).makeTestProbe(expecting: String.self) + let pw = self.testKit(second).makeTestProbe(expecting: String.self) + let p = self.testKit(first).makeTestProbe(expecting: String.self) + + let builder = Builder(probe: pb, actorSystem: first) + let watcher = BuildWatcher(probe: pw, builder: builder, actorSystem: second) + + Task { + try await watcher.runBuild_streamSteps() + p.tell("done") + } + + let messages = try pw.fishForMessages(within: .seconds(5)) { message in + if message == "received-step:\(BuildSteps.last)" { + return .catchComplete + } else { + return .catchContinue + } + } + pinfo("Received \(messages.count) progress updates: \(messages)") + messages.shouldEqual(BuildSteps.allCases.suffix(messages.count).map { "received-step:\($0)" }) + try p.expectMessage("done") + } +}
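For reference, a minimal usage sketch of the new `DistributedProgress` API, along the lines of what the tests above exercise. The `DeploySteps` enum, the `Deployer` actor and the `observe` function are hypothetical names invented for this sketch; it assumes a running `ClusterSystem` and omits error handling and timeouts:

    import Distributed
    import DistributedCluster

    // Hypothetical steps enum; a String raw-value CaseIterable enum satisfies DistributedProgressSteps.
    enum DeploySteps: String, DistributedProgressSteps {
        case prepare
        case upload
        case activate
    }

    // Hypothetical actor that performs some long-running work and exposes its progress.
    distributed actor Deployer {
        typealias ActorSystem = ClusterSystem

        distributed func deploy() -> DistributedProgress<DeploySteps>.Box {
            let progress = DistributedProgress(actorSystem: self.actorSystem, steps: DeploySteps.self)

            Task {
                // Advance the progress as the work proceeds; every subscriber is notified of each step.
                try await progress.whenLocal { progress in
                    try await progress.to(step: .prepare)
                    try await progress.to(step: .upload)
                    try await progress.to(step: .activate)
                }
            }

            // The Box is Codable, so it can be returned to remote callers across the cluster.
            return DistributedProgress<DeploySteps>.Box(source: progress)
        }
    }

    // Consuming side: stream the steps as they are reached, or just await completion.
    func observe(deployer: Deployer) async throws {
        let box = try await deployer.deploy()

        for try await step in try await box.steps() {
            print("deploy step: \(step)")
        }

        try await box.completed() // returns immediately once the last step was observed
    }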