PocketFlow icon indicating copy to clipboard operation
PocketFlow copied to clipboard

Multiple flow, no unique ID, params are not unique per instance ?

Open johnr14 opened this issue 1 year ago • 5 comments

Hi, great work.

I am playing around and wanted to make a few more examples while learning.

I find that Node and Flow are missing a unique id that can be printed from within for debugging purpose. like print(self.Node.name) and print(self.Flow.name). So I am trying to use params to make a flow_id variable. Sub flow will be the next step, but I am having a problem right now.

It could also be nice for a node to know where it is in a Flow or a subFlow like print(self.Flow.node_history) and find parent Flow if we are in a subflow print(self.Flow.parents)

Also, unique node and flow names would be useful for graph when naming has meaning.

So I got this code :


from pocketflow import *
import os

def call_llm(prompt):
    # Your API logic here
    return prompt

class LoadFile(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        print(f"Flow name: {self.params["flow_id"]}")
        """Load file from disk"""
        filename = self.params["filename"]
        with open(filename, "r") as file:
            return file.read()

    def exec(self, prep_res):
        """Return file content"""
        return prep_res

    def post(self, shared, prep_res, exec_res):
        """Store file content in shared"""
        shared["file_content"] = exec_res
        return "default"


class GetOpinion(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        """Get file content from shared"""
        if not shared.get("reworked_file_content"):
            return shared["file_content"]
        else:
            return "Original text :\n" + shared["file_content"] + "Revised version:\n" + shared["reworked_file_content"]

    def exec(self, prep_res):
        """Ask LLM for opinion on file content"""
        prompt = f"What's your opinion on this text: {prep_res}. Provide opinion on how to make it better."
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store opinion in shared"""
        shared["opinion"] = exec_res
        return "default"

class GetValidation(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        """Get file content from shared"""
        shared['discussion'] = shared["file_content"] + shared["opinion"] + "Final revised text : " + shared["reworked_file_content"]
        return

    def exec(self, prep_res):
        """Ask LLM for opinion on file content"""
        prompt = f"Validate that the final revised text is valid and reflects the changes proposed in opinion : {prep_res}. \nReply `IS VALID` if it is of `NOT VALID` if it needs some more work."
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store rework count in shared"""
        if "IS VALID" in exec_res:
            return "default"
        else:
            return "invalid"


class ReworkFile(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        """Get file content and opinion from shared"""
        return shared["file_content"], shared["opinion"]

    def exec(self, prep_res):
        """Ask LLM to rework file based on opinion"""
        file_content, opinion = prep_res
        prompt = f"Rework this text based on the opinion: {opinion}\n\nOriginal text: {file_content}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store reworked file content in shared"""
        rework_count = self.params["rework2_flow_min_count"]
        shared["reworked_file_content"] = exec_res
        if not shared.get("reworked_file_content_count"):
            shared["reworked_file_content_count"] = 1
        elif shared.get("reworked_file_content_count"):
            shared["reworked_file_content_count"] += 1

        if shared["reworked_file_content_count"] < rework_count and self.params["flow_id"] == "rework_flow2":
            print(f"Less than {self.params["rework2_flow_min_count"]} rework for rework2_flow, so going for pass #{shared["reworked_file_content_count"]}.")
            return "rework"
        else:
            return "default"


class SaveFile(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        """Get reworked file content and original filename from shared"""
        filename = self.params["filename"]
        return shared["reworked_file_content"], filename

    def exec(self, prep_res):
        """Save reworked file content to new file"""
        reworked_file_content, filename = prep_res
        new_filename = f"{filename.split('.')[0]}_v2.{filename.split('.')[-1]}"
        with open(new_filename, "w") as file:
            file.write(reworked_file_content)
        return reworked_file_content

    def post(self, shared, prep_res, exec_res):
        filename = self.params["filename"]
        """Return success message"""
        print(f"Saved to {filename} the content : \n{exec_res}")


##############################################
# # # Comment this from here
# # First flow
# Create nodes
load_Node = LoadFile()
opinion_Node = GetOpinion()
rework_Node = ReworkFile()
save_Node = SaveFile()

# Connect nodes
load_Node >> opinion_Node >> rework_Node >> save_Node

# Create flow
rework_Flow = Flow(start=load_Node)

# Set flow params
rework_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow"})

# Run flow
shared = {}
rework_Flow.run(shared)
# # # To here for second workflow to work
##############################################

# # Second flow
# Create nodes
load2_Node = LoadFile()
opinion2_Node = GetOpinion()
rework2_Node = ReworkFile()
valid2_Node = GetValidation()
save2_Node = SaveFile()

# Connect nodes
load2_Node >> opinion2_Node
opinion2_Node >> rework2_Node

rework2_Node - "default" >> valid2_Node
rework2_Node - "rework" >> opinion2_Node

# Get second opinion it if rework asked because in rework_flow2 and less than 2 rework
valid2_Node - "invalid" >> opinion2_Node
valid2_Node - "default" >> save2_Node

# Create flow
rework2_Flow = Flow(start=load2_Node)

# Set flow params
# This will not set params if class Flow was already initialized with other params ?
rework2_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow2", "rework2_flow_min_count" : 3})

# Run flow
shared2 = {}
rework2_Flow.run(shared2)

It seems that rework2_Flow doesn't get the param rework2_flow_min_count if rework_Flow was initialized before.

Other ways ?

I use uuid in a other file, but since it only has one Flow, it works. Since this is built to make multiple flows and interconnect them, addressing this issue should be important. The other way would be to use shared to store node and flow history, but it seems overly complex for the task.

This is the answer I got from llama 405b:

  1. Use instance-level parameters If you still want to keep the set_params method, you can modify it to set instance-level parameters instead of class-level parameters.

class Flow:
    def __init__(self, start):
        self.start = start
        self._params = {}

    def set_params(self, params):
        self._params = params

    @property
    def params(self):
        return self._params

This way, each flow instance will have its own set of parameters.

Any input on this, ?

Thanks

johnr14 avatar Jan 15 '25 15:01 johnr14

Hey thank you for the question!

So I am trying to use params to make a flow_id variable. Sub flow will be the next step, but I am having a problem right now

Note that a node instance could get reused for many times (especially for batch) so instance level id is not suggested. But I see you use class level id print(f" In : {self.__class__.__name__}") which is smart!!

zachary62 avatar Jan 15 '25 16:01 zachary62

It could also be nice for a node to know where it is in a Flow or a subFlow like print(self.Flow.node_history) and find parent Flow if we are in a subflow print(self.Flow.parents)

Good point! I need to think about this more. The challenge is that a node/flow can be used by many different flows. This history stack is better to be collected during run time. Something like inspect but I need more time to work on it:

import inspect

class Bar:
    def __init__(self):
        pass
    
    def exec(self):
        stack = inspect.stack()
        for frame in stack:
            frame_info = frame.frame
            if 'self' in frame_info.f_locals:  # Check if it's in a class
                caller_instance = frame_info.f_locals['self']
                class_name = type(caller_instance).__name__
                print(f"Method 'exec' in Bar was called by class: {class_name}")
                return
        print("No class context found")

class Foo:
    def __init__(self, bar):
        self.bar = bar

    def exec(self):
        self.bar.exec()

# Instantiate Bar and pass it to Foo
bar_instance = Bar()
foo_instance = Foo(bar_instance)

# Call exec on Foo, which in turn calls exec on Bar
foo_instance.exec()

zachary62 avatar Jan 15 '25 16:01 zachary62

It seems that rework2_Flow doesn't get the param rework2_flow_min_count if rework_Flow was initialized before.

I don't fully follow the issue here. I tried to run your codes with a dummy example.txt However, the first rework_Flow won't run and throw an error because key error rework_count = self.params["rework2_flow_min_count"]. It doesn't reach the rework2_Flow.

However, if I comment out rework_Flow, the outputs of rework2_Flow look good to me:

 In : LoadFile
Flow name: rework_flow2
 In : GetOpinion
 In : ReworkFile
Less than 3 rework for rework2_flow, so going for pass #1.
 In : GetOpinion
 In : ReworkFile
Less than 3 rework for rework2_flow, so going for pass #2.
 In : GetOpinion
 In : ReworkFile
 In : GetValidation
 In : SaveFile
Saved to example.txt the content : 
Rework this text based on the opinion: What's your opinion on this text: Original text :
"dummy example text"
Revised version:
Rework this text based on the opinion: What's your opinion on this text: Original text :
"dummy example text"
Revised version:
Rework this text based on the opinion: What's your opinion on this text: "dummy example text"
. Provide opinion on how to make it better.

Original text: "dummy example text"
. Provide opinion on how to make it better.

Original text: "dummy example text"
. Provide opinion on how to make it better.

Original text: "dummy example text"

zachary62 avatar Jan 15 '25 16:01 zachary62

Thanks for the quick reply, comment out line 81 : rework_count = self.params["rework2_flow_min_count"]

That was added while debugging.. Also it seems like I'm quite tired as a few bugs got in...

Here is more up to date code :


from pocketflow import *
import os

def call_llm(prompt):
    # Your API logic here
    return prompt

class LoadFile(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        print(f"Flow name: {self.params["flow_id"]}")
        """Load file from disk"""
        filename = self.params["filename"]
        with open(filename, "r") as file:
            return file.read()

    def exec(self, prep_res):
        """Return file content"""
        return prep_res

    def post(self, shared, prep_res, exec_res):
        """Store file content in shared"""
        shared["file_content"] = exec_res
        return "default"


class GetOpinion(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        """Get file content from shared"""
        if not shared.get("reworked_file_content"):
            return shared["file_content"]
        else:
            return "Original text :\n" + shared["file_content"] + "Revised version:\n" + shared["reworked_file_content"]

    def exec(self, prep_res):
        """Ask LLM for opinion on file content"""
        prompt = f"What's your opinion on this text: {prep_res}. Provide opinion on how to make it better."
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store opinion in shared"""
        shared["opinion"] = exec_res
        return "default"

class GetValidation(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        """Get file content from shared"""
        shared['discussion'] = shared["file_content"] + shared["opinion"] + "Final revised text : " + shared["reworked_file_content"]
        return

    def exec(self, prep_res):
        """Ask LLM for opinion on file content"""
        prompt = f"Validate that the final revised text is valid and reflects the changes proposed in opinion : {prep_res}. \nReply `IS VALID` if it is of `NOT VALID` if it needs some more work."
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store rework count in shared"""
        if "IS VALID" in exec_res:
            return "default"
        else:
            return "invalid"


class ReworkFile(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        """Get file content and opinion from shared"""
        return shared["file_content"], shared["opinion"]

    def exec(self, prep_res):
        """Ask LLM to rework file based on opinion"""
        file_content, opinion = prep_res
        prompt = f"Rework this text based on the opinion: {opinion}\n\nOriginal text: {file_content}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store reworked file content in shared"""
        if "rework2_flow_min_count" in self.params:
            rework_count = self.params["rework2_flow_min_count"]
            shared["reworked_file_content"] = exec_res
            if not shared.get("reworked_file_content_count"):
                shared["reworked_file_content_count"] = 1
            elif shared.get("reworked_file_content_count"):
                shared["reworked_file_content_count"] += 1

            if shared["reworked_file_content_count"] < rework_count and self.params["flow_id"] == "rework_flow2":
                print(f"Less than {self.params["rework2_flow_min_count"]} rework for rework2_flow, so going for pass #{shared["reworked_file_content_count"]}.")
                return "rework"
            else:
                return "default"
        else:
            shared["reworked_file_content"] = exec_res


class SaveFile(Node):
    def prep(self, shared):
        print(f" In : {self.__class__.__name__}")
        """Get reworked file content and original filename from shared"""
        filename = self.params["filename"]
        if "reworked_file_content" in shared:
            return shared["reworked_file_content"], filename
        else:
            print("Error")

    def exec(self, prep_res):
        """Save reworked file content to new file"""
        reworked_file_content, filename = prep_res
        new_filename = f"{filename.split('.')[0]}_v2.{filename.split('.')[-1]}"
        with open(new_filename, "w") as file:
            file.write(reworked_file_content)
        return reworked_file_content

    def post(self, shared, prep_res, exec_res):
        filename = self.params["filename"]
        """Return success message"""
        print(f"Saved to {filename} the content : \n{exec_res}")


# # # Comment this from here
# # First flow
# Create nodes
load_Node = LoadFile()
opinion_Node = GetOpinion()
rework_Node = ReworkFile()
save_Node = SaveFile()

# Connect nodes
load_Node >> opinion_Node >> rework_Node >> save_Node

# Create flow
rework_Flow = Flow(start=load_Node)

# Set flow params
rework_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow"})

# Run flow
shared = {}
rework_Flow.run(shared)
# # # To here for second workflow to work

# # Second flow
# Create nodes
load2_Node = LoadFile()
opinion2_Node = GetOpinion()
rework2_Node = ReworkFile()
valid2_Node = GetValidation()
save2_Node = SaveFile()

# Connect nodes
load2_Node >> opinion2_Node
opinion2_Node >> rework2_Node

rework2_Node - "default" >> valid2_Node
rework2_Node - "rework" >> opinion2_Node

# Get second opinion it if rework asked because in rework_flow2 and less than 2 rework
valid2_Node - "invalid" >> opinion2_Node
valid2_Node - "default" >> save2_Node

# Create flow
rework2_Flow = Flow(start=load2_Node)

# Set flow params
# This will not set params if class Flow was already initialized with other params ?
rework2_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow2", "rework2_flow_min_count" : 3})

# Run flow
shared2 = {}
rework2_Flow.run(shared2)


And the output seems to work ??? And flow_id works also... Sorry for that then, but the other ideas should be a good addition. Also this code could be added to a example section in the docs.

 In : LoadFile
Flow name: rework_flow
 In : GetOpinion
 In : ReworkFile
 In : SaveFile
Saved to example.txt the content : 
Rework this text based on the opinion: What's your opinion on this text: Example file
. Provide opinion on how to make it better.

Original text: Example file

 In : LoadFile
Flow name: rework_flow2
 In : GetOpinion
 In : ReworkFile
Less than 3 rework for rework2_flow, so going for pass #1.
 In : GetOpinion
 In : ReworkFile
Less than 3 rework for rework2_flow, so going for pass #2.
 In : GetOpinion
 In : ReworkFile
 In : GetValidation
 In : SaveFile
Saved to example.txt the content : 
Rework this text based on the opinion: What's your opinion on this text: Original text :
Example file
Revised version:
Rework this text based on the opinion: What's your opinion on this text: Original text :
Example file
Revised version:
Rework this text based on the opinion: What's your opinion on this text: Example file
. Provide opinion on how to make it better.

Original text: Example file
. Provide opinion on how to make it better.

Original text: Example file
. Provide opinion on how to make it better.

Original text: Example file

johnr14 avatar Jan 15 '25 16:01 johnr14

I've created an example function that tracks call stack for nodes: https://the-pocket.github.io/PocketFlow/viz.html

Minimal examples that will work for your example:

from pocketflow import *
import os
import inspect

def call_llm(prompt):
    # Your API logic here
    return prompt

import inspect

def get_node_call_stack():
    stack = inspect.stack()
    node_names = []
    seen_ids = set()
    for frame_info in stack[1:]:
        local_vars = frame_info.frame.f_locals
        if 'self' in local_vars:
            caller_self = local_vars['self']
            if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids:
                seen_ids.add(id(caller_self))
                node_names.append(type(caller_self).__name__)
    return node_names

class LoadFile(Node):
    def prep(self, shared):
        stack = get_node_call_stack()
        print("Call stack:", stack)
        # print(inspect.stack())
        print(f'Flow name: {self.params["flow_id"]}')
        """Load file from disk"""
        filename = self.params["filename"]
        with open(filename, "r") as file:
            return file.read()

    def exec(self, prep_res):
        """Return file content"""
        return prep_res

    def post(self, shared, prep_res, exec_res):
        """Store file content in shared"""
        shared["file_content"] = exec_res
        return "default"


class GetOpinion(Node):
    def prep(self, shared):
        stack = get_node_call_stack()
        print("Call stack:", stack)
        """Get file content from shared"""
        if not shared.get("reworked_file_content"):
            return shared["file_content"]
        else:
            return "Original text :\n" + shared["file_content"] + "Revised version:\n" + shared["reworked_file_content"]

    def exec(self, prep_res):
        """Ask LLM for opinion on file content"""
        prompt = f"What's your opinion on this text: {prep_res}. Provide opinion on how to make it better."
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store opinion in shared"""
        shared["opinion"] = exec_res
        return "default"

class GetValidation(Node):
    def prep(self, shared):
        stack = get_node_call_stack()
        print("Call stack:", stack)
        """Get file content from shared"""
        shared['discussion'] = shared["file_content"] + shared["opinion"] + "Final revised text : " + shared["reworked_file_content"]
        return

    def exec(self, prep_res):
        """Ask LLM for opinion on file content"""
        prompt = f"Validate that the final revised text is valid and reflects the changes proposed in opinion : {prep_res}. \nReply `IS VALID` if it is of `NOT VALID` if it needs some more work."
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store rework count in shared"""
        if "IS VALID" in exec_res:
            return "default"
        else:
            return "invalid"


class ReworkFile(Node):
    def prep(self, shared):
        stack = get_node_call_stack()
        print("Call stack:", stack)
        """Get file content and opinion from shared"""
        return shared["file_content"], shared["opinion"]

    def exec(self, prep_res):
        """Ask LLM to rework file based on opinion"""
        file_content, opinion = prep_res
        prompt = f"Rework this text based on the opinion: {opinion}\n\nOriginal text: {file_content}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res):
        """Store reworked file content in shared"""
        rework_count = self.params["rework2_flow_min_count"]
        shared["reworked_file_content"] = exec_res
        if not shared.get("reworked_file_content_count"):
            shared["reworked_file_content_count"] = 1
        elif shared.get("reworked_file_content_count"):
            shared["reworked_file_content_count"] += 1

        if shared["reworked_file_content_count"] < rework_count and self.params["flow_id"] == "rework_flow2":
            print(f"Less than {self.params['rework2_flow_min_count']} rework for rework2_flow, so going for pass #{shared['reworked_file_content_count']}.")
            return "rework"
        else:
            return "default"


class SaveFile(Node):
    def prep(self, shared):
        stack = get_node_call_stack()
        print("Call stack:", stack)
        """Get reworked file content and original filename from shared"""
        filename = self.params["filename"]
        return shared["reworked_file_content"], filename

    def exec(self, prep_res):
        """Save reworked file content to new file"""
        reworked_file_content, filename = prep_res
        new_filename = f"{filename.split('.')[0]}_v2.{filename.split('.')[-1]}"
        with open(new_filename, "w") as file:
            file.write(reworked_file_content)
        return reworked_file_content

    def post(self, shared, prep_res, exec_res):
        filename = self.params["filename"]
        """Return success message"""
        print(f"Saved to {filename} the content : \n{exec_res}")

class Rework2_Flow(Flow):
    pass

# # Create nodes
load2_Node = LoadFile()
opinion2_Node = GetOpinion()
rework2_Node = ReworkFile()
valid2_Node = GetValidation()
save2_Node = SaveFile()

# Connect nodes
load2_Node >> opinion2_Node
opinion2_Node >> rework2_Node

rework2_Node - "default" >> valid2_Node
rework2_Node - "rework" >> opinion2_Node

# Get second opinion it if rework asked because in rework_flow2 and less than 2 rework
valid2_Node - "invalid" >> opinion2_Node
valid2_Node - "default" >> save2_Node

# Create flow
rework2_Flow = Rework2_Flow(start=load2_Node)

# Set flow params
# This will not set params if class Flow was already initialized with other params ?
rework2_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow2", "rework2_flow_min_count" : 3})

# Run flow
shared2 = {}
rework2_Flow.run(shared2)

I get the following output:

Call stack: ['LoadFile', 'Rework2_Flow']
Flow name: rework_flow2
Call stack: ['GetOpinion', 'Rework2_Flow']
Call stack: ['ReworkFile', 'Rework2_Flow']
Less than 3 rework for rework2_flow, so going for pass #1.
Call stack: ['GetOpinion', 'Rework2_Flow']
Call stack: ['ReworkFile', 'Rework2_Flow']
Less than 3 rework for rework2_flow, so going for pass #2.
Call stack: ['GetOpinion', 'Rework2_Flow']
Call stack: ['ReworkFile', 'Rework2_Flow']
Call stack: ['GetValidation', 'Rework2_Flow']
Call stack: ['SaveFile', 'Rework2_Flow']
Saved to example.txt the content : 
Rework this text based on the opinion: What's your opinion on this text: Original text :
"dummy example text"
Revised version:
Rework this text based on the opinion: What's your opinion on this text: Original text :
"dummy example text"
Revised version:
Rework this text based on the opinion: What's your opinion on this text: "dummy example text"
. Provide opinion on how to make it better.

Original text: "dummy example text"
. Provide opinion on how to make it better.

Original text: "dummy example text"
. Provide opinion on how to make it better.

Original text: "dummy example text"

zachary62 avatar Jan 16 '25 05:01 zachary62