Stack variable analysis failure and basic block analysis error
Version and Platform (required):
- Binary Ninja Version: 5.2.8722, c75356aa
- OS: windows
- OS Version: 10
- CPU Architecture: x86_64
Bug Description: Stack variables in some functions are shown as sp+offset expressions rather than as variables.
Basic blocks are analyzed incorrectly and the function is split into multiple parts, which appear in the form of jump(address).
I tried undefining the function and making it code again, but neither helped.
Steps To Reproduce:
- Run the Python script in the zip to use the custom deobfuscation workflow
- Open the database
- In sub_423700 and sub_427fd4 you can see the buggy behavior
Expected Behavior:
Screenshots:
Binary: Binary Ninja portal
merit schema spins enthusiastically
Additional Information:
More basic block analysis bugs are shown below.
I checked the indirect branches at the buggy IL instruction and found that the indirect branches are set, so I guess there is an internal bug when Binary Ninja tries to connect basic blocks in guided mode; using add_guided_source_blocks didn't help at all.
I see that some of the buggy "jump" IL instructions are related to the deobfuscation workflow. Is the bug related to my workflow?
import json
from dataclasses import dataclass,field
from binaryninja import(
Workflow,
AnalysisContext,
Activity,
MediumLevelILFunction,
MediumLevelILConstPtr,
MediumLevelILConstData,
MediumLevelILConst,
MediumLevelILAdd,
MediumLevelILSub,
MediumLevelILXor,
MediumLevelILJump,
RegisterValueType,
MediumLevelILJumpTo,
MediumLevelILFunction,
BinaryView,
)
# MLIL expression classes treated as constants; used with isinstance() when
# reading the operands of an arithmetic expression.
mlil_const=MediumLevelILConstPtr | MediumLevelILConstData | MediumLevelILConst
# MLIL arithmetic expression classes (add, sub, xor) that the handler below
# evaluates when following the definition of a jump target.
mlil_calc=MediumLevelILAdd | MediumLevelILSub | MediumLevelILXor
from binaryninja import(
MediumLevelILVarSsa,
MediumLevelILVarPhi,
MediumLevelILVar,
MediumLevelILSetVarSsa,
MediumLevelILLoadSsa,
SSAVariable,
ExpressionIndex,
MediumLevelILLabel,
MediumLevelILIf,
Function,
)
class two_direct_indirect_jump_handler:
    """Resolve indirect jumps that have exactly two targets and rewrite them as MLIL ``if``.

    The handler works in two passes over the function held by the analysis
    context (see ``run``):

    1. ``connect_basic_block`` walks the function's unresolved indirect
       branches, evaluates the jump destination for the "true" and "false"
       side of a preceding condition and records the two targets as user
       indirect branches.
    2. ``convert_jump_to_if`` copies the MLIL into a new function, replacing
       every resolved ``jump_to`` with an ``if`` on the recorded condition.

    Evaluated values are either a plain ``int`` (condition independent) or a
    dict keyed by ``"t"`` and/or ``"f"`` holding the value on the true/false
    side of the condition.
    """

    @dataclass
    class resolved_data:
        """Resolution result for one indirect jump."""
        # Expression index of the (non-SSA) MLIL condition controlling the jump.
        cond:ExpressionIndex
        # Destination address when the condition is true.
        trueAddr:int
        # Destination address when the condition is false.
        falseAddr:int

    def __init__(self,ctx:AnalysisContext):
        """Store the analysis context and start with no resolved jumps."""
        self.ctx=ctx
        # Maps the address of a resolved indirect jump to its resolution.
        self.resolved:dict[int,two_direct_indirect_jump_handler.resolved_data]={}

    @staticmethod
    def _calc_phi(def_site:MediumLevelILVarPhi,mlil_ssa:MediumLevelILFunction,cond):
        """Evaluate a phi by merging the values of its first two sources.

        Raises:
            RuntimeError: if ``def_site`` is not a phi, has fewer than two
                sources, or either source could not be evaluated.
        """
        if not isinstance(def_site,MediumLevelILVarPhi):
            raise RuntimeError("phi calc fail not a phi {} {}".format(hex(def_site.address),def_site))
        sources=def_site.src
        if len(sources)<2:
            raise RuntimeError("phi calc fail less than two sources {} {}".format(hex(def_site.address),def_site))
        search=two_direct_indirect_jump_handler.search_var_value_ssa
        lv=search(sources[0],mlil_ssa,cond)
        rv=search(sources[1],mlil_ssa,cond)
        if lv is None or rv is None:
            raise RuntimeError("phi calc fail\n {} {} \n {} {}".format(sources[0].def_site,sources[1].def_site,hex(def_site.address),def_site))
        # For two dicts this merges the "t" and "f" entries into one mapping.
        # NOTE(review): for two ints this is a bitwise OR, which is unlikely to
        # be a meaningful value - confirm whether that case should be an error.
        return lv | rv

    @staticmethod
    def _calc_load(def_site:MediumLevelILSetVarSsa,mlil_ssa:MediumLevelILFunction,cond):
        """Evaluate an assignment whose source is a load through a lookup table.

        The first SSA variable found inside the load expression is evaluated
        to a ``{"t":..., "f":...}`` pair of table keys, which are then looked
        up in the possible-value mapping of the assigned variable.

        Returns:
            dict with keys ``"t"`` and ``"f"`` holding the loaded values.

        Raises:
            RuntimeError: if the source is not a load, no variable is found,
                the variable cannot be evaluated to a true/false pair, or the
                keys are missing from the table mapping.
        """
        load_expr=getattr(def_site,"src",None)
        if not isinstance(load_expr,MediumLevelILLoadSsa):
            raise RuntimeError("load calc fail not a load {} {}".format(hex(def_site.address),def_site))
        table=def_site.get_possible_reg_values_after(def_site.dest.var.storage)
        var_token=next(
            (token for token in load_expr.traverse(lambda x: x) if isinstance(token,MediumLevelILVarSsa)),
            None,
        )
        if var_token is None:
            raise RuntimeError("load calc fail no var token {} {}".format(hex(def_site.address),def_site))
        mp=two_direct_indirect_jump_handler.search_var_value_ssa(var_token.var,mlil_ssa,cond)
        if mp is None:
            raise RuntimeError("load calc fail no mp {} {}".format(hex(def_site.address),def_site))
        if not isinstance(mp,dict):
            # A condition independent index cannot select between two targets.
            raise RuntimeError("load calc fail index is not a t/f pair {} {}".format(hex(def_site.address),def_site))
        mapping=table.mapping
        try:
            return {"t":mapping[mp["t"]],"f":mapping[mp["f"]]}
        except KeyError:
            pass
        try:
            # Second attempt with the keys divided by 8.
            # presumably the index is a byte offset into 8-byte entries - TODO confirm
            return {"t":mapping[mp["t"]//8],"f":mapping[mp["f"]//8]}
        except KeyError:
            print(mp,var_token.var,mapping)
            print(hex(def_site.address),def_site)
            print("key error",mp,mapping)
            raise RuntimeError("maybe not two direct indirect jump {}".format(hex(def_site.address)))

    @staticmethod
    def _calc_math(def_site,mlil_ssa:MediumLevelILFunction,cond):
        """Evaluate an assignment whose source is an xor, add or sub expression.

        Each operand is either a constant or a variable that is evaluated
        recursively. When an operand is a true/false dict the operation is
        applied separately to the ``"t"`` and ``"f"`` entries.

        Returns:
            int, a ``{"t":..., "f":...}`` dict, or None when an operand is
            neither an int nor a dict.

        Raises:
            RuntimeError: if the source expression is not xor, add or sub.
        """
        expr=def_site.src
        if isinstance(expr,MediumLevelILXor):
            op=lambda x,y: x^y
        elif isinstance(expr,MediumLevelILAdd):
            op=lambda x,y: x+y
        elif isinstance(expr,MediumLevelILSub):
            op=lambda x,y: x-y
        else:
            raise RuntimeError("not support math calc {}".format(hex(def_site.address)))

        def operand_value(operand):
            # Constants carry their value directly; anything else is treated
            # as an SSA variable and resolved through its definition.
            if isinstance(operand,mlil_const):
                return operand.value.value
            return two_direct_indirect_jump_handler.search_var_value_ssa(operand.var,mlil_ssa,cond)

        def side(value,key):
            # Plain ints apply to both sides of the condition.
            return value[key] if isinstance(value,dict) else value

        lv=operand_value(expr.left)
        rv=operand_value(expr.right)
        if isinstance(lv,int) and isinstance(rv,int):
            return op(lv,rv)
        if isinstance(lv,(int,dict)) and isinstance(rv,(int,dict)):
            t=op(side(lv,"t"),side(rv,"t"))
            f=op(side(lv,"f"),side(rv,"f"))
            return {"t":t,"f":f}
        return None

    @staticmethod
    def search_var_value_ssa(var:SSAVariable,mlil_ssa:MediumLevelILFunction,cond):
        """Evaluate an SSA variable by following its definition.

        Args:
            var: SSA variable to evaluate.
            mlil_ssa: function indexed with the keys of ``branch_dependence``.
            cond: one-entry dict used as an out-parameter; ``cond["value"]``
                receives the expression index of the first branch condition
                found, if it is still None.

        Returns:
            int for a constant without a matching branch, a dict with ``"t"``
            and/or ``"f"`` entries for condition dependent values, or None
            when the variable has no definition or an unsupported value type.

        Raises:
            RuntimeError: for set/undetermined values whose definition is
                neither a phi nor a supported arithmetic expression.
        """
        handler=two_direct_indirect_jump_handler
        def_site=var.def_site
        if def_site is None:
            # No defining instruction to follow (the original code failed with
            # AttributeError here).
            return None
        cur_value=def_site.get_possible_reg_values_after(var.var.storage)
        value_type=cur_value.type
        if value_type in (
            RegisterValueType.ConstantValue,
            RegisterValueType.ConstantPointerValue,
            RegisterValueType.ConstantDataValue,
        ):
            for key in def_site.branch_dependence:
                if_expr:MediumLevelILIf=mlil_ssa[key]
                # The constant belongs to a side only when its definition is
                # the instruction the branch jumps to.
                if if_expr.true==def_site.instr_index:
                    branch_key="t"
                elif if_expr.false==def_site.instr_index:
                    branch_key="f"
                else:
                    continue
                if cond["value"] is None:
                    cond["value"]=if_expr.non_ssa_form.condition.expr_index
                return {branch_key:cur_value.value}
            return cur_value.value
        if value_type==RegisterValueType.LookupTableValue:
            return handler._calc_load(def_site,mlil_ssa,cond)
        if value_type in (RegisterValueType.InSetOfValues,RegisterValueType.UndeterminedValue):
            if isinstance(def_site,MediumLevelILVarPhi):
                return handler._calc_phi(def_site,mlil_ssa,cond)
            if isinstance(getattr(def_site,"src",None),mlil_calc):
                return handler._calc_math(def_site,mlil_ssa,cond)
            if value_type==RegisterValueType.InSetOfValues:
                kind="InSetOfValues"
            else:
                kind="UndeterminedValue"
            raise RuntimeError("not support {} calc {}".format(kind,hex(def_site.address)))
        return None

    def check_manual_update(self,func:Function,addr:int,res:dict):
        """Return True when the two existing indirect branches at ``addr`` disagree with ``res``.

        Returns False when there are not exactly two indirect branches at
        ``addr`` or when both of them already point at ``res["t"]`` or
        ``res["f"]``.
        """
        auto_branch=func.get_indirect_branches_at(addr)
        if len(auto_branch)!=2:
            return False
        return any(
            branch.dest_addr!=res["t"] and branch.dest_addr!=res["f"]
            for branch in auto_branch
        )

    def check_addr_vaild(self,addr:int,bv:BinaryView):
        """Return True if ``addr`` is 4-byte aligned and inside an executable segment."""
        if addr%4!=0:
            return False
        # Segment.end is the first address after the segment, so it is excluded.
        return any(seg.executable and seg.start<=addr<seg.end for seg in bv.segments)

    def connect_basic_block(self,func:Function):
        """Resolve the unresolved indirect branches of ``func`` to two targets each.

        For every unresolved indirect branch the jump destination is
        evaluated; on success the true/false targets are stored in
        ``self.resolved`` and, for a plain ``jump`` or when the existing
        branches disagree, also set as user indirect branches. Failures are
        printed together with the branch address and the branch is skipped.
        """
        # NOTE(review): this reads func.mlil rather than the MLIL of the
        # analysis context - confirm this is the intended IL inside a workflow.
        mlil=func.mlil
        for branch in func.unresolved_indirect_branches:
            # Each entry is indexed as a pair; element 1 is the branch address.
            unsolved_addr=branch[1]
            try:
                jump:MediumLevelILJump|MediumLevelILJumpTo=mlil[mlil.get_instruction_start(unsolved_addr)].ssa_form
                if not isinstance(jump,(MediumLevelILJump,MediumLevelILJumpTo)):
                    raise RuntimeError("not jump {}".format(hex(jump.address)))
                var=jump.dest.ssa_form.var
                cond={"value":None}
                res=two_direct_indirect_jump_handler.search_var_value_ssa(var,mlil.ssa_form,cond)
                if res is None or cond["value"] is None:
                    raise RuntimeError("calc fail {}".format(hex(jump.address)))
                if isinstance(jump,MediumLevelILJump) or self.check_manual_update(func,unsolved_addr,res):
                    func.set_user_indirect_branches(unsolved_addr, [(func.arch,res["t"]),(func.arch,res["f"])],func.arch)
                self.resolved[unsolved_addr]=two_direct_indirect_jump_handler.resolved_data(cond=cond["value"],trueAddr=res["t"],falseAddr=res["f"])
            except Exception as e:
                # Best effort: report the failure and go on with the next branch.
                print(hex(unsolved_addr),e)

    def convert_jump_to_if(self,ctx:AnalysisContext):
        """Rebuild the MLIL with resolved ``jump_to`` instructions replaced by ``if``.

        Every instruction of the current MLIL is copied into a new function.
        A ``jump_to`` whose address is in ``self.resolved`` becomes an ``if``
        on the recorded condition; if that rewrite fails the original
        instruction is copied instead. A ``jump_to`` through a non-constant
        variable that cannot be evaluated to two valid addresses gets a
        "Bugs" tag. The new function is finalized and assigned to ``ctx.mlil``.
        """
        new_func=MediumLevelILFunction(ctx.function.arch,low_level_il=ctx.llil)
        old_mlil=ctx.function.mlil
        new_func.prepare_to_copy_function(old_mlil)

        def tag_for_manual_analysis(address):
            old_mlil.source_function.add_tag("Bugs","need manual analyze jump",address,auto=True)
            print("add bug tag at {}".format(hex(address)))

        for old_block in old_mlil:
            new_func.prepare_to_copy_block(old_block)
            for instr_idx in range(old_block.start,old_block.end):
                instr=old_mlil[instr_idx]
                is_jump_to=isinstance(instr,MediumLevelILJumpTo)
                if is_jump_to and isinstance(instr.dest,MediumLevelILVar) and instr.get_possible_reg_values(instr.dest.var.storage).type!=RegisterValueType.ConstantValue:
                    # Drop the auto tags already present at this address
                    # before deciding whether a new one is needed.
                    for tag in old_mlil.source_function.get_tags_at(instr.address,auto=True):
                        old_mlil.source_function.remove_auto_address_tags_of_type(instr.address,tag.type.name)
                    cond={"value":None}
                    try:
                        res=two_direct_indirect_jump_handler.search_var_value_ssa(instr.dest.ssa_form.var,old_mlil.ssa_form,cond)
                        if res is None or cond["value"] is None or not self.check_addr_vaild(res["t"],ctx.function.view) or not self.check_addr_vaild(res["f"],ctx.function.view):
                            tag_for_manual_analysis(instr.address)
                    except Exception as e:
                        tag_for_manual_analysis(instr.address)
                        print(e)
                if is_jump_to and instr.address in self.resolved:
                    try:
                        resolved=self.resolved[instr.address]
                        label_t=MediumLevelILLabel()
                        label_f=MediumLevelILLabel()
                        indirect_branches=ctx.function.get_indirect_branches_at(instr.address)
                        if len(indirect_branches)!=2:
                            raise RuntimeError("indirect branches len!=2 {} {}".format(hex(instr.address),indirect_branches))
                        true_bound=False
                        false_bound=False
                        for branch in indirect_branches:
                            if branch.dest_addr==resolved.trueAddr:
                                label_t.operand=instr.targets[branch.dest_addr]
                                true_bound=True
                            if branch.dest_addr==resolved.falseAddr:
                                label_f.operand=instr.targets[branch.dest_addr]
                                false_bound=True
                        if not (true_bound and false_bound):
                            # Never emit an if whose label was not pointed at
                            # a target; fall back to copying the jump_to.
                            raise RuntimeError("indirect branches do not match resolved targets {} {}".format(hex(instr.address),indirect_branches))
                        if_expr=new_func.if_expr(old_mlil.get_expr(resolved.cond).copy_to(new_func),
                            label_t,label_f,instr.source_location)
                        new_func.append(if_expr,instr.source_location)
                    except Exception as e:
                        print(e)
                        new_func.append(instr.copy_to(new_func),instr.source_location)
                else:
                    new_func.append(instr.copy_to(new_func),instr.source_location)
        new_func.finalize()
        new_func.generate_ssa_form()
        ctx.mlil=new_func

    def re_run_calc_check(self,var:SSAVariable):
        """Return True if ``var`` evaluates to a value together with a branch condition.

        Any exception raised during evaluation is printed and reported as False.
        """
        cond={"value":None}
        try:
            res=self.search_var_value_ssa(var,self.ctx.function.mlil.ssa_form,cond)
        except Exception as e:
            print(e)
            return False
        return res is not None and cond["value"] is not None

    def run(self):
        """Run both passes on the context's function.

        Nothing is done unless the view has a ``.data`` section marked with
        read-only data semantics; a message is printed instead.
        """
        from binaryninja import SectionSemantics
        data_section=self.ctx.function.view.sections.get(".data")
        if data_section is None:
            # The original lookup raised KeyError for views without .data.
            print("!!! no .data section found, skip two direct indirect jump handling !!!")
            return
        if data_section.semantics!=SectionSemantics.ReadOnlyDataSectionSemantics:
            print("!!! please set .data section to read only data section semantics !!!")
            return
        self.connect_basic_block(self.ctx.function)
        self.convert_jump_to_if(self.ctx)
def install_two_direct_indirect_jump_handler(ctx:AnalysisContext):
    """Workflow activity entry point: build a handler for ``ctx`` and run it."""
    two_direct_indirect_jump_handler(ctx).run()
# Clone the workflow returned by Workflow("") under a custom name, register the
# jump handler as an activity, insert it at "core.function.analyzeReturns" and
# register the resulting workflow.
_activity_config={
    "name":"satori.function.handle_two_direct_indirect_jump.activity",
    "title":"handle_two_direct_indirect_jump",
    "description":"handle_two_direct_indirect_jump",
    "eligibility":{
        "auto":{
            "default":True
        }
    }
}
wf=Workflow("").clone("satori.function.deobf_test")
wf.register_activity(Activity(
    configuration=json.dumps(_activity_config),
    action=install_two_direct_indirect_jump_handler,
))
wf.insert("core.function.analyzeReturns",[_activity_config["name"]])
wf.register()
I turned off guided analysis mode and the bug seems to disappear, so I guess something goes wrong when guided analysis mode is used.