apple/Docs, iOS, Swift
Swift AsyncStream, AsyncThrowingStream 정리
lgvv
2024. 8. 16. 19:48
Swift AsyncStream, AsyncThrowingStream 정리
AsyncStream
- 클로저에서 생성된 비동기 시퀀스로, 새로운 요소를 생성하기 위해 계속 호출
- AsyncSequence를 준수해 asynchronous iterator(비동기 이터레이터) 수동으로 구현하지 않고도 비동기 시퀀스를 간편하게 생성할 수 있음.
- 특히, 콜백 또는 딜리게이트 기반 API를 async-await와 함께 사용할 수 있도록 적합하게 설계되었음.
- AsyncStream을 초기화할 때, AsyncStream.Continuation을 받는 클로저를 전달. 이 클로저 내에서 요소를 생성하고, 해당 요소를 계속해서 `yield(_:)` 메서드를 호출하여 스트림에 제공
- RxSwift의 Observable에서 emit하는 것과 비슷한 개념
- 더 이상 생성할 요소가 없을 때는 `finish()` 메서드를 호출하여 스트림을 종료. 이는 시퀀스 반복자가 nil을 반환하여 시퀀스가 종료되도록 만듭니다. `Continuation`은 `Sendable`을 준수하므로, AsyncStream의 반복과는 별개의 동시성 컨텍스트에서도 호출할 수 있음.
- 임의의 Element는 호출자가 반복하는 속도보다 빠르게 요소를 생성할 수 있음. 이러한 이유로 `AsyncStream`은 버퍼링 동작을 정의하여 스트림이 가장 오래된 또는 최신의 특정 수의 요소를 버퍼링할 수 있도록 함.
- 기본적으로 버퍼 제한은 Int.max로 설정되어 있으며, 이는 값이 무제한임을 의미
AsyncThrowingStream
- AsyncStream과 달리, AsyncThrowingStream은 대기 중인 next() 호출에서 오류를 발생시킬 수 있으며, 이 오류는 스트림을 종료시킴.
- 만약 오류가 발생하면, finish(throwing:) 메서드를 호출하여 대기 중인 호출 지점에 오류를 전달하면서 스트림을 종료.
- 나머지 내용은 AsyncStream과 모두 동일.
AsyncStream 코드 알아보기
private let messageStream = AsyncStream(Int.self) { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished:
print("finished")
case .cancelled:
print("cancelled")
}
}
for count in 1...10 {
continuation.yield(count)
}
continuation.finish()
}
continuation을 통해 이벤트를 전달받음.
아래에 사용할 수 있는 옵션 나열
- onTermination: termiation 될 때 클로저로 처리
- finished: 종료 되었을 때
- cancelled: 취소 되었을 때
- finish(): 이벤트 종료 상태 변경
- 이후 Element를 더이상 방출하지 않음.
- yield(_ value:): 값을 방출
- yield(with: ): Result Type으로 방출
messageStream이 위치하는 영역에 따라 결과가 달라짐.
- 함수 외부에 위치할 경우 한번 방출후에 termiation되지 않아도 방출되지 않음.
class Exampple {
// ✅ 메시지 스트림을 함수 밖에 위치했을 경우
let messageStream = AsyncStream() { ... }
func execute() {
do {
for try await number in messageStream {
let time = Date.now.formatted(date: .omitted, time: .standard)
print("\(time) | \(number)")
}
} catch {
print(error)
}
}
}
// execute() 호출
// 방출
// execute()
// 방출 없음.
- 함수 내부에 위치할 경우에는 지속적으로 스트림을 생성하므로 계속 방출
class Exampple {
func execute() {
// ✅ 메시지 스트림을 함수 내에 위치할 경우
let messageStream = AsyncStream() { ... }
do {
for try await number in messageStream {
let time = Date.now.formatted(date: .omitted, time: .standard)
print("\(time) | \(number)")
}
} catch {
print(error)
}
}
}
// execute() 호출
// 방출
// execute()
// 방출
// execute()
// 방출
continuation.finish() 와 onTermination 살펴보기
두 동작은 비슷하면서도 다름.
- continuation.finish(): continutation을 그 시점에 즉시 종료
- 스트림 내부에서 명시적으로 종료할 경우
- continuation.onTermination?(.cancelled): onTermination의 cancelled 호출되고 continutatios가 전부 실행된다.
- 혹은 stream이 취소 되었을 경우
- continuation.onTermination?(.finished): onTermination의 finished 호출되고 continutatios가 전부 실행된다.
- 혹은 stream이 종료 되었을 경우
샘플코드
let messageStream = AsyncStream(Int.self) { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished:
print("finished")
case .cancelled:
print("cancelled")
}
}
for count in 1...10 {
if count == 5 {
continuation.onTermination?(.cancelled)
} else {
continuation.yield(count)
}
}
}
continuation.finish()
(결과)
finished
5:43:23 PM | 1
5:43:23 PM | 2
5:43:23 PM | 3
5:43:23 PM | 4
continuation.onTermination?(.finished)
finished
5:45:02 PM | 1
5:45:02 PM | 2
5:45:02 PM | 3
5:45:02 PM | 4
5:45:02 PM | 6
5:45:02 PM | 7
5:45:02 PM | 8
5:45:02 PM | 9
5:45:02 PM | 10
continuation.onTermination?(.cancelled)
cancelled
5:45:28 PM | 1
5:45:28 PM | 2
5:45:28 PM | 3
5:45:28 PM | 4
5:45:28 PM | 6
5:45:28 PM | 7
5:45:28 PM | 8
5:45:28 PM | 9
5:45:28 PM | 10
AsyncThrowingStream
- 에러를 던질 수 있음
- 에러를 던지기 때문에 do - catch를 통해 처리
func doSomething() async {
/// 스트림
let messageStream = AsyncThrowingStream<Int, Error> { continuation in
continuation.onTermination = { termination in
switch termination {
case .finished:
print("finished")
case .cancelled:
print("cancelled")
}
}
for count in 1...10 {
if count == 5 {
// continuation.onTermination?(.cancelled)
// continuation.onTermination?(.finished)
// continuation.finish()
continuation.finish(throwing: AsyncStreamError.failed)
} else {
continuation.yield(count)
}
}
}
do {
for try await number in messageStream {
let time = Date.now.formatted(date: .omitted, time: .standard)
print("\(time) | \(number)")
}
} catch {
print(error)
}
}
(참고)
https://developer.apple.com/documentation/swift/asyncstream
https://developer.apple.com/documentation/swift/asyncthrowingstream