
feat: log client db messages for provider postgres

Open dondaum opened this issue 1 year ago • 3 comments

closes: #40116

Implement a new option to log database messages or errors sent to the client. In my opinion, changing the SQLExecuteQueryOperator is the best approach as it avoids a lot of code duplication and allows other database hooks to implement similar behavior.

To give it a try:

import datetime
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


proc_name = """
CREATE PROCEDURE raise_notice (s text) LANGUAGE PLPGSQL AS
 $$
 BEGIN
     raise notice 'Message from db: %', s;
 END;
 $$;
"""

with DAG(
    dag_id="pg_dag",
    start_date=datetime.datetime(2021, 1, 1),
    schedule=None,
):
    proc = SQLExecuteQueryOperator(
        task_id="create_proc",
        conn_id="postgres_default",
        sql=proc_name,
    )


    # Calls the procedure without the new option, so the notice is not expected in the task log.
    no_notice = SQLExecuteQueryOperator(
        task_id="no_notice",
        conn_id="postgres_default",
        sql="call raise_notice('42')",
    )

    # Same call with the new hook option enabled, so the notice should be logged.
    with_notice = SQLExecuteQueryOperator(
        task_id="with_notice",
        conn_id="postgres_default",
        sql="call raise_notice('42')",
        hook_params={"enable_log_db_messages": True},
    )

    # Clean up the test procedure.
    delete_proc = SQLExecuteQueryOperator(
        task_id="delete_proc",
        conn_id="postgres_default",
        sql="DROP PROCEDURE raise_notice (s text)",
    )

    proc >> [no_notice, with_notice] >> delete_proc


dondaum avatar Jun 11 '24 09:06 dondaum

What's working for me right now is this:

  const onConnectStart: OnConnectStart = useCallback((evt, node) => {
    const { nodeId } = node
    // Remember the node the connection starts from, so its absolute position is available later
    const pNode = nodeId ? getNode(nodeId) : {}
    // @ts-ignore
    parentNode.current = pNode
    connectingNodeId.current = nodeId
  }, [getNode])

  const onConnectEnd: OnConnectEnd = useCallback(
    (event) => {
      if (!connectingNodeId.current) return

      const targetIsPane = (event.target as Element).classList.contains(
        'react-flow__pane'
      )

      if (targetIsPane) {
        // Get mouse position
        // @ts-ignore
        const clientX = (event?.clientX as number) || 0
        // @ts-ignore
        const clientY = (event?.clientY as number) || 0

        // Convert mouse coordinates to relative position
        const screenPos = screenToFlowPosition({
          x: clientX,
          y: clientY,
        })

        // Get the absolute position of the parent node (if any)
        const parentX = parentNode.current?.positionAbsolute?.x || 0
        const parentY = parentNode.current?.positionAbsolute?.y || 0

        // Subtract parent coords from relative screenPos
        const posX = screenPos.x - parentX
        const posY = screenPos.y - parentY

        const newId = getId()

        const newNode = NewNode({
          id: newId,
          position: { x: posX, y: posY },
          parentId: connectingNodeId.current,
        })

        setNodes((nds) => nds.concat(newNode))

        setEdges((eds) =>
          // @ts-ignore
          eds.concat({
            id: newId,
            source: connectingNodeId.current,
            target: newId,
          })
        )
      }
    },
    [screenToFlowPosition]
  )

This won't work anymore when I upgrade to v12 though...

TrySpace avatar May 02 '24 18:05 TrySpace

Yes, I ran into the same problem. Is there a solution?

Fzlw avatar Jul 05 '24 01:07 Fzlw

@TrySpace the solution you posted is the actual way you'd do it. The reason it does not translate 1:1 to RF 12 is that we moved positionAbsolute to the internals.

You can use getInternalNode(parentId) to get the InternalNode (node with all kinds of tracking information attached) and access parentNode.internals.positionAbsolute.
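To make that concrete, here is a minimal sketch of the parent-relative position math against the v12 shape. The types `XYPosition` and `InternalNodeLike` and the helper `toParentRelativePosition` are hypothetical local stand-ins written for this example, not imports from the library; only `internals.positionAbsolute` comes from the description above.

```typescript
// Local stand-ins for the shapes involved (assumed for illustration, not imported).
type XYPosition = { x: number; y: number };
type InternalNodeLike = { internals: { positionAbsolute: XYPosition } };

// Hypothetical helper: convert a flow-space position into one relative to a parent node.
function toParentRelativePosition(
  flowPos: XYPosition,
  parent?: InternalNodeLike,
): XYPosition {
  // Fall back to the origin when there is no parent node.
  const parentAbs = parent?.internals.positionAbsolute ?? { x: 0, y: 0 };
  return { x: flowPos.x - parentAbs.x, y: flowPos.y - parentAbs.y };
}

// Example values standing in for screenToFlowPosition(...) and getInternalNode(parentId).
const parent: InternalNodeLike = {
  internals: { positionAbsolute: { x: 100, y: 50 } },
};
const relative = toParentRelativePosition({ x: 180, y: 90 }, parent);
console.log(relative);
```

In the onConnectEnd handler from the earlier comment, this would presumably replace the two `parentNode.current?.positionAbsolute` lookups: pass the result of `screenToFlowPosition(...)` as `flowPos` and the result of `getInternalNode(connectingNodeId.current)` as `parent`.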

Does this answer your question?

peterkogo avatar Aug 14 '24 09:08 peterkogo

I will close this issue. Feel free to ask anything over at our Discord.

peterkogo avatar Sep 19 '24 10:09 peterkogo