Gevent greenlet gets stuck when running a streaming recognize request in a separate greenlet.
import logging
import gevent
from google.cloud import speech_v1
import queue
import sentry_sdk
from gevent import spawn
# https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-speech/samples/generated_samples/speech_v1_generated_speech_streaming_recognize_async.py
# Module-level logger shared by the streaming session code below.
logger = logging.getLogger('streaming_session')
class StreamingSession:
    """Streams audio chunks to Google Cloud Speech-to-Text and collects the
    final transcription.

    Audio is pushed with ``add_chunk`` onto an internal queue; a background
    greenlet (running ``stream_recognize``) forwards queued chunks to the
    streaming recognize API via ``request_generator``.  ``get_transcription``
    signals end-of-stream, waits for the greenlet, and assembles the final
    transcript.

    NOTE(review): grpc + gevent have known concurrency limitations (channel
    count / max concurrent streams per channel) — presumably related to the
    hang reported in this issue; this class does not work around them.
    """

    # Sentinel object pushed onto the queue to signal end of the audio stream.
    # Identity comparison (`is`) distinguishes it from real audio bytes.
    END_OF_STREAM = object()

    def __init__(self, sample_rate=16000):
        """Create the Speech client, build the streaming config, and start
        the first streaming session.

        Args:
            sample_rate: Sample rate in Hz of the LINEAR16 audio that will
                be supplied via ``add_chunk``.
        """
        # Initialize the Google Speech-to-Text client.
        self.client = speech_v1.SpeechClient()
        # Configure the streaming request; interim results are disabled, so
        # only final results are expected in the responses.
        self.streaming_config = speech_v1.StreamingRecognitionConfig(
            config=speech_v1.RecognitionConfig(
                encoding=speech_v1.RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=sample_rate,
                language_code="en-US",
                enable_automatic_punctuation=True,
                enable_spoken_punctuation=True,
            ),
            interim_results=False,
        )
        self.__start_session()

    def reset(self):
        """Discard current stream state and start a new streaming session."""
        self.__start_session()

    def __start_session(self):
        """(Re)initialize session state and spawn the recognize greenlet.

        All mutable state is initialized *before* spawning: the greenlet's
        request generator reads ``self.closed`` and ``self.queue``, so
        spawning first (as the original code did) races against state
        initialization if the greenlet is scheduled immediately.
        """
        self.queue = queue.Queue()
        self.transcription = ""
        self.closed = False
        self.stream_recognize_task = spawn(self.stream_recognize)

    def request_generator(self):
        """Yield ``StreamingRecognizeRequest`` messages until end-of-stream.

        Blocks for the next chunk, then greedily drains anything already
        queued so multiple chunks are coalesced into a single request
        (fewer, larger gRPC messages).
        """
        while not self.closed:
            try:
                # Blocking get() waits for the first available chunk.
                chunk = self.queue.get()
                if chunk is self.END_OF_STREAM:
                    logger.debug("request_generator: end of stream")
                    return
                data = [chunk]
                # Drain the queue of any additional data without blocking.
                while True:
                    try:
                        chunk = self.queue.get(block=False)
                        if chunk is self.END_OF_STREAM:
                            # Emit the buffered data below, then stop.
                            self.closed = True
                            logger.debug("request_generator: end of stream (drain)")
                            break
                        data.append(chunk)
                    except queue.Empty:
                        break
                yield speech_v1.StreamingRecognizeRequest(audio_content=b"".join(data))
            except Exception as e:
                sentry_sdk.capture_exception(e)
                break

    def add_chunk(self, chunk):
        """Enqueue an audio chunk in a non-blocking manner.

        Args:
            chunk: Raw LINEAR16 audio bytes.
        """
        # The queue is unbounded, so this put never actually blocks.
        self.queue.put(chunk, block=False)

    def stream_recognize(self):
        """Run the streaming-recognize call; returns the response iterator."""
        return self.client.streaming_recognize(
            config=self.streaming_config,
            requests=self.request_generator(),
        )

    def get_transcription(self):
        """Finalize the session and return the complete transcription.

        Signals end-of-stream, waits for the recognize greenlet to finish,
        then concatenates the top alternative of each final result.

        Returns:
            The accumulated transcript string (possibly empty).

        Raises:
            Exception: re-raises any error hit while iterating responses,
                after recreating the Speech client.
        """
        self.queue.put(self.END_OF_STREAM)
        self.stream_recognize_task.join()
        responses = self.stream_recognize_task.get()
        # Process responses from the stream.
        try:
            for response in responses:
                # Skip responses that carry no results.
                if not response.results:
                    continue
                for result in response.results:
                    if result.is_final:
                        # Take the best (first) alternative of the final result.
                        self.transcription += result.alternatives[0].transcript + ' '
                    else:
                        # interim_results=False, so a non-final result is unexpected.
                        sentry_sdk.capture_message("GCS streaming result is not final")
        except Exception:
            # Recreate the client so a broken channel isn't reused, then
            # re-raise with the original traceback (bare raise, not `raise e`).
            self.client = speech_v1.SpeechClient()
            raise
        # Capture a sentry alert on an empty transcription.
        if not self.transcription:
            sentry_sdk.capture_message("GCS streaming transcription is empty, recreate speech client...")
            self.client = speech_v1.SpeechClient()
        return self.transcription
This is my code for running each recognize task in a separate gevent greenlet, and it causes the greenlet to get stuck:
+--- <Greenlet "Greenlet-2" at 0xffff6c59d860: spawn_greenlets>
: Parent: <Hub '' at 0xffff819ffd60 epoll default pending=0 ref=116 fileno=7 resolver=<gevent.resolver.thread.Resolver at 0xffff75a3ad90 pool=<ThreadPool at 0xffff819c86d0 tasks=11 size=10 maxsize=10 hub=<Hub at 0xffff819ffd60 thread_ident=0xffff83b5b020>>> threadpool=<ThreadPool at 0xffff819c86d0 tasks=11 size=10 maxsize=10 hub=<Hub at 0xffff819ffd60 thread_ident=0xffff83b5b020>> thread_ident=0xffff83b5b020>
: Running:
: File "/usr/local/lib/python3.8/site-packages/gevent/pool.py", line 161, in apply
: return self.spawn(func, *args, **kwds).get()
: Spawned at:
: File "/app/app/audiohub/audiohub_client.py", line 48, in add_chunk
: self.__init_states(session_id, sample_rate=sample_rate)
: File "/app/app/audiohub/audiohub_client.py", line 190, in __init_states
: self.streaming_sessions[session_id] = StreamingSession(sample_rate=sample_rate)
: File "/app/app/audiohub/gcp/streaming_session.py", line 15, in __init__
: self.client = speech_v1.SpeechClient()
: File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/client.py", line 461, in __init__
: self._transport = Transport(
: File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/transports/grpc.py", line 162, in __init__
: self._grpc_channel = type(self).create_channel(
: File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/transports/grpc.py", line 217, in create_channel
: return grpc_helpers.create_channel(
: File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
: return grpc.secure_channel(
: File "/usr/local/lib/python3.8/site-packages/grpc/__init__.py", line 2119, in secure_channel
: return _channel.Channel(
: File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 2056, in __init__
: cygrpc.gevent_increment_channel_count()
: File "/usr/local/lib/python3.8/site-packages/gevent/pool.py", line 392, in spawn
: greenlet = self.greenlet_class(*args, **kwargs)
I've also included the gevent patching code for my application:
from gevent import monkey
monkey.patch_all()
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
So after the above greenlet gets stuck, my whole application basically hangs.
Note that if I do the stream recognition synchronously, without a gevent greenlet, it works fine. However, I still prefer to run it in a separate greenlet to improve latency. I wonder if this is another grpc incompatibility issue with gevent.
Upon load testing, I found that with 30 concurrent streaming recognize sessions, all of the workers get stuck. I have tried both gevent greenlets and a native thread pool; nothing helps with this limit.
On the other hand, concurrency under 25 seems to work fine. I've also seen this post arguing about grpc max concurrency settings. I assume we are using grpc underneath this speech recognition package. Is there a way we can increase concurrency on speech client?
Could someone help take a look? Thank you so much!
Hi @dongzeli95 ,
Thanks for reporting this issue. This is potentially related to https://github.com/grpc/grpc/issues/36265, https://github.com/googleapis/python-bigtable/issues/949 and https://github.com/googleapis/google-cloud-python/issues/12423. To confirm, can you try downgrading to grpcio==1.58.0?
@parthea Just confirmed that changing dependency onto grpcio==1.58.0 didn't help with this issue. Hope that helps.