Planner incorrectly plans `upid_to_pod_name` for some scripts

Open htroisi opened this issue 3 years ago • 2 comments

Describe the bug

This MySQL Spans script runs without error when used as a plugin export script, but it fails with a compiler error when used in the Scratch Pad.

To Reproduce

  1. Add the following script as a custom export script and observe that it successfully exports records:
      #px:set max_output_rows_per_table=500

      import px

      def remove_duplicate_traces(df):
          ''' Removes duplicate traces.
              For historical reasons, Pixie traces MySQL requests on both the client AND server side:
              https://github.com/pixie-io/pixie/blob/5e5598ac46f39219148a36468b5318b1466a92d4/src/stirling/source_connectors/socket_tracer/conn_tracker.cc#L639
          '''
          # Keep client-side traces if server is outside the cluster (can't be resolved to pod or svc)
          df.remote_pod_id = px.ip_to_pod_id(df.remote_addr)
          df.remote_service_id = px.ip_to_service_id(df.remote_addr)
          df.remote_outside_cluster = df.remote_pod_id == '' and df.remote_service_id == ''
          df_client_traces = df[df.trace_role == 1 and df.remote_outside_cluster]
          df_server_traces = df[df.trace_role == 2]
          df_server_traces.append(df_client_traces)
          return df_server_traces

      def remove_ns_prefix(column):
          return px.replace('[a-z0-9\-]*/', column, '')

      def add_source_dest_columns(df):
          df.pod = df.ctx['pod']
          df.namespace = df.ctx['namespace']
          df.container = df.ctx['container']
          # If remote_addr is a pod, get its name. If not, use IP address.
          df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr))
          df.is_ra_pod = df.ra_pod != ''
          df.ra_name = px.select(df.is_ra_pod, df.ra_pod, df.remote_addr)
          df.is_server_tracing = df.trace_role == 2
          df.is_source_pod_type = px.select(df.is_server_tracing, df.is_ra_pod, True)
          df.is_dest_pod_type = px.select(df.is_server_tracing, True, df.is_ra_pod)
          # Set client and server based on trace_role.
          df.source_pod = px.select(df.is_server_tracing, df.ra_name, df.pod)
          df.destination_pod = px.select(df.is_server_tracing, df.pod, df.ra_name)
          df.source_service = px.pod_name_to_service_name(df.source_pod)
          df.destination_service = px.pod_name_to_service_name(df.destination_pod)
          df.destination_namespace = px.pod_name_to_namespace(df.destination_pod)
          df.source_namespace = px.pod_name_to_namespace(df.source_pod)
          df.source_service = remove_ns_prefix(df.source_service)
          df.destination_service = remove_ns_prefix(df.destination_service)
          df.source_container = px.select(df.is_server_tracing, '', df.container)
          df.source_pod = remove_ns_prefix(df.source_pod)
          df.destination_pod = remove_ns_prefix(df.destination_pod)
          # If the destination service is missing then try the nslookup
          df.destination_service = px.select(
              df.destination_service != '',
              df.destination_service,
              px.nslookup(df.remote_addr),
          )
          # If the destination service is still missing then set the remote_addr
          df.destination_service = px.select(
              df.destination_service != '',
              df.destination_service,
              df.remote_addr,
          )
          return df.drop(['ra_pod', 'is_ra_pod', 'ra_name', 'is_server_tracing'])

      df = px.DataFrame('mysql_events', start_time=px.plugin.start_time, end_time=px.plugin.end_time)
      df = remove_duplicate_traces(df)
      df = add_source_dest_columns(df)
      df.normed_query_struct = px.normalize_mysql(df.req_body, df.req_cmd)
      df.query = px.pluck(df.normed_query_struct, 'query')
      df = df[df.query != ""]
      df.start_time = df.time_ - df.latency
      df.latency = df.latency / (1000 * 1000)
      df.req_bytes = px.length(df.req_body)
      df.resp_bytes = px.length(df.resp_body)
      df.req_cmd = px.mysql_command_name(df.req_cmd)
      df.cluster_name = px.vizier_name()
      df.cluster_id = px.vizier_id()
      df.pixie = 'pixie'
      df.db_system = 'mysql'

      # Export client span format for the client side of the request
      px.export(
        df, px.otel.Data(
          resource={
            'service.name': df.source_service,
            'k8s.container.name': df.source_container,
            'service.instance.id': df.source_pod,
            'k8s.pod.name': df.source_pod,
            'k8s.namespace.name': df.source_namespace,
            'px.cluster.id': df.cluster_id,
            'k8s.cluster.name': df.cluster_name,
            'instrumentation.provider': df.pixie,
          },
          data=[
            px.otel.trace.Span(
              name=df.query,
              start_time=df.start_time,
              end_time=df.time_,
              kind=px.otel.trace.SPAN_KIND_CLIENT,
              attributes={
                'db.system': df.db_system,
              },
            ),
          ],
        ),
      )

      # Export server span format for the server (MySQL) side of the request
      px.export(
        df, px.otel.Data(
          resource={
            'service.name': df.source_service,
            'service.instance.id': df.source_pod,
            'k8s.namespace.name': df.source_namespace,
            'mysql.service.name': df.destination_service,
            'mysql.pod.name': df.destination_pod,
            'mysql.namespace.name': df.destination_namespace,
            'k8s.cluster.name': df.cluster_name,
            'px.cluster.id': df.cluster_id,
            'instrumentation.provider': df.pixie,
          },
          data=[
            px.otel.trace.Span(
              name=df.query,
              start_time=df.start_time,
              end_time=df.time_,
              kind=px.otel.trace.SPAN_KIND_SERVER,
              attributes={
                'mysql.req_cmd': df.req_cmd,
                'mysql.req_body': df.req_body,
                'mysql.req_bytes': df.req_bytes,
                'mysql.resp_bytes': df.resp_bytes,
                'mysql.resp_latency': df.latency,
                'mysql.resp_status': df.resp_status,
                'mysql.resp_body': df.resp_body,
                'db.system': df.db_system,
              },
            ),
          ],
        ),
      )
  2. Copy the script into the Scratch Pad with the following modifications:
     • remove px.plugin.start_time
     • remove the calls to px.export() and replace them with px.display()
  3. Observe the compiler error:
Compiler error on line 23, column 20: UDF 'upid_to_pod_name' must execute before blocking nodes such as limit, agg, and join..

Expected behavior

Scripts executed in the Plugin should present the same compiler errors as those executed in the Scratch Pad.

htroisi, Nov 15 '22 19:11

The bug is that we are not pushing upid_to_pod_name down to the PEMs when px.display is present. I'm not sure why that happens; it seems like a strange edge case.

The plugin behavior actually looks correct: the script shouldn't throw an error. I added

#px:set explain=true
#px:set analyze=true

and saw that the upid_to_pod_name function got planned on the PEMs rather than on Kelvin. It seems that when you add px.display, the planner doesn't do the right thing.

philkuz, Nov 15 '22 19:11

I ran into the same error message recently while working on #2078. The root cause in that case was different from this issue, but I believe I've tracked down the source of this problem.

The SplitPEMAndKelvinOnlyUDFOperatorRule compiler rule works by splitting PEM-only and Kelvin-only UDFs contained within the same Map (source). The PxL script above is hand-crafted in a way that places Kelvin-only UDFs before PEM-only UDFs. Since the SplitPEMAndKelvinOnlyUDFOperatorRule rule only works if the conflicting UDFs are inside the same node, the graph ends up in a situation where it can't schedule the functions properly.

This can be seen with the following stripped down example script:

import px
def remove_duplicate_traces(df):
    df.remote_pod_id = px.ip_to_pod_id(df.remote_addr)
    df.remote_service_id = px.ip_to_service_id(df.remote_addr)
    df.remote_outside_cluster = df.remote_pod_id == '' and df.remote_service_id == ''
    return df
def add_source_dest_columns(df):
    df.pod = df.ctx['pod']
    df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr))
    df.is_ra_pod = df.ra_pod != ''
    df.ra_name = px.select(df.is_ra_pod, df.ra_pod, df.remote_addr)
    return df.drop(['ra_pod', 'is_ra_pod', 'ra_name'])
df = px.DataFrame('http_events', select=['time_', 'upid', 'remote_addr'], start_time='-5m')
df = remove_duplicate_traces(df)  # Runs Kelvin-only UDFs (px.ip_to_pod_id & px.ip_to_service_id)
df = add_source_dest_columns(df)  # Runs PEM-only UDFs
px.display(df)

Running this script as is results in the compiler error mentioned above, but swapping the order of the remove_duplicate_traces and add_source_dest_columns calls results in a working query. I did not investigate why using px.export results in different behavior, but ideally the planner would be able to handle this case.
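
To make the ordering constraint concrete, here is a toy model in plain Python. It is not Pixie's planner code: MapNode, split_mixed_nodes and is_schedulable are hypothetical names invented for this sketch, and it assumes a simplified linear plan in which data flows from the PEMs to Kelvin. The nodes are tagged according to the comments in the stripped-down script above.

```python
from dataclasses import dataclass

# Toy model only: these names are hypothetical and are not part of Pixie.
PEM, KELVIN = "pem", "kelvin"


@dataclass(frozen=True)
class MapNode:
    name: str
    udf_executors: tuple  # executor required by each UDF in this Map


def split_mixed_nodes(chain):
    """Split every Map that mixes PEM-only and Kelvin-only UDFs.

    Like a per-node split rule, this only looks inside one node at a time
    and never moves UDFs across node boundaries. Data dependencies between
    the UDFs of a node are ignored for simplicity.
    """
    out = []
    for node in chain:
        kinds = set(node.udf_executors)
        if PEM in kinds and KELVIN in kinds:
            pem_part = tuple(e for e in node.udf_executors if e != KELVIN)
            kelvin_part = tuple(e for e in node.udf_executors if e == KELVIN)
            out.append(MapNode(node.name + "_pem", pem_part))
            out.append(MapNode(node.name + "_kelvin", kelvin_part))
        else:
            out.append(node)
    return out


def is_schedulable(chain):
    """Assumes data flows PEM -> Kelvin: no PEM-only UDF may follow a Kelvin-only one."""
    seen_kelvin = False
    for node in chain:
        kinds = set(node.udf_executors)
        if PEM in kinds and (seen_kelvin or KELVIN in kinds):
            return False
        if KELVIN in kinds:
            seen_kelvin = True
    return True


# Order used in the stripped-down script: Kelvin-only Map first, PEM-only Map second.
failing = [
    MapNode("remove_duplicate_traces", (KELVIN, KELVIN)),
    MapNode("add_source_dest_columns", (PEM,)),
]
# Same nodes with the two calls swapped.
working = list(reversed(failing))
# A single Map mixing both kinds, which a per-node split can handle.
mixed = [MapNode("mixed_map", (PEM, KELVIN))]

failing_ok = is_schedulable(split_mixed_nodes(failing))
working_ok = is_schedulable(split_mixed_nodes(working))
mixed_ok = is_schedulable(split_mixed_nodes(mixed))
```

In this model the per-node split repairs the mixed Map, but it leaves the failing order untouched because the conflicting UDFs live in different nodes. Handling that case would require moving work across node boundaries, which this sketch does not attempt.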

I have a crude code change that fixes this on the following branch. From some early testing, it appears that change also results in slightly better performance for the src/e2e_test/vizier/exectime:exectime benchmark.

This bug doesn't appear to be pressing, so it may take some time for me to clean my change up and understand how the px.export case works. But I will follow up when I have time to do so.

ddelnano, Jan 16 '25 01:01