apple/Docs, iOS, Swift

Swift AsyncStream, AsyncThrowingStream 정리

lgvv 2024. 8. 16. 19:48

Swift AsyncStream, AsyncThrowingStream 정리

 

 

AsyncStream

  • 클로저에서 생성된 비동기 시퀀스로, 새로운 요소를 생성하기 위해 계속 호출
  • AsyncSequence를 준수해 asynchronous iterator(비동기 이터레이터) 수동으로 구현하지 않고도 비동기 시퀀스를 간편하게 생성할 수 있음.
  • 특히,  콜백 또는 딜리게이트 기반 API를 async-await와 함께 사용할 수 있도록 적합하게 설계되었음.
  • AsyncStream을 초기화할 때, AsyncStream.Continuation을 받는 클로저를 전달. 이 클로저 내에서 요소를 생성하고, 해당 요소를 계속해서 `yield(_:)` 메서드를 호출하여 스트림에 제공
    • RxSwift의 Observable에서 emit하는 것과 비슷한 개념
  • 더 이상 생성할 요소가 없을 때는 `finish()` 메서드를 호출하여 스트림을 종료. 이는 시퀀스 반복자가 nil을 반환하여 시퀀스가 종료되도록 만듭니다. `Continuation`은 `Sendable`을 준수하므로, AsyncStream의 반복과는 별개의 동시성 컨텍스트에서도 호출할 수 있음.
  • 임의의 Element는 호출자가 반복하는 속도보다 빠르게 요소를 생성할 수 있음. 이러한 이유로 `AsyncStream`은 버퍼링 동작을 정의하여 스트림이 가장 오래된 또는 최신의 특정 수의 요소를 버퍼링할 수 있도록 함.
  • 기본적으로 버퍼 제한은 Int.max로 설정되어 있으며, 이는 값이 무제한임을 의미

AsyncThrowingStream

  • AsyncStream과 달리, AsyncThrowingStream은 대기 중인 next() 호출에서 오류를 발생시킬 수 있으며, 이 오류는 스트림을 종료시킴.
  • 만약 오류가 발생하면, finish(throwing:) 메서드를 호출하여 대기 중인 호출 지점에 오류를 전달하면서 스트림을 종료.
  • 나머지 내용은 AsyncStream과 모두 동일.

 

AsyncStream 코드 알아보기

    private let messageStream = AsyncStream(Int.self) { continuation in
        continuation.onTermination = { termination in
            switch termination {
            case .finished:
                print("finished")
            case .cancelled:
                print("cancelled")
            }
        }
        
        for count in 1...10 {
             continuation.yield(count)
        }
        continuation.finish()
    }

 

continuation을 통해 이벤트를 전달받음.

 

아래에 사용할 수 있는 옵션 나열

continuation

 

 

  • onTermination: termiation 될 때 클로저로 처리
    • finished: 종료 되었을 때
    • cancelled: 취소 되었을 때
  • finish(): 이벤트 종료 상태 변경
    • 이후 Element를 더이상 방출하지 않음.
  • yield(_ value:): 값을 방출
  • yield(with: ): Result Type으로 방출

 

 

messageStream이 위치하는 영역에 따라 결과가 달라짐.

 

  • 함수 외부에 위치할 경우 한번 방출후에 termiation되지 않아도 방출되지 않음.
class Exampple {
   // ✅ 메시지 스트림을 함수 밖에 위치했을 경우
   let messageStream = AsyncStream() { ... } 

   func execute() { 
      do {
         for try await number in messageStream {
            let time = Date.now.formatted(date: .omitted, time: .standard)
            print("\(time) | \(number)")
         }
      } catch {
         print(error)
      }   
   }
}

// execute() 호출
// 방출
// execute()
// 방출 없음.

 

  • 함수 내부에 위치할 경우에는 지속적으로 스트림을 생성하므로 계속 방출
class Exampple {

   func execute() { 
      // ✅ 메시지 스트림을 함수 내에 위치할 경우
      let messageStream = AsyncStream() { ... } 
      do {
         for try await number in messageStream {
            let time = Date.now.formatted(date: .omitted, time: .standard)
            print("\(time) | \(number)")
         }
      } catch {
         print(error)
      }   
   }
}

// execute() 호출
// 방출
// execute()
// 방출
// execute()
// 방출

 

 

 

continuation.finish() 와 onTermination 살펴보기

 

두 동작은 비슷하면서도 다름.

  • continuation.finish(): continutation을 그 시점에 즉시 종료
    • 스트림 내부에서 명시적으로 종료할 경우
  • continuation.onTermination?(.cancelled): onTermination의 cancelled 호출되고 continutatios가 전부 실행된다.
    • 혹은 stream이 취소 되었을 경우
  • continuation.onTermination?(.finished): onTermination의 finished 호출되고 continutatios가 전부 실행된다.
    • 혹은 stream이 종료 되었을 경우

샘플코드

        let messageStream = AsyncStream(Int.self) { continuation in
            
            continuation.onTermination = { termination in
                switch termination {
                case .finished:
                    print("finished")
                case .cancelled:
                    print("cancelled")
                }
            }
            
            for count in 1...10 {
                if count == 5 {
                    continuation.onTermination?(.cancelled)
                } else {
                    continuation.yield(count)
                }
            }
        }

 

 

continuation.finish()

(결과)

finished
5:43:23 PM | 1
5:43:23 PM | 2
5:43:23 PM | 3
5:43:23 PM | 4

 

continuation.onTermination?(.finished)

finished
5:45:02 PM | 1
5:45:02 PM | 2
5:45:02 PM | 3
5:45:02 PM | 4
5:45:02 PM | 6
5:45:02 PM | 7
5:45:02 PM | 8
5:45:02 PM | 9
5:45:02 PM | 10

 

continuation.onTermination?(.cancelled)

cancelled
5:45:28 PM | 1
5:45:28 PM | 2
5:45:28 PM | 3
5:45:28 PM | 4
5:45:28 PM | 6
5:45:28 PM | 7
5:45:28 PM | 8
5:45:28 PM | 9
5:45:28 PM | 10

 

 

AsyncThrowingStream

  • 에러를 던질 수 있음
    • 에러를 던지기 때문에 do - catch를 통해 처리

 

func doSomething() async {
        /// 스트림
        let messageStream = AsyncThrowingStream<Int, Error> { continuation in
            
            continuation.onTermination = { termination in
                switch termination {
                case .finished:
                    print("finished")
                case .cancelled:
                    print("cancelled")
                }
            }
            
            for count in 1...10 {
                if count == 5 {
//                    continuation.onTermination?(.cancelled)
//                    continuation.onTermination?(.finished)
//                    continuation.finish()
                    continuation.finish(throwing: AsyncStreamError.failed)
                } else {
                    continuation.yield(count)
                }
            }
        }
        
        do {
            for try await number in messageStream {
                let time = Date.now.formatted(date: .omitted, time: .standard)
                print("\(time) | \(number)")
            }
        } catch {
            print(error)
        }
    }

 

 

 

 

 

(참고)

https://developer.apple.com/documentation/swift/asyncstream

 

AsyncStream | Apple Developer Documentation

An asynchronous sequence generated from a closure that calls a continuation to produce new elements.

developer.apple.com

https://developer.apple.com/documentation/swift/asyncthrowingstream

 

AsyncThrowingStream | Apple Developer Documentation

An asynchronous sequence generated from an error-throwing closure that calls a continuation to produce new elements.

developer.apple.com