Streaming real-time audio (text-to-speech)
The OpenAI API supports streaming real-time audio in chunks (just like token-by-token chat streaming). Are there any plans to add support for this feature?
I've opened a PR implementing it: https://github.com/MacPaw/OpenAI/pull/189
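For context, client-side consumption could look roughly like this (the audioCreateSpeechStream name, the query shape, and the (Data, URLResponse) chunk type are illustrative, not necessarily the final API of the PR):

let openAI = OpenAI(apiToken: "YOUR_TOKEN")
// Hypothetical query; fields mirror the non-streaming speech request.
let query = AudioSpeechQuery(model: .tts_1, input: "Hello, world!", voice: .alloy, responseFormat: .mp3, speed: 1.0)

// Hypothetical streaming variant: chunks of the audio file arrive as they are generated.
for try await (chunk, _) in openAI.audioCreateSpeechStream(query: query) {
    handle(chunk) // hypothetical consumer that buffers or plays the incoming Data
}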
Hey @batanus
I have tested your PR, and it works. I was able to stream data. Thanks!
On a separate note, how are you able to play and buffer the data as it comes in? Any tips?
Hey @morad @DanielhCarranza! The moment I opened the PR, I began implementing this functionality (thanks, @mihai8804858, for helping me understand the mechanics of AVPlayer's custom resource loader). To play an audio stream, you need to use AVPlayer with a custom resource loader that feeds new audio data to AVPlayer as it arrives. The problem with OpenAI is that, since the audio file is generated in real time, the expectedContentLength field of the URLResponse returns -1, while AVPlayer needs to know the total length (contentLength) of the file being played. I tried hardcoding contentLength to a fixed value (for example, 10_000 or 100_000), but because it doesn't match the actual size of the audio file, playback doesn't run to the end. So if you have any ideas on how to get around this problem, I'd be glad to hear them.
Here's the custom resource loader implementation:
import Foundation
import AVFoundation

// A player item that feeds AVPlayer from an async stream of (Data, URLResponse)
// chunks through a custom resource loader.
final class ChunkedPlayerItem: AVPlayerItem {
    private let resourceLoaderDelegate: ChunkedResourceLoaderDelegate
    private let url: URL

    init(stream: AsyncThrowingStream<(Data, URLResponse), Error>, fileExtension: String) {
        // A custom URL scheme forces AVPlayer to route loading through the delegate.
        self.url = URL(string: "audio-streamer://whatever/file.\(fileExtension)")!
        self.resourceLoaderDelegate = ChunkedResourceLoaderDelegate(stream: stream)
        let asset = AVURLAsset(url: url)
        asset.resourceLoader.setDelegate(resourceLoaderDelegate, queue: .main)
        super.init(asset: asset, automaticallyLoadedAssetKeys: nil)
    }

    deinit {
        resourceLoaderDelegate.cancel()
    }
}

private final class ChunkedResourceLoaderDelegate: NSObject, AVAssetResourceLoaderDelegate {
    private let stream: AsyncThrowingStream<(Data, URLResponse), Error>
    private var streamTask: Task<Void, Never>?
    private var pendingRequests = Set<AVAssetResourceLoadingRequest>()
    private var receivedData = Data()
    private var receivedResponse: URLResponse?
    private var receivedError: Error?

    init(stream: AsyncThrowingStream<(Data, URLResponse), Error>) {
        self.stream = stream
    }

    deinit {
        cancel()
    }

    func resourceLoader(
        _ resourceLoader: AVAssetResourceLoader,
        shouldWaitForLoadingOfRequestedResource loadingRequest: AVAssetResourceLoadingRequest
    ) -> Bool {
        startStreamIfNeeded()
        pendingRequests.insert(loadingRequest)
        processPendingRequests()
        return true
    }

    func resourceLoader(
        _ resourceLoader: AVAssetResourceLoader,
        didCancel loadingRequest: AVAssetResourceLoadingRequest
    ) {
        pendingRequests.remove(loadingRequest)
    }

    func cancel() {
        streamTask?.cancel()
    }

    private func startStreamIfNeeded() {
        guard streamTask == nil else { return }
        // Run on the main actor so all mutable state is touched on the same queue
        // the resource loader delegate was registered on (.main).
        streamTask = Task { @MainActor [self] in
            do {
                for try await (data, response) in stream {
                    receivedResponse = response
                    receivedData.append(data)
                    processPendingRequests()
                }
            } catch {
                receivedError = error
                processPendingRequests()
            }
        }
    }

    private func processPendingRequests() {
        for request in pendingRequests {
            fillContentInformationRequest(request.contentInformationRequest)
            if let receivedError {
                request.finishLoading(with: receivedError)
                pendingRequests.remove(request)
            } else if tryFulfillDataRequest(request.dataRequest) {
                request.finishLoading()
                pendingRequests.remove(request)
            }
        }
    }

    private func fillContentInformationRequest(_ contentInformationRequest: AVAssetResourceLoadingContentInformationRequest?) {
        guard let contentInformationRequest, let receivedResponse else { return }
        contentInformationRequest.contentType = receivedResponse.mimeType
        // Problem here: receivedResponse.expectedContentLength is -1 for a chunked response.
        contentInformationRequest.contentLength = receivedResponse.expectedContentLength
        contentInformationRequest.isByteRangeAccessSupported = true
    }

    private func tryFulfillDataRequest(_ dataRequest: AVAssetResourceLoadingDataRequest?) -> Bool {
        guard let dataRequest else { return false }
        let requestedOffset = Int(dataRequest.requestedOffset)
        let requestedLength = dataRequest.requestedLength
        let currentOffset = Int(dataRequest.currentOffset)
        guard receivedData.count > currentOffset else { return false }
        // Respond with as many of the requested bytes as have arrived so far.
        let bytesToRespond = min(receivedData.count - currentOffset, requestedLength)
        let dataToRespond = receivedData.subdata(in: currentOffset..<(currentOffset + bytesToRespond))
        dataRequest.respond(with: dataToRespond)
        // The request is complete only once every requested byte has been delivered.
        return receivedData.count >= requestedLength + requestedOffset
    }
}
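With that in place, playback is just a matter of handing the item to an AVPlayer (here, stream is the AsyncThrowingStream<(Data, URLResponse), Error> of audio chunks, however you obtain it):

let item = ChunkedPlayerItem(stream: stream, fileExtension: "mp3")
let player = AVPlayer(playerItem: item)
player.play()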
Since the text-to-speech API uses chunked transfer encoding, the Content-Length header is omitted, so AVPlayer doesn't know how much data to expect from the resource loader and playback breaks.
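You can see this directly from URLSession (a minimal check, assuming request is a prepared URLRequest for the speech endpoint):

let (_, response) = try await URLSession.shared.bytes(for: request)
// With Transfer-Encoding: chunked there is no Content-Length header,
// so the expected length is reported as unknown:
print(response.expectedContentLength) // -1 (NSURLResponseUnknownLength)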
I was able to implement real-time playback of audio chunks as they arrive using the following approach:
- Use an AudioFileStreamID to parse the AudioStreamBasicDescription and split incoming chunks into audio packets as they arrive:
  - AudioFileStreamOpen(_:_:_:_:_:) to open a new audio file stream
  - AudioFileStreamParseBytes(_:_:_:_:) to parse the ASBD and packets
  - AudioFileStreamGetProperty(_:_:_:_:) to get the AudioStreamBasicDescription from the audio file stream in AudioFileStream_PropertyListenerProc
- In AudioFileStream_PacketsProc, convert the audio packets into CMSampleBuffer
- Play the sample buffers using AVSampleBufferAudioRenderer and AVSampleBufferRenderSynchronizer
Great! @mihai8804858, would you mind sharing some sample code for this? Thanks!
Sure, here's the code I came up with:
import Foundation
import AVFoundation
import AudioToolbox
import os

extension CMSampleBuffer: @unchecked Sendable {}

final class AudioPlayer {
    private let queue = DispatchQueue(label: "audio.player.queue")
    private var task: Task<Void, Never>?
    private var buffers = OSAllocatedUnfairLock(initialState: [CMSampleBuffer]())
    private var audioStreamID: AudioFileStreamID?
    private var audioRenderer: AVSampleBufferAudioRenderer?
    private var audioDescription: AudioStreamBasicDescription?
    private var audioSynchronizer: AVSampleBufferRenderSynchronizer?
    private var audioRendererStatusObservation: NSKeyValueObservation?
    private var nextBufferTimeOffset = CMTime.zero
    private var dataReceiveComplete = false

    deinit {
        stop()
    }

    func start(_ stream: AsyncThrowingStream<Data, Error>, type: AudioFileTypeID? = nil) {
        stop()
        openFileStream(type: type)
        startReceivingData(from: stream)
    }

    func stop() {
        cancelDataTask()
        closeFileStream()
        removeSampleBuffers()
        audioSynchronizer = nil
        audioDescription = nil
        audioRenderer = nil
        nextBufferTimeOffset = .zero
    }

    // MARK: - Audio File Stream

    private func openFileStream(type: AudioFileTypeID?) {
        closeFileStream()
        // The C callbacks get the player instance back through the opaque client-data pointer.
        let playerInstance = UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())
        let status = AudioFileStreamOpen(
            playerInstance,
            { playerInstance, _, propertyID, _ in
                let player = Unmanaged<AudioPlayer>.fromOpaque(playerInstance).takeUnretainedValue()
                player.onFileStreamPropertyReceived(propertyID: propertyID)
            },
            { playerInstance, numberBytes, numberPackets, bytes, packets in
                let player = Unmanaged<AudioPlayer>.fromOpaque(playerInstance).takeUnretainedValue()
                player.onFileStreamPacketsReceived(
                    numberOfBytes: numberBytes,
                    bytes: bytes,
                    numberOfPackets: numberPackets,
                    packets: packets
                )
            },
            type ?? 0,
            &audioStreamID
        )
        if status != noErr {
            // Leave the stream ID nil so parsing becomes a no-op.
            audioStreamID = nil
        }
    }

    private func closeFileStream() {
        audioStreamID.flatMap { _ = AudioFileStreamClose($0) }
        audioStreamID = nil
    }

    private func onFileStreamPropertyReceived(propertyID: AudioFilePropertyID) {
        // Wait until the parser discovers the stream's data format (ASBD).
        guard let audioStreamID = audioStreamID, propertyID == kAudioFileStreamProperty_DataFormat else { return }
        var asbdSize: UInt32 = 0
        var asbd = AudioStreamBasicDescription()
        let getInfoStatus = AudioFileStreamGetPropertyInfo(audioStreamID, propertyID, &asbdSize, nil)
        guard getInfoStatus == noErr else { return }
        let getPropertyStatus = AudioFileStreamGetProperty(audioStreamID, propertyID, &asbdSize, &asbd)
        guard getPropertyStatus == noErr else { return }
        let renderer = AVSampleBufferAudioRenderer()
        let synchronizer = AVSampleBufferRenderSynchronizer()
        synchronizer.addRenderer(renderer)
        nextBufferTimeOffset = CMTime(value: 0, timescale: Int32(asbd.mSampleRate))
        audioDescription = asbd
        audioRenderer = renderer
        audioSynchronizer = synchronizer
        startRequestingMediaData(audioRenderer: renderer, audioSynchronizer: synchronizer)
    }

    private func onFileStreamPacketsReceived(
        numberOfBytes: UInt32,
        bytes: UnsafeRawPointer,
        numberOfPackets: UInt32,
        packets: UnsafeMutablePointer<AudioStreamPacketDescription>?
    ) {
        guard let audioDescription else { return }
        guard let buffer = makeSampleBuffer(
            from: Data(bytes: bytes, count: Int(numberOfBytes)),
            asbd: audioDescription,
            presentationTimeStamp: nextBufferTimeOffset,
            packetCount: numberOfPackets,
            packetDescriptions: packets
        ) else { return }
        // Advance the timestamp so consecutive buffers play back-to-back.
        let bufferStartTime = CMSampleBufferGetOutputPresentationTimeStamp(buffer)
        let bufferDuration = CMSampleBufferGetOutputDuration(buffer)
        nextBufferTimeOffset = bufferStartTime + bufferDuration
        buffers.withLock { $0.append(buffer) }
    }

    // MARK: - Data Stream

    private func startReceivingData(from stream: AsyncThrowingStream<Data, Error>) {
        cancelDataTask()
        task = Task { [weak self] in
            guard let self else { return }
            do {
                for try await data in stream {
                    parseData(data)
                }
                finishDataParsing()
            } catch {
                return
            }
        }
    }

    private func cancelDataTask() {
        dataReceiveComplete = false
        task?.cancel()
        task = nil
    }

    private func parseData(_ data: Data) {
        guard let audioStreamID else { return }
        data.withUnsafeBytes { pointer in
            guard let baseAddress = pointer.baseAddress else { return }
            _ = AudioFileStreamParseBytes(audioStreamID, UInt32(data.count), baseAddress, [])
        }
    }

    private func finishDataParsing() {
        dataReceiveComplete = true
        guard let audioStreamID else { return }
        // A zero-length parse call flushes the parser at the end of the stream.
        _ = AudioFileStreamParseBytes(audioStreamID, 0, nil, [])
    }

    // MARK: - Sample Buffers

    private func nextSampleBuffer() -> CMSampleBuffer? {
        buffers.withLock { (buffers: inout [CMSampleBuffer]) -> CMSampleBuffer? in
            buffers.isEmpty ? nil : buffers.removeFirst()
        }
    }

    private func removeSampleBuffers() {
        buffers.withLock { $0.removeAll() }
        audioRenderer?.flush()
    }

    private func makeSampleBuffer(
        from data: Data,
        asbd: AudioStreamBasicDescription,
        presentationTimeStamp: CMTime,
        packetCount: UInt32,
        packetDescriptions: UnsafePointer<AudioStreamPacketDescription>?
    ) -> CMSampleBuffer? {
        guard let blockBuffer = makeBlockBuffer(from: data) else { return nil }
        guard let formatDescription = try? CMFormatDescription(audioStreamBasicDescription: asbd) else { return nil }
        var sampleBuffer: CMSampleBuffer?
        let createStatus = CMAudioSampleBufferCreateReadyWithPacketDescriptions(
            allocator: kCFAllocatorDefault,
            dataBuffer: blockBuffer,
            formatDescription: formatDescription,
            sampleCount: CMItemCount(packetCount),
            presentationTimeStamp: presentationTimeStamp,
            packetDescriptions: packetDescriptions,
            sampleBufferOut: &sampleBuffer
        )
        guard createStatus == noErr else { return nil }
        return sampleBuffer
    }

    private func makeBlockBuffer(from data: Data) -> CMBlockBuffer? {
        var blockBuffer: CMBlockBuffer?
        let createStatus = CMBlockBufferCreateWithMemoryBlock(
            allocator: kCFAllocatorDefault,
            memoryBlock: nil,
            blockLength: data.count,
            blockAllocator: kCFAllocatorDefault,
            customBlockSource: nil,
            offsetToData: 0,
            dataLength: data.count,
            flags: kCMBlockBufferAssureMemoryNowFlag,
            blockBufferOut: &blockBuffer
        )
        guard createStatus == noErr, let blockBuffer else { return nil }
        // Copy the chunk's bytes into the freshly allocated block buffer.
        return data.withUnsafeBytes { (pointer: UnsafeRawBufferPointer) -> CMBlockBuffer? in
            guard let baseAddress = pointer.baseAddress else { return nil }
            let replaceStatus = CMBlockBufferReplaceDataBytes(
                with: baseAddress,
                blockBuffer: blockBuffer,
                offsetIntoDestination: 0,
                dataLength: data.count
            )
            guard replaceStatus == noErr else { return nil }
            return blockBuffer
        }
    }

    // MARK: - Media Data

    private func startRequestingMediaData(
        audioRenderer: AVSampleBufferAudioRenderer,
        audioSynchronizer: AVSampleBufferRenderSynchronizer
    ) {
        audioRenderer.requestMediaDataWhenReady(on: queue) { [weak self] in
            self?.provideMediaDataIfNeeded(
                audioRenderer: audioRenderer,
                audioSynchronizer: audioSynchronizer
            )
        }
    }

    private func provideMediaDataIfNeeded(
        audioRenderer: AVSampleBufferAudioRenderer,
        audioSynchronizer: AVSampleBufferRenderSynchronizer
    ) {
        while audioRenderer.isReadyForMoreMediaData {
            if let buffer = nextSampleBuffer() {
                audioRenderer.enqueue(buffer)
                startPlaybackIfCan(
                    audioRenderer: audioRenderer,
                    audioSynchronizer: audioSynchronizer
                )
            } else if dataReceiveComplete {
                // No more chunks will arrive, so stop asking for media data.
                audioRenderer.stopRequestingMediaData()
                break
            } else {
                // No parsed buffer available yet; wait for the next ready callback.
                break
            }
        }
    }

    private func startPlaybackIfCan(
        audioRenderer: AVSampleBufferAudioRenderer,
        audioSynchronizer: AVSampleBufferRenderSynchronizer
    ) {
        guard audioRenderer.hasSufficientMediaDataForReliablePlaybackStart, audioSynchronizer.rate == 0 else { return }
        audioSynchronizer.setRate(1.0, time: .zero)
    }
}
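Wiring it up looks roughly like this (fetchSpeechChunks() is a placeholder for however you obtain the audio Data chunks, e.g. the streaming speech API discussed above):

let player = AudioPlayer()
let stream = AsyncThrowingStream<Data, Error> { continuation in
    Task {
        do {
            // Placeholder: yield each audio chunk as it arrives from the API.
            for try await chunk in fetchSpeechChunks() {
                continuation.yield(chunk)
            }
            continuation.finish()
        } catch {
            continuation.finish(throwing: error)
        }
    }
}
// Pass the file type hint matching the requested response format.
player.start(stream, type: kAudioFileMP3Type)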
I have created an open-source SPM package of this implementation.