Multiple flow, no unique ID, params are not unique per instance ?
Hi, great work.
I am playing around and wanted to make a few more examples while learning.
I find that Node and Flow are missing a unique id that can be printed from within for debugging purpose.
like print(self.Node.name) and print(self.Flow.name).
So I am trying to use params to make a flow_id variable. Sub flow will be the next step, but I am having a problem right now.
It could also be nice for a node to know where it is in a Flow or a subFlow like print(self.Flow.node_history) and find parent Flow if we are in a subflow print(self.Flow.parents)
Also, unique node and flow names would be useful for graph when naming has meaning.
So I got this code :
from pocketflow import *
import os
def call_llm(prompt):
# Your API logic here
return prompt
class LoadFile(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
print(f"Flow name: {self.params["flow_id"]}")
"""Load file from disk"""
filename = self.params["filename"]
with open(filename, "r") as file:
return file.read()
def exec(self, prep_res):
"""Return file content"""
return prep_res
def post(self, shared, prep_res, exec_res):
"""Store file content in shared"""
shared["file_content"] = exec_res
return "default"
class GetOpinion(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
"""Get file content from shared"""
if not shared.get("reworked_file_content"):
return shared["file_content"]
else:
return "Original text :\n" + shared["file_content"] + "Revised version:\n" + shared["reworked_file_content"]
def exec(self, prep_res):
"""Ask LLM for opinion on file content"""
prompt = f"What's your opinion on this text: {prep_res}. Provide opinion on how to make it better."
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store opinion in shared"""
shared["opinion"] = exec_res
return "default"
class GetValidation(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
"""Get file content from shared"""
shared['discussion'] = shared["file_content"] + shared["opinion"] + "Final revised text : " + shared["reworked_file_content"]
return
def exec(self, prep_res):
"""Ask LLM for opinion on file content"""
prompt = f"Validate that the final revised text is valid and reflects the changes proposed in opinion : {prep_res}. \nReply `IS VALID` if it is of `NOT VALID` if it needs some more work."
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store rework count in shared"""
if "IS VALID" in exec_res:
return "default"
else:
return "invalid"
class ReworkFile(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
"""Get file content and opinion from shared"""
return shared["file_content"], shared["opinion"]
def exec(self, prep_res):
"""Ask LLM to rework file based on opinion"""
file_content, opinion = prep_res
prompt = f"Rework this text based on the opinion: {opinion}\n\nOriginal text: {file_content}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store reworked file content in shared"""
rework_count = self.params["rework2_flow_min_count"]
shared["reworked_file_content"] = exec_res
if not shared.get("reworked_file_content_count"):
shared["reworked_file_content_count"] = 1
elif shared.get("reworked_file_content_count"):
shared["reworked_file_content_count"] += 1
if shared["reworked_file_content_count"] < rework_count and self.params["flow_id"] == "rework_flow2":
print(f"Less than {self.params["rework2_flow_min_count"]} rework for rework2_flow, so going for pass #{shared["reworked_file_content_count"]}.")
return "rework"
else:
return "default"
class SaveFile(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
"""Get reworked file content and original filename from shared"""
filename = self.params["filename"]
return shared["reworked_file_content"], filename
def exec(self, prep_res):
"""Save reworked file content to new file"""
reworked_file_content, filename = prep_res
new_filename = f"{filename.split('.')[0]}_v2.{filename.split('.')[-1]}"
with open(new_filename, "w") as file:
file.write(reworked_file_content)
return reworked_file_content
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
"""Return success message"""
print(f"Saved to {filename} the content : \n{exec_res}")
##############################################
# # # Comment this from here
# # First flow
# Create nodes
load_Node = LoadFile()
opinion_Node = GetOpinion()
rework_Node = ReworkFile()
save_Node = SaveFile()
# Connect nodes
load_Node >> opinion_Node >> rework_Node >> save_Node
# Create flow
rework_Flow = Flow(start=load_Node)
# Set flow params
rework_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow"})
# Run flow
shared = {}
rework_Flow.run(shared)
# # # To here for second workflow to work
##############################################
# # Second flow
# Create nodes
load2_Node = LoadFile()
opinion2_Node = GetOpinion()
rework2_Node = ReworkFile()
valid2_Node = GetValidation()
save2_Node = SaveFile()
# Connect nodes
load2_Node >> opinion2_Node
opinion2_Node >> rework2_Node
rework2_Node - "default" >> valid2_Node
rework2_Node - "rework" >> opinion2_Node
# Get second opinion it if rework asked because in rework_flow2 and less than 2 rework
valid2_Node - "invalid" >> opinion2_Node
valid2_Node - "default" >> save2_Node
# Create flow
rework2_Flow = Flow(start=load2_Node)
# Set flow params
# This will not set params if class Flow was already initialized with other params ?
rework2_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow2", "rework2_flow_min_count" : 3})
# Run flow
shared2 = {}
rework2_Flow.run(shared2)
It seems that rework2_Flow doesn't get the param rework2_flow_min_count if rework_Flow was initialized before.
Other ways ?
I use uuid in a other file, but since it only has one Flow, it works.
Since this is built to make multiple flows and interconnect them, addressing this issue should be important.
The other way would be to use shared to store node and flow history, but it seems overly complex for the task.
This is the answer I got from llama 405b:
- Use instance-level parameters If you still want to keep the set_params method, you can modify it to set instance-level parameters instead of class-level parameters.
class Flow:
def __init__(self, start):
self.start = start
self._params = {}
def set_params(self, params):
self._params = params
@property
def params(self):
return self._params
This way, each flow instance will have its own set of parameters.
Any input on this, ?
Thanks
Hey thank you for the question!
So I am trying to use params to make a flow_id variable. Sub flow will be the next step, but I am having a problem right now
Note that a node instance could get reused for many times (especially for batch) so instance level id is not suggested.
But I see you use class level id print(f" In : {self.__class__.__name__}") which is smart!!
It could also be nice for a node to know where it is in a Flow or a subFlow like print(self.Flow.node_history) and find parent Flow if we are in a subflow print(self.Flow.parents)
Good point! I need to think about this more. The challenge is that a node/flow can be used by many different flows.
This history stack is better to be collected during run time. Something like inspect but I need more time to work on it:
import inspect
class Bar:
def __init__(self):
pass
def exec(self):
stack = inspect.stack()
for frame in stack:
frame_info = frame.frame
if 'self' in frame_info.f_locals: # Check if it's in a class
caller_instance = frame_info.f_locals['self']
class_name = type(caller_instance).__name__
print(f"Method 'exec' in Bar was called by class: {class_name}")
return
print("No class context found")
class Foo:
def __init__(self, bar):
self.bar = bar
def exec(self):
self.bar.exec()
# Instantiate Bar and pass it to Foo
bar_instance = Bar()
foo_instance = Foo(bar_instance)
# Call exec on Foo, which in turn calls exec on Bar
foo_instance.exec()
It seems that rework2_Flow doesn't get the param rework2_flow_min_count if rework_Flow was initialized before.
I don't fully follow the issue here.
I tried to run your codes with a dummy example.txt
However, the first rework_Flow won't run and throw an error because key error rework_count = self.params["rework2_flow_min_count"]. It doesn't reach the rework2_Flow.
However, if I comment out rework_Flow, the outputs of rework2_Flow look good to me:
In : LoadFile
Flow name: rework_flow2
In : GetOpinion
In : ReworkFile
Less than 3 rework for rework2_flow, so going for pass #1.
In : GetOpinion
In : ReworkFile
Less than 3 rework for rework2_flow, so going for pass #2.
In : GetOpinion
In : ReworkFile
In : GetValidation
In : SaveFile
Saved to example.txt the content :
Rework this text based on the opinion: What's your opinion on this text: Original text :
"dummy example text"
Revised version:
Rework this text based on the opinion: What's your opinion on this text: Original text :
"dummy example text"
Revised version:
Rework this text based on the opinion: What's your opinion on this text: "dummy example text"
. Provide opinion on how to make it better.
Original text: "dummy example text"
. Provide opinion on how to make it better.
Original text: "dummy example text"
. Provide opinion on how to make it better.
Original text: "dummy example text"
Thanks for the quick reply, comment out line 81 : rework_count = self.params["rework2_flow_min_count"]
That was added while debugging.. Also it seems like I'm quite tired as a few bugs got in...
Here is more up to date code :
from pocketflow import *
import os
def call_llm(prompt):
# Your API logic here
return prompt
class LoadFile(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
print(f"Flow name: {self.params["flow_id"]}")
"""Load file from disk"""
filename = self.params["filename"]
with open(filename, "r") as file:
return file.read()
def exec(self, prep_res):
"""Return file content"""
return prep_res
def post(self, shared, prep_res, exec_res):
"""Store file content in shared"""
shared["file_content"] = exec_res
return "default"
class GetOpinion(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
"""Get file content from shared"""
if not shared.get("reworked_file_content"):
return shared["file_content"]
else:
return "Original text :\n" + shared["file_content"] + "Revised version:\n" + shared["reworked_file_content"]
def exec(self, prep_res):
"""Ask LLM for opinion on file content"""
prompt = f"What's your opinion on this text: {prep_res}. Provide opinion on how to make it better."
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store opinion in shared"""
shared["opinion"] = exec_res
return "default"
class GetValidation(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
"""Get file content from shared"""
shared['discussion'] = shared["file_content"] + shared["opinion"] + "Final revised text : " + shared["reworked_file_content"]
return
def exec(self, prep_res):
"""Ask LLM for opinion on file content"""
prompt = f"Validate that the final revised text is valid and reflects the changes proposed in opinion : {prep_res}. \nReply `IS VALID` if it is of `NOT VALID` if it needs some more work."
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store rework count in shared"""
if "IS VALID" in exec_res:
return "default"
else:
return "invalid"
class ReworkFile(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
"""Get file content and opinion from shared"""
return shared["file_content"], shared["opinion"]
def exec(self, prep_res):
"""Ask LLM to rework file based on opinion"""
file_content, opinion = prep_res
prompt = f"Rework this text based on the opinion: {opinion}\n\nOriginal text: {file_content}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store reworked file content in shared"""
if "rework2_flow_min_count" in self.params:
rework_count = self.params["rework2_flow_min_count"]
shared["reworked_file_content"] = exec_res
if not shared.get("reworked_file_content_count"):
shared["reworked_file_content_count"] = 1
elif shared.get("reworked_file_content_count"):
shared["reworked_file_content_count"] += 1
if shared["reworked_file_content_count"] < rework_count and self.params["flow_id"] == "rework_flow2":
print(f"Less than {self.params["rework2_flow_min_count"]} rework for rework2_flow, so going for pass #{shared["reworked_file_content_count"]}.")
return "rework"
else:
return "default"
else:
shared["reworked_file_content"] = exec_res
class SaveFile(Node):
def prep(self, shared):
print(f" In : {self.__class__.__name__}")
"""Get reworked file content and original filename from shared"""
filename = self.params["filename"]
if "reworked_file_content" in shared:
return shared["reworked_file_content"], filename
else:
print("Error")
def exec(self, prep_res):
"""Save reworked file content to new file"""
reworked_file_content, filename = prep_res
new_filename = f"{filename.split('.')[0]}_v2.{filename.split('.')[-1]}"
with open(new_filename, "w") as file:
file.write(reworked_file_content)
return reworked_file_content
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
"""Return success message"""
print(f"Saved to {filename} the content : \n{exec_res}")
# # # Comment this from here
# # First flow
# Create nodes
load_Node = LoadFile()
opinion_Node = GetOpinion()
rework_Node = ReworkFile()
save_Node = SaveFile()
# Connect nodes
load_Node >> opinion_Node >> rework_Node >> save_Node
# Create flow
rework_Flow = Flow(start=load_Node)
# Set flow params
rework_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow"})
# Run flow
shared = {}
rework_Flow.run(shared)
# # # To here for second workflow to work
# # Second flow
# Create nodes
load2_Node = LoadFile()
opinion2_Node = GetOpinion()
rework2_Node = ReworkFile()
valid2_Node = GetValidation()
save2_Node = SaveFile()
# Connect nodes
load2_Node >> opinion2_Node
opinion2_Node >> rework2_Node
rework2_Node - "default" >> valid2_Node
rework2_Node - "rework" >> opinion2_Node
# Get second opinion it if rework asked because in rework_flow2 and less than 2 rework
valid2_Node - "invalid" >> opinion2_Node
valid2_Node - "default" >> save2_Node
# Create flow
rework2_Flow = Flow(start=load2_Node)
# Set flow params
# This will not set params if class Flow was already initialized with other params ?
rework2_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow2", "rework2_flow_min_count" : 3})
# Run flow
shared2 = {}
rework2_Flow.run(shared2)
And the output seems to work ??? And flow_id works also... Sorry for that then, but the other ideas should be a good addition. Also this code could be added to a example section in the docs.
In : LoadFile
Flow name: rework_flow
In : GetOpinion
In : ReworkFile
In : SaveFile
Saved to example.txt the content :
Rework this text based on the opinion: What's your opinion on this text: Example file
. Provide opinion on how to make it better.
Original text: Example file
In : LoadFile
Flow name: rework_flow2
In : GetOpinion
In : ReworkFile
Less than 3 rework for rework2_flow, so going for pass #1.
In : GetOpinion
In : ReworkFile
Less than 3 rework for rework2_flow, so going for pass #2.
In : GetOpinion
In : ReworkFile
In : GetValidation
In : SaveFile
Saved to example.txt the content :
Rework this text based on the opinion: What's your opinion on this text: Original text :
Example file
Revised version:
Rework this text based on the opinion: What's your opinion on this text: Original text :
Example file
Revised version:
Rework this text based on the opinion: What's your opinion on this text: Example file
. Provide opinion on how to make it better.
Original text: Example file
. Provide opinion on how to make it better.
Original text: Example file
. Provide opinion on how to make it better.
Original text: Example file
I've created an example function that tracks call stack for nodes: https://the-pocket.github.io/PocketFlow/viz.html
Minimal examples that will work for your example:
from pocketflow import *
import os
import inspect
def call_llm(prompt):
# Your API logic here
return prompt
import inspect
def get_node_call_stack():
stack = inspect.stack()
node_names = []
seen_ids = set()
for frame_info in stack[1:]:
local_vars = frame_info.frame.f_locals
if 'self' in local_vars:
caller_self = local_vars['self']
if isinstance(caller_self, BaseNode) and id(caller_self) not in seen_ids:
seen_ids.add(id(caller_self))
node_names.append(type(caller_self).__name__)
return node_names
class LoadFile(Node):
def prep(self, shared):
stack = get_node_call_stack()
print("Call stack:", stack)
# print(inspect.stack())
print(f'Flow name: {self.params["flow_id"]}')
"""Load file from disk"""
filename = self.params["filename"]
with open(filename, "r") as file:
return file.read()
def exec(self, prep_res):
"""Return file content"""
return prep_res
def post(self, shared, prep_res, exec_res):
"""Store file content in shared"""
shared["file_content"] = exec_res
return "default"
class GetOpinion(Node):
def prep(self, shared):
stack = get_node_call_stack()
print("Call stack:", stack)
"""Get file content from shared"""
if not shared.get("reworked_file_content"):
return shared["file_content"]
else:
return "Original text :\n" + shared["file_content"] + "Revised version:\n" + shared["reworked_file_content"]
def exec(self, prep_res):
"""Ask LLM for opinion on file content"""
prompt = f"What's your opinion on this text: {prep_res}. Provide opinion on how to make it better."
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store opinion in shared"""
shared["opinion"] = exec_res
return "default"
class GetValidation(Node):
def prep(self, shared):
stack = get_node_call_stack()
print("Call stack:", stack)
"""Get file content from shared"""
shared['discussion'] = shared["file_content"] + shared["opinion"] + "Final revised text : " + shared["reworked_file_content"]
return
def exec(self, prep_res):
"""Ask LLM for opinion on file content"""
prompt = f"Validate that the final revised text is valid and reflects the changes proposed in opinion : {prep_res}. \nReply `IS VALID` if it is of `NOT VALID` if it needs some more work."
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store rework count in shared"""
if "IS VALID" in exec_res:
return "default"
else:
return "invalid"
class ReworkFile(Node):
def prep(self, shared):
stack = get_node_call_stack()
print("Call stack:", stack)
"""Get file content and opinion from shared"""
return shared["file_content"], shared["opinion"]
def exec(self, prep_res):
"""Ask LLM to rework file based on opinion"""
file_content, opinion = prep_res
prompt = f"Rework this text based on the opinion: {opinion}\n\nOriginal text: {file_content}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
"""Store reworked file content in shared"""
rework_count = self.params["rework2_flow_min_count"]
shared["reworked_file_content"] = exec_res
if not shared.get("reworked_file_content_count"):
shared["reworked_file_content_count"] = 1
elif shared.get("reworked_file_content_count"):
shared["reworked_file_content_count"] += 1
if shared["reworked_file_content_count"] < rework_count and self.params["flow_id"] == "rework_flow2":
print(f"Less than {self.params['rework2_flow_min_count']} rework for rework2_flow, so going for pass #{shared['reworked_file_content_count']}.")
return "rework"
else:
return "default"
class SaveFile(Node):
def prep(self, shared):
stack = get_node_call_stack()
print("Call stack:", stack)
"""Get reworked file content and original filename from shared"""
filename = self.params["filename"]
return shared["reworked_file_content"], filename
def exec(self, prep_res):
"""Save reworked file content to new file"""
reworked_file_content, filename = prep_res
new_filename = f"{filename.split('.')[0]}_v2.{filename.split('.')[-1]}"
with open(new_filename, "w") as file:
file.write(reworked_file_content)
return reworked_file_content
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
"""Return success message"""
print(f"Saved to {filename} the content : \n{exec_res}")
class Rework2_Flow(Flow):
pass
# # Create nodes
load2_Node = LoadFile()
opinion2_Node = GetOpinion()
rework2_Node = ReworkFile()
valid2_Node = GetValidation()
save2_Node = SaveFile()
# Connect nodes
load2_Node >> opinion2_Node
opinion2_Node >> rework2_Node
rework2_Node - "default" >> valid2_Node
rework2_Node - "rework" >> opinion2_Node
# Get second opinion it if rework asked because in rework_flow2 and less than 2 rework
valid2_Node - "invalid" >> opinion2_Node
valid2_Node - "default" >> save2_Node
# Create flow
rework2_Flow = Rework2_Flow(start=load2_Node)
# Set flow params
# This will not set params if class Flow was already initialized with other params ?
rework2_Flow.set_params({"filename": "example.txt", "flow_id" : "rework_flow2", "rework2_flow_min_count" : 3})
# Run flow
shared2 = {}
rework2_Flow.run(shared2)
I get the following output:
Call stack: ['LoadFile', 'Rework2_Flow']
Flow name: rework_flow2
Call stack: ['GetOpinion', 'Rework2_Flow']
Call stack: ['ReworkFile', 'Rework2_Flow']
Less than 3 rework for rework2_flow, so going for pass #1.
Call stack: ['GetOpinion', 'Rework2_Flow']
Call stack: ['ReworkFile', 'Rework2_Flow']
Less than 3 rework for rework2_flow, so going for pass #2.
Call stack: ['GetOpinion', 'Rework2_Flow']
Call stack: ['ReworkFile', 'Rework2_Flow']
Call stack: ['GetValidation', 'Rework2_Flow']
Call stack: ['SaveFile', 'Rework2_Flow']
Saved to example.txt the content :
Rework this text based on the opinion: What's your opinion on this text: Original text :
"dummy example text"
Revised version:
Rework this text based on the opinion: What's your opinion on this text: Original text :
"dummy example text"
Revised version:
Rework this text based on the opinion: What's your opinion on this text: "dummy example text"
. Provide opinion on how to make it better.
Original text: "dummy example text"
. Provide opinion on how to make it better.
Original text: "dummy example text"
. Provide opinion on how to make it better.
Original text: "dummy example text"