Skip to content

Tests: Remove .seconds-based timeouts in some Suites #8810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 136 additions & 140 deletions Tests/BasicsTests/ConcurrencyHelpersTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,13 @@ struct ConcurrencyHelpersTest {
lock.withLock {
expected[index] = value
}
cache.memoize(index) {
value
}
cache.memoize(index) {
Int.random(in: Int.min ..< Int.max)
}
cache.memoize(index) { value }
cache.memoize(index) { Int.random(in: Int.min ..< Int.max) }
}
}

try #require(sync.wait(timeout: .now() + .seconds(2)) == .success)
sync.wait()

expected.forEach { key, value in
#expect(cache[key] == value)
}
Expand All @@ -71,199 +68,198 @@ struct ConcurrencyHelpersTest {
cache.append(value)
}
}
sync.wait()

try #require(sync.wait(timeout: .now() + .seconds(2)) == .success)
let expectedSorted = expected.sorted()
let resultsSorted = cache.get().sorted()
let resultsSorted = cache.get().sorted()
#expect(expectedSorted == resultsSorted)
}
}
}

@Test
func threadSafeBox() throws {
let queue = DispatchQueue(label: "ConcurrencyHelpersTest", attributes: .concurrent)
for _ in 0 ..< 100 {
let sync = DispatchGroup()
@Test
func threadSafeBox() throws {
let queue = DispatchQueue(label: "ConcurrencyHelpersTest", attributes: .concurrent)
for _ in 0 ..< 100 {
let sync = DispatchGroup()

var winner: Int?
let lock = NSLock()
var winner: Int?
let lock = NSLock()

let serial = DispatchQueue(label: "testThreadSafeBoxSerial")
let serial = DispatchQueue(label: "testThreadSafeBoxSerial")

let cache = ThreadSafeBox<Int>()
for index in 0 ..< 1000 {
queue.async(group: sync) {
Thread.sleep(forTimeInterval: Double.random(in: 100 ... 300) * 1.0e-6)
serial.async(group: sync) {
lock.withLock {
if winner == nil {
winner = index
let cache = ThreadSafeBox<Int>()
for index in 0 ..< 1000 {
queue.async(group: sync) {
Thread.sleep(forTimeInterval: Double.random(in: 100 ... 300) * 1.0e-6)
serial.async(group: sync) {
lock.withLock {
if winner == nil {
winner = index
}
}
}
cache.memoize {
index
cache.memoize { index }
}
}
}
}

try #require(sync.wait(timeout: .now() + .seconds(2)) == .success)
#expect(cache.get() == winner)
}
}
sync.wait()

@Suite
struct AsyncOperationQueueTests {
fileprivate actor ResultsTracker {
var results = [Int]()
var maxConcurrent = 0
var currentConcurrent = 0

func incrementConcurrent() {
currentConcurrent += 1
maxConcurrent = max(maxConcurrent, currentConcurrent)
#expect(cache.get() == winner)
}
}

func decrementConcurrent() {
currentConcurrent -= 1
}

func appendResult(_ value: Int) {
results.append(value)
}
}
@Suite
struct AsyncOperationQueueTests {
fileprivate actor ResultsTracker {
var results = [Int]()
var maxConcurrent = 0
var currentConcurrent = 0

@Test
func limitsConcurrentOperations() async throws {
let queue = AsyncOperationQueue(concurrentTasks: 5)

let totalTasks = 20
let tracker = ResultsTracker()

try await withThrowingTaskGroup(of: Void.self) { group in
for index in 0..<totalTasks {
group.addTask {
try await queue.withOperation {
await tracker.incrementConcurrent()
try? await Task.sleep(nanoseconds: 5_000_000)
await tracker.decrementConcurrent()
await tracker.appendResult(index)
}
}
func incrementConcurrent() {
currentConcurrent += 1
maxConcurrent = max(maxConcurrent, currentConcurrent)
}
try await group.waitForAll()
}

let maxConcurrent = await tracker.maxConcurrent
let results = await tracker.results

// Check that at no point did we exceed 5 concurrent operations
#expect(maxConcurrent == 5)
#expect(results.count == totalTasks)
}
func decrementConcurrent() {
currentConcurrent -= 1
}

@Test
func passesThroughWhenUnderConcurrencyLimit() async throws {
let queue = AsyncOperationQueue(concurrentTasks: 5)

let totalTasks = 5
let tracker = ResultsTracker()

try await withThrowingTaskGroup(of: Void.self) { group in
for index in 0..<totalTasks {
group.addTask {
try await queue.withOperation {
await tracker.incrementConcurrent()
try? await Task.sleep(nanoseconds: 5_000_000)
await tracker.decrementConcurrent()
await tracker.appendResult(index)
}
}
func appendResult(_ value: Int) {
results.append(value)
}
try await group.waitForAll()
}

let maxConcurrent = await tracker.maxConcurrent
let results = await tracker.results
@Test
func limitsConcurrentOperations() async throws {
let queue = AsyncOperationQueue(concurrentTasks: 5)

// Check that we never exceeded the concurrency limit
#expect(maxConcurrent <= 5)
#expect(results.count == totalTasks)
}
let totalTasks = 20
let tracker = ResultsTracker()

@Test
func handlesImmediateCancellation() async throws {
let queue = AsyncOperationQueue(concurrentTasks: 5)
let totalTasks = 20
let tracker = ResultsTracker()

await #expect(throws: _Concurrency.CancellationError.self) {
try await withThrowingTaskGroup(of: Void.self) { group in
// Cancel the task group immediately
group.cancelAll()

for index in 0..<totalTasks {
group.addTask {
try await queue.withOperation {
if Task.isCancelled {
throw _Concurrency.CancellationError()
}
await tracker.incrementConcurrent()
// sleep for a long time to ensure cancellation can occur.
// If this is too short the cancellation may be triggered after
// all tasks have completed.
try await Task.sleep(nanoseconds: 10_000_000_000)
try? await Task.sleep(nanoseconds: 5_000_000)
await tracker.decrementConcurrent()
await tracker.appendResult(index)
}
}
}
try await group.waitForAll()
}
}

let maxConcurrent = await tracker.maxConcurrent
let results = await tracker.results
let maxConcurrent = await tracker.maxConcurrent
let results = await tracker.results

#expect(maxConcurrent <= 5)
#expect(results.count < totalTasks)
}
// Check that at no point did we exceed 5 concurrent operations
#expect(maxConcurrent == 5)
#expect(results.count == totalTasks)
}

@Test
func handlesCancellationDuringWait() async throws {
let queue = AsyncOperationQueue(concurrentTasks: 5)
let totalTasks = 20
let tracker = ResultsTracker()
@Test
func passesThroughWhenUnderConcurrencyLimit() async throws {
let queue = AsyncOperationQueue(concurrentTasks: 5)

let totalTasks = 5
let tracker = ResultsTracker()

await #expect(throws: _Concurrency.CancellationError.self) {
try await withThrowingTaskGroup(of: Void.self) { group in
for index in 0..<totalTasks {
group.addTask {
try await queue.withOperation {
if Task.isCancelled {
throw _Concurrency.CancellationError()
}
await tracker.incrementConcurrent()
try? await Task.sleep(nanoseconds: 5_000_000)
await tracker.decrementConcurrent()
await tracker.appendResult(index)
}
}
}
try await group.waitForAll()
}

let maxConcurrent = await tracker.maxConcurrent
let results = await tracker.results

// Check that we never exceeded the concurrency limit
#expect(maxConcurrent <= 5)
#expect(results.count == totalTasks)
}

@Test
func handlesImmediateCancellation() async throws {
let queue = AsyncOperationQueue(concurrentTasks: 5)
let totalTasks = 20
let tracker = ResultsTracker()

group.addTask { [group] in
await #expect(throws: _Concurrency.CancellationError.self) {
try await withThrowingTaskGroup(of: Void.self) { group in
// Cancel the task group immediately
group.cancelAll()

for index in 0..<totalTasks {
group.addTask {
try await queue.withOperation {
if Task.isCancelled {
throw _Concurrency.CancellationError()
}
await tracker.incrementConcurrent()
// sleep for a long time to ensure cancellation can occur.
// If this is too short the cancellation may be triggered after
// all tasks have completed.
try await Task.sleep(nanoseconds: 10_000_000_000)
await tracker.decrementConcurrent()
await tracker.appendResult(index)
}
}
}
try await group.waitForAll()
}
try await group.waitForAll()
}

let maxConcurrent = await tracker.maxConcurrent
let results = await tracker.results

#expect(maxConcurrent <= 5)
#expect(results.count < totalTasks)
}

let maxConcurrent = await tracker.maxConcurrent
let results = await tracker.results
@Test
func handlesCancellationDuringWait() async throws {
let queue = AsyncOperationQueue(concurrentTasks: 5)
let totalTasks = 20
let tracker = ResultsTracker()

await #expect(throws: _Concurrency.CancellationError.self) {
try await withThrowingTaskGroup(of: Void.self) { group in
for index in 0..<totalTasks {
group.addTask {
try await queue.withOperation {
if Task.isCancelled {
throw _Concurrency.CancellationError()
}
await tracker.incrementConcurrent()
try? await Task.sleep(nanoseconds: 5_000_000)
await tracker.decrementConcurrent()
await tracker.appendResult(index)
}
}
}

group.addTask { [group] in
group.cancelAll()
}
try await group.waitForAll()
}
}

let maxConcurrent = await tracker.maxConcurrent
let results = await tracker.results

#expect(maxConcurrent <= 5)
#expect(results.count < totalTasks)
#expect(maxConcurrent <= 5)
#expect(results.count < totalTasks)
}
}
}
}
Loading