apache_beam + beam integration not sending exceptions in GCP

Open gshoultz42 opened this issue 9 months ago • 21 comments

How do you use Sentry?

Sentry Saas (sentry.io)

Version

2.23.1

Steps to Reproduce

  1. Add Sentry to a Python Dataflow job
  2. Add a try/except to process on a DoFn
  3. Inside the except block, call capture_exception
  4. Set the Dataflow job to streaming in GCP
  5. Deploy the Dataflow job to the cloud runner
  6. Send an event that will trigger the exception

Expected Result

Error exception is sent to Sentry

Actual Result

Sentry does not get an exception.

gshoultz42 avatar Mar 26 '25 21:03 gshoultz42

Hey @gshoultz42 ! Can you show me your sentry_sdk.init() and also the call to capture_exception() including the try/except?

You can also:

  • Enable debug output (sentry_sdk.init(debug=True)); the console will then show what the Sentry SDK is doing. (Look for log messages that contain Sending envelope.)
  • Make sure that wherever the Sentry SDK is running, it can make an HTTP connection to ingest.sentry.io.
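
The two checks above could be sketched roughly as follows. This is a minimal sketch and was not posted in the thread: host_from_dsn, can_reach, and init_sentry_with_debug are hypothetical helper names, and the DSN is whatever your project uses.

```python
import socket
from urllib.parse import urlsplit


def host_from_dsn(dsn: str) -> str:
    """Extract the ingest hostname from a Sentry DSN URL."""
    host = urlsplit(dsn).hostname
    if host is None:
        raise ValueError(f"DSN has no hostname: {dsn!r}")
    return host


def can_reach(host: str, port: int = 443, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def init_sentry_with_debug(dsn: str) -> None:
    """Initialize the SDK with debug output enabled."""
    # Imported lazily so the helpers above work without the SDK installed.
    import sentry_sdk

    sentry_sdk.init(dsn=dsn, debug=True)
```

Running can_reach(host_from_dsn(dsn)) in the same environment as the job shows whether a TCP connection to the ingest host can be opened at all; it does not prove that events are accepted.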

Hope this helps

antonpirker avatar Mar 27 '25 09:03 antonpirker

I just fed your question into our bot in our Discord server, and it has these suggestions: https://discord.com/channels/621778831602221064/1354782809020960779/1354782809020960779 (it's not super helpful, I have to admit, but it is a nice experiment)

antonpirker avatar Mar 27 '25 11:03 antonpirker

Sentry Init

sentry_sdk.init(
    dsn=sentry_dsn,
    environment=env,
    integrations=[BeamIntegration()],
)

Exception call

class ParseEvent(beam.DoFn):
    def process(self, element: bytes, *args, **kwargs):
        try:
            event_json = json.loads(element)
            event_name = event_json["message"]["payload"]["eventName"]
            model = get_model(event_name)
            event = model.model_validate_json(element)
            yield event
        except (ValidationError, KeyError, JSONDecodeError, TypeError) as error:
            logging.exception("Error parsing event: %r", element)
            error_output = {
                "error": repr(error),
                "payload": (
                    event_json
                    if not isinstance(error, JSONDecodeError)
                    else element.decode()
                ),
            }
            yield TaggedOutput("parse_errors", json.dumps(error_output).encode())

gshoultz42 avatar Mar 27 '25 12:03 gshoultz42

Thanks for the follow up!

The line

logging.exception("Error parsing event: %r", element)

should send an error if the logging integration is enabled (which the Sentry SDK enables by default).

If you want your error to show up in Sentry, you can add a sentry_sdk.capture_exception(error) call in your except block.

Because you handle the exception yourself, it never bubbles up for Sentry to capture.

You could also add a simple raise after your last yield so the exception is raised again and eventually captured by Sentry.
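
Both options can be sketched in a few lines. This is a simplified stand-in for the DoFn's process method, not the reporter's actual code: parse_element, the capture parameter, and the reraise flag are hypothetical names introduced here so the reporting call can be swapped for a stub.

```python
import json
from typing import Callable, Iterator, Optional


def parse_element(
    element: bytes,
    capture: Optional[Callable[[BaseException], object]] = None,
    reraise: bool = False,
) -> Iterator[object]:
    """Parse one element, reporting handled errors explicitly."""
    if capture is None:
        # Default to the real SDK call; imported lazily so a stub can be
        # injected where the SDK is not installed.
        import sentry_sdk

        capture = sentry_sdk.capture_exception
    try:
        yield json.loads(element)
    except (json.JSONDecodeError, TypeError) as error:
        # The exception is handled here, so it is reported explicitly.
        capture(error)
        yield {"error": repr(error)}
        if reraise:
            # Alternative: re-raise after the last yield so the exception
            # bubbles up instead of being swallowed.
            raise
```

With reraise left at False the element is still emitted as an error record; with reraise=True the exception propagates to the caller after the record is yielded.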

antonpirker avatar Mar 27 '25 15:03 antonpirker

Wanted to provide an update. I am still looking into this. I enabled debug in Sentry but did not get the log. I am working with our admin to make sure logging is set up correctly in our GCP.

gshoultz42 avatar Apr 08 '25 15:04 gshoultz42

I see this on the main job but I don't see any logs from the worker logs where the exception is happening:

[sentry] DEBUG: Setting up integrations (with default = True)
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.aiohttp.AioHttpIntegration: AIOHTTP not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.anthropic.AnthropicIntegration: Anthropic not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.ariadne.AriadneIntegration: ariadne is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.arq.ArqIntegration: Arq is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.asyncpg.AsyncPGIntegration: asyncpg not installed.
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.boto3.Boto3Integration: botocore is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.bottle.BottleIntegration: Bottle not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.celery.CeleryIntegration: Celery not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.chalice.ChaliceIntegration: Chalice is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.clickhouse_driver.ClickhouseDriverIntegration: clickhouse-driver not installed.
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.cohere.CohereIntegration: Cohere not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.django.DjangoIntegration: Django not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.falcon.FalconIntegration: Falcon not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.fastapi.FastApiIntegration: Starlette is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.flask.FlaskIntegration: Flask is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.gql.GQLIntegration: gql is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.graphene.GrapheneIntegration: graphene is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.httpx.HttpxIntegration: httpx is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.huey.HueyIntegration: Huey is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.huggingface_hub.HuggingfaceHubIntegration: Huggingface not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.langchain.LangchainIntegration: langchain not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.litestar.LitestarIntegration: Litestar is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.loguru.LoguruIntegration: LOGURU is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.openai.OpenAIIntegration: OpenAI not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.pyramid.PyramidIntegration: Pyramid not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.quart.QuartIntegration: Quart is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.rq.RqIntegration: RQ not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.sanic.SanicIntegration: Sanic not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.sqlalchemy.SqlalchemyIntegration: SQLAlchemy not installed.
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.starlette.StarletteIntegration: Starlette is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.starlite.StarliteIntegration: Starlite is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.strawberry.StrawberryIntegration: strawberry-graphql is not installed
[sentry] DEBUG: Did not import default integration sentry_sdk.integrations.tornado.TornadoIntegration: Tornado not installed
[sentry] DEBUG: Setting up previously not enabled integration beam
[sentry] DEBUG: Setting up previously not enabled integration argv
[sentry] DEBUG: Setting up previously not enabled integration atexit
[sentry] DEBUG: Setting up previously not enabled integration dedupe
[sentry] DEBUG: Setting up previously not enabled integration excepthook
[sentry] DEBUG: Setting up previously not enabled integration logging
[sentry] DEBUG: Setting up previously not enabled integration modules
[sentry] DEBUG: Setting up previously not enabled integration stdlib
[sentry] DEBUG: Setting up previously not enabled integration threading
[sentry] DEBUG: Setting up previously not enabled integration pymongo
[sentry] DEBUG: Setting up previously not enabled integration redis
[sentry] DEBUG: Enabling integration beam
[sentry] DEBUG: Enabling integration argv
[sentry] DEBUG: Enabling integration atexit
[sentry] DEBUG: Enabling integration dedupe
[sentry] DEBUG: Enabling integration excepthook
[sentry] DEBUG: Enabling integration logging
[sentry] DEBUG: Enabling integration modules
[sentry] DEBUG: Enabling integration stdlib
[sentry] DEBUG: Enabling integration threading
[sentry] DEBUG: Enabling integration pymongo
[sentry] DEBUG: Enabling integration redis
[sentry] DEBUG: Setting SDK name to 'sentry.python.beam'
[sentry] DEBUG: [Profiling] Setting up continuous profiler in thread mode

[sentry] DEBUG: Sending envelope [envelope with 1 items (error)] project:xxx host:o58632.ingest.us.sentry.io
DEBUG:sentry_sdk.errors:Sending envelope [envelope with 1 items (error)] project:xxx host:o58632.ingest.us.sentry.io
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): o58632.ingest.us.sentry.io:443
assert duration or terminated, (
AssertionError: Job did not reach to a terminal state after waiting indefinitely. Console URL: https://console.cloud.google.com/dataflow/jobs/<RegionId>/2025-04-23_13_00_24-3193749921814308417?project=<ProjectId>
[sentry] DEBUG: atexit: got shutdown signal
DEBUG:sentry_sdk.errors:atexit: got shutdown signal
[sentry] DEBUG: atexit: shutting down client
DEBUG:sentry_sdk.errors:atexit: shutting down client
[sentry] DEBUG: Flushing HTTP transport
DEBUG:sentry_sdk.errors:Flushing HTTP transport
[sentry] DEBUG: background worker got flush request
DEBUG:sentry_sdk.errors:background worker got flush request
[sentry] DEBUG: 2 event(s) pending on flush
DEBUG:sentry_sdk.errors:2 event(s) pending on flush
Sentry is attempting to send 2 pending events
Waiting up to 2 seconds
Press Ctrl-C to quit
DEBUG:urllib3.connectionpool:https://xx.ingest.us.sentry.io:443 "POST /api/xx/envelope/ HTTP/1.1" 200 0
[sentry] DEBUG: background worker flushed
DEBUG:sentry_sdk.errors:background worker flushed
[sentry] DEBUG: Killing HTTP transport
DEBUG:sentry_sdk.errors:Killing HTTP transport
[sentry] DEBUG: background worker got kill request
DEBUG:sentry_sdk.errors:background worker got kill request

gshoultz42 avatar Apr 23 '25 20:04 gshoultz42

Hey @gshoultz42 thanks for the logs.

From the log you posted I see that the SDK sends one error to Sentry. (An envelope is the generic format we created to send all kinds of data types to Sentry.)

[sentry] DEBUG: Sending envelope [envelope with 1 items (error)] project:xxx host:o58632.ingest.us.sentry.io

There is also a debug line from urllib3 showing that the POST request succeeded:

DEBUG:urllib3.connectionpool:https://xx.ingest.us.sentry.io:443 "POST /api/xx/envelope/ HTTP/1.1" 200 0

So everything looks normal and the error should end up in Sentry. As far as I can tell, the SDK is not the problem.

You can have a look at https://sentry.io/stats/ to see if received errors have been dropped. (Errors can be dropped by "inbound data filters" that you can configure, if you are over quota, or by rate limiting if you send tons of errors at the same time.)

antonpirker avatar Apr 24 '25 09:04 antonpirker

That error did go up but it wasn't the exception that was being fired to be captured. I can see the log entry for the expected exception but I don't see that exception in sentry. This is what I have noticed.

  • Exceptions from the main job can be caught.
  • Exceptions coming from a worker are not caught.
  • Sentry logs are not making it to the logs in GCP. I can only view the logs from my console as I deploy.

Is there a way to force sentry to use a different logger?
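
The debug lines in the log above carry the logger name sentry_sdk.errors, so one option may be to attach an extra handler to that logger. A minimal sketch, assuming the worker environment collects whatever is written to the chosen stream; route_sentry_debug_logs is a hypothetical helper, not an SDK function, and whether this makes worker logs appear in GCP is untested here.

```python
import logging
import sys
from typing import Optional, TextIO


def route_sentry_debug_logs(
    stream: Optional[TextIO] = None, level: int = logging.DEBUG
) -> logging.Handler:
    """Attach an extra stream handler to the SDK's debug logger."""
    # "sentry_sdk.errors" is the logger name visible in the log output above.
    sentry_logger = logging.getLogger("sentry_sdk.errors")
    handler = logging.StreamHandler(stream if stream is not None else sys.stdout)
    handler.setFormatter(logging.Formatter("[sentry] %(levelname)s: %(message)s"))
    sentry_logger.addHandler(handler)
    sentry_logger.setLevel(level)
    return handler
```

The SDK only emits these records when it is initialized with debug=True, so the handler adds a destination rather than turning debug output on.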

gshoultz42 avatar Apr 24 '25 11:04 gshoultz42

The line logging.exception("Error parsing event: %r", element) is sending one error to Sentry. But the error you are capturing in your except (ValidationError, KeyError, JSONDecodeError, TypeError) as error: block is handled by your code, so it is NOT sent to sentry.

You can do a sentry_sdk.capture_exception(error) in your except block to send it to Sentry.

In general Sentry is only capturing unhandled errors from your application. If you catch them and handle them, they will not be sent to Sentry.

antonpirker avatar Apr 24 '25 15:04 antonpirker

Current code block that is not working

except (ValidationError, KeyError, JSONDecodeError, TypeError) as error:
            with sentry_sdk.push_scope() as scope:
                # Add any additional context
                scope.set_extra("exception", str(error))
                sentry_sdk.capture_exception(error)
                exc_info = exc_info_from_error(error)
                exceptions = exceptions_from_error_tuple(exc_info)
                sentry_sdk.capture_event(
                    {
                        "message": str(error),
                        "level": "error",
                        "exception": {"values": exceptions},
                    }
                )
            logging.exception("Error parsing event: %r", element)
            error_output = {
                "error": repr(error),
                "payload": (
                    event_json
                    if not isinstance(error, JSONDecodeError)
                    else element.decode()
                ),
            }

I should have posted this sooner. I made some changes mentioned in the discord channel as well.

gshoultz42 avatar Apr 25 '25 15:04 gshoultz42

I did the following actions as well.

  • I updated to a newer version, 2.27.0, as I saw some changes that may have been related to this issue.
  • I created two custom transports, one based on HttpTransport and another on the plain Transport. Both would send log entries and print the information to standard output, but I did not get any logs.
  • I confirmed that the workers had the correct sentry_sdk; at one point I removed the SDK from the requirements file, which resulted in the expected error.

The Sentry apache_beam demo is not a streaming Dataflow job, and I am curious whether the problem is related to that or to the GCP-specific parts of the Apache Beam SDK, apache_beam[gcp].

gshoultz42 avatar Apr 25 '25 15:04 gshoultz42

Hi @gshoultz42, to send the exception to Sentry, all you should need is the sentry_sdk.capture_exception line. Please try modifying your snippet like so:

     except (ValidationError, KeyError, JSONDecodeError, TypeError) as error:
+           sentry_sdk.capture_exception(error)
-           with sentry_sdk.push_scope() as scope:
-               # Add any additional context
-               scope.set_extra("exception", str(error))
-               sentry_sdk.capture_exception(error)
-               exc_info = exc_info_from_error(error)
-               exceptions = exceptions_from_error_tuple(exc_info)
-               sentry_sdk.capture_event(
-                   {
-                       "message": str(error),
-                       "level": "error",
-                       "exception": {"values": exceptions},
-                   }
-               )
            logging.exception("Error parsing event: %r", element)
            error_output = {
                "error": repr(error),
                "payload": (
                    event_json
                    if not isinstance(error, JSONDecodeError)
                    else element.decode()
                ),
            }

Note that the default issue view in Sentry only shows issues with medium or high issue priority. Manually captured exceptions typically have low priority, and would be filtered out by the default filter.

You can see the filter in the "Issues" page search bar:

[Image: the issue priority filter in the "Issues" page search bar]

To see the manually-captured exception, you need to remove the issue priority filter.

szokeasaurusrex avatar Apr 28 '25 06:04 szokeasaurusrex

That was what I had in the beginning but I updated to the code just in case and I got the same result. Still can't see the debug log

gshoultz42 avatar Apr 28 '25 16:04 gshoultz42

> That was what I had in the beginning but I updated to the code just in case and I got the same result. Still can't see the debug log

@gshoultz42 I understand, that must be quite frustrating.

In any case, you should use the simplified code snippet I sent you with only the sentry_sdk.capture_exception call; the other code you added should not have any effect on the logs, and it may also cause unexpected behavior, since you are essentially sending the exception twice with what you wrote.

But, just to confirm, which Python SDK version were you using just now? Also, are you seeing any other log output from sources other than the Sentry Python SDK? And is the error event still not showing up in Sentry?

It would be super helpful if you could also provide the full logs from your latest attempt, and also the code you are running (especially the sentry_sdk.init call, and the lines where the exception is raised).

szokeasaurusrex avatar Apr 28 '25 16:04 szokeasaurusrex

I'm seeing the same thing with a beam pipeline and the GCP dataflow runner. I can only have events sent when using the direct runner since sentry is initialized in the same process where the pipeline is executed.

When using the dataflow runner events are not sent to sentry. The dataflow worker logs show exceptions being wrapped by the sentry BeamIntegration but events aren't sent. I think this is because sentry is only being initialized on the pipeline launcher where the pipeline is defined, but is not initialized on the dataflow workers where the data processing DoFns are distributed to.

jbandoro avatar Jun 16 '25 22:06 jbandoro

I'm not that familiar with Beam -- is there a way to run some sort of setup code on the workers where you could call sentry_sdk.init()? Or is there a way to add the init() to a place in the code that gets executed by the workers?

sentrivana avatar Jun 17 '25 06:06 sentrivana

> I'm not that familiar with Beam -- is there a way to run some sort of setup code on the workers where you could call sentry_sdk.init()? Or is there a way to add the init() to a place in the code that gets executed by the workers?

Since the Beam framework can run pipelines on different execution engines (runners), it might be different for each of the supported runners. For the Python SDK there is an SdkWorker, which I think is used by the different runners (maybe not the direct runner) and might be a good place to initialize Sentry: https://github.com/apache/beam/blob/730388a688bbd37c2d0636c3fe0ac20c2b0f4641/sdks/python/apache_beam/runners/worker/sdk_worker.py#L642C1-L649C22

There isn't a way I know of to inject custom code on worker start up with the python sdk.

One way we found to send events to sentry from the workers on dataflow is to do the sentry_sdk.init() in the DoFn.setup like in the example below:

class MyDoFn(beam.DoFn):

    def setup(self) -> None:
        sentry_sdk.init(
            dsn=SENTRY_DSN,
            environment=ENVIRONMENT,
            integrations=[BeamIntegration()],
            auto_enabling_integrations=False,
        )

    def process(self, element):
        ...

This isn't ideal, though, since it only works for user-defined DoFns and not for the Beam-provided PTransforms. I had to disable the auto-enabling of integrations; otherwise, in the Dataflow worker environment, the SDK tries to import integrations like Starlette that aren't there.

jbandoro avatar Jun 17 '25 16:06 jbandoro

Could you all try also enabling the GCP Integration, since you are using GCP? You would do this like so:

import sentry_sdk
from sentry_sdk.integrations.beam import BeamIntegration
from sentry_sdk.integrations.gcp import GcpIntegration

sentry_sdk.init(
    integrations=[BeamIntegration(), GcpIntegration(), ...],
    ...
)

Please let me know whether that fixes anything!

szokeasaurusrex avatar Jun 23 '25 08:06 szokeasaurusrex

Thanks for helping! I tried the GcpIntegration when initializing Sentry in the Beam pipeline definition, and it doesn't resolve the issue of the Dataflow workers not sending events to Sentry. According to the docs, the GcpIntegration is meant for Google Cloud Functions, which may be why it has no effect on an Apache Beam pipeline using the GCP Dataflow runner.

Below is a minimal example of a beam pipeline with a ValueError added and sentry being initialized. Sentry events are sent when running the beam pipeline locally with the DirectRunner since the pipeline graph parsing and execution is done in the same python process.

import apache_beam as beam
import sentry_sdk
from apache_beam.options.pipeline_options import PipelineOptions
from sentry_sdk.integrations.beam import BeamIntegration


class MyDoFn(beam.DoFn):
    """User defined DoFn for processing elements in a PCollection."""

    def process(self, element):
        if element == "pipeline":
            # Simulate an error
            raise ValueError("Simulated error in processing element: 'pipeline'")
        yield f"Processed: {element}"


def run_pipeline():
    # Use env vars for initializing sentry_sdk, SENTRY_DSN, SENTRY_ENVIRONMENT
    sentry_sdk.init(integrations=[BeamIntegration()])

    options = PipelineOptions()
    with beam.Pipeline(options=options) as p:
        input_data = ["hello", "world", "beam", "pipeline"]
        elements = p | "Create Elements" >> beam.Create(input_data)

        processed_elements = elements | "Apply MyDoFn" >> beam.ParDo(MyDoFn())

        processed_elements | "Print Results" >> beam.Map(print)


if __name__ == "__main__":
    run_pipeline()

However, when using the DataflowRunner, Sentry is only initialized in the pipeline parser/launcher. When Dataflow distributes work to its workers to run MyDoFn, Sentry has not been initialized in the worker environment, so events are not sent.

This is why the workaround below works where you initialize sentry in the DoFn.setup method which is run on a worker:

class MyDoFn(beam.DoFn):
    """User defined DoFn for processing elements in a PCollection."""

    def setup(self):
        if not sentry_sdk.is_initialized():
            sentry_sdk.init(
                integrations=[BeamIntegration()],
                auto_enabling_integrations=False,
            )

    def process(self, element):
        if element == "pipeline":
            # Simulate an error
            raise ValueError("Simulated error in processing element: 'pipeline'")
        yield f"Processed: {element}"

As mentioned in an earlier comment, I'm not aware of a way to inject custom startup code on worker with the beam python sdk.
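
To avoid repeating the setup code in every user-defined DoFn, the workaround could be factored into a mixin. This is only a sketch of that idea: SentrySetupMixin is a hypothetical name, and it assumes SENTRY_DSN and SENTRY_ENVIRONMENT are available in the worker environment, as in the example above.

```python
class SentrySetupMixin:
    """Mixin that initializes Sentry in setup(), which runs on the worker."""

    def setup(self):
        # Lazy imports: resolved when setup() runs in the worker process,
        # not when the pipeline is constructed on the launcher.
        import sentry_sdk
        from sentry_sdk.integrations.beam import BeamIntegration

        if not sentry_sdk.is_initialized():
            # Assumption: DSN and environment come from SENTRY_DSN and
            # SENTRY_ENVIRONMENT in the worker environment.
            sentry_sdk.init(
                integrations=[BeamIntegration()],
                auto_enabling_integrations=False,
            )
        # Continue along the MRO so the base class's setup() still runs.
        super().setup()
```

A DoFn would then be declared as class MyDoFn(SentrySetupMixin, beam.DoFn), with the mixin listed first. Like the workaround itself, this covers only user-defined DoFns.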

jbandoro avatar Jun 23 '25 17:06 jbandoro

Thanks for the workaround and for providing examples @jbandoro! This is very helpful.

Another Beam noobie question from me: Is there any way to communicate with the workers or provide any data to them? E.g. by setting environment vars? That's currently the only way I can imagine we could make this work automatically (monkeypatch e.g. the SdkWorker -- or whatever makes sense, further investigation needed -- to initialize Sentry if not initialized, using options from env vars).

sentrivana avatar Jun 24 '25 15:06 sentrivana

> Another Beam noobie question from me: Is there any way to communicate with the workers or provide any data to them? E.g. by setting environment vars? That's currently the only way I can imagine we could make this work automatically (monkeypatch e.g. the SdkWorker -- or whatever makes sense, further investigation needed -- to initialize Sentry if not initialized, using options from env vars).

AFAIK there is no configuration or setting in the Beam Python SDK to allow this, but you can build your own custom SDK container to use in Dataflow (docs here) and set environment variables in that image. We currently do this to set SENTRY_DSN.
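
On the worker side, initialization driven by those environment variables might look roughly like this. It is a sketch under assumptions, not what we run verbatim: sentry_init_kwargs and init_sentry_from_env are hypothetical helper names, and skipping initialization when SENTRY_DSN is missing is a choice made for this example.

```python
import os
from typing import Mapping, Optional


def sentry_init_kwargs(environ: Optional[Mapping[str, str]] = None) -> Optional[dict]:
    """Build sentry_sdk.init() keyword arguments from environment variables.

    Returns None when SENTRY_DSN is not set.
    """
    env = os.environ if environ is None else environ
    dsn = env.get("SENTRY_DSN")
    if not dsn:
        return None
    kwargs = {"dsn": dsn, "auto_enabling_integrations": False}
    if env.get("SENTRY_ENVIRONMENT"):
        kwargs["environment"] = env["SENTRY_ENVIRONMENT"]
    return kwargs


def init_sentry_from_env() -> bool:
    """Initialize Sentry once if SENTRY_DSN is set; return True if configured."""
    kwargs = sentry_init_kwargs()
    if kwargs is None:
        return False
    # Imported lazily so the import happens in the worker process.
    import sentry_sdk
    from sentry_sdk.integrations.beam import BeamIntegration

    if not sentry_sdk.is_initialized():
        sentry_sdk.init(integrations=[BeamIntegration()], **kwargs)
    return True
```

init_sentry_from_env() could be called from DoFn.setup, so the same image works with or without a DSN configured.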

jbandoro avatar Jun 24 '25 18:06 jbandoro