binaryninja-api icon indicating copy to clipboard operation
binaryninja-api copied to clipboard

stack variable analyze failure and basic block analyze error

Open SGSGsama opened this issue 2 months ago • 4 comments

Version and Platform (required):

  • Binary Ninja Version: 5.2.8722, c75356aa
  • OS: windows
  • OS Version: 10
  • CPU Architecture: x86_64

Bug Description: stack variables in some functions are shown as sp+offset expressions rather than as variables

Image

Basic blocks are analyzed incorrectly and the function is split into multiple parts, connected in the form of jump(address)

Image Image

I tried undefining the function and making it code again, but that didn't help

Steps To Reproduce:

  1. run the Python script in the zip to use the custom deobfuscation workflow
  2. open the db
  3. in sub_423700 and sub_427fd4 you can see the buggy behavior

Expected Behavior:

Screenshots:

Binary: Binary Ninja portal

merit schema spins enthusiastically

Additional Information:

SGSGsama avatar Dec 10 '25 13:12 SGSGsama

More basic block analysis bugs below

Image

SGSGsama avatar Dec 10 '25 13:12 SGSGsama

I checked the indirect branches at the buggy IL and found that the indirect branch targets are set, so I guess there is an internal bug when Binary Ninja tries to connect basic blocks in guided mode. Using add_guided_source_blocks didn't help at all.

Image Image

SGSGsama avatar Dec 11 '25 11:12 SGSGsama

I see that some of the buggy "jump IL" instructions are related to the deobfuscation workflow. Is the bug related to my workflow?

import json
from dataclasses import dataclass,field
from binaryninja import(
    Workflow,
    AnalysisContext,
    Activity,
    MediumLevelILFunction,
    MediumLevelILConstPtr,
    MediumLevelILConstData,
    MediumLevelILConst,
    MediumLevelILAdd,
    MediumLevelILSub,
    MediumLevelILXor,
    MediumLevelILJump,
    RegisterValueType,
    MediumLevelILJumpTo,
    MediumLevelILFunction,
    BinaryView,

)
# Union of the MLIL constant-expression classes; used with isinstance() to
# recognise operands whose value can be read directly from the expression.
mlil_const=MediumLevelILConstPtr | MediumLevelILConstData | MediumLevelILConst
# Union of the arithmetic MLIL classes (add, sub, xor) that _calc_math evaluates.
mlil_calc=MediumLevelILAdd | MediumLevelILSub | MediumLevelILXor
from binaryninja import(
    MediumLevelILVarSsa,
    MediumLevelILVarPhi,
    MediumLevelILVar,
    MediumLevelILSetVarSsa,
    MediumLevelILLoadSsa,
    SSAVariable,
    ExpressionIndex,
    MediumLevelILLabel,
    MediumLevelILIf,
    Function,
)
class two_direct_indirect_jump_handler:
    """Resolve indirect jumps with exactly two targets and rewrite them as MLIL ``if``.

    ``run`` first records user indirect branches for every jump it can resolve
    (``connect_basic_block``) and then builds a new MLIL function in which the
    resolved ``jump_to`` instructions are replaced by ``if`` expressions
    (``convert_jump_to_if``).

    Values travel through the resolver in one of three shapes:

    * a plain ``int`` - a constant that is not tied to a branch side,
    * a partial dict ``{"t": v}`` or ``{"f": v}`` - a constant defined at the
      instruction an ``if`` jumps to on its true / false side,
    * a full dict ``{"t": v, "f": v}`` - one value per side of the condition.
    """

    @dataclass
    class resolved_data:
        """Result of resolving one indirect jump."""
        # Expression index of the controlling condition (non-SSA MLIL).
        cond:ExpressionIndex
        # Destination stored under the "t" key of the resolved value.
        trueAddr:int
        # Destination stored under the "f" key of the resolved value.
        falseAddr:int

    def __init__(self,ctx:AnalysisContext):
        """Remember the analysis context and start with no resolved jumps."""
        self.ctx=ctx
        # Maps the address of a resolved jump to its condition and two targets.
        self.resolved:dict[int,two_direct_indirect_jump_handler.resolved_data]={}

    @staticmethod
    def _calc_phi(def_site:MediumLevelILVarPhi,mlil_ssa:MediumLevelILFunction,cond):
        """Resolve the first two sources of a phi and merge the results.

        Raises:
            RuntimeError: if ``def_site`` is not a phi or a source cannot be resolved.
        """
        # Explicit raise instead of assert: asserts are stripped under ``python -O``.
        if not isinstance(def_site,MediumLevelILVarPhi):
            raise RuntimeError("phi calc fail not a phi {}".format(def_site))
        resolve=two_direct_indirect_jump_handler.search_var_value_ssa
        lv=resolve(def_site.src[0],mlil_ssa,cond)
        rv=resolve(def_site.src[1],mlil_ssa,cond)
        if lv is None or rv is None:
            raise RuntimeError("phi calc fail\n {} {} \n {} {}".format(def_site.src[0].def_site,def_site.src[1].def_site,hex(def_site.address),def_site))
        # For two dicts this merges {"t": ...} with {"f": ...}.
        # NOTE(review): for two plain ints ``|`` is a bitwise OR - presumably only
        # dict operands are expected here; confirm.
        return lv | rv

    @staticmethod
    def _calc_load(def_site:MediumLevelILSetVarSsa,mlil_ssa:MediumLevelILFunction,cond):
        """Resolve an assignment whose source is a load by indexing its lookup table.

        The first SSA variable found inside the load is resolved to a
        ``{"t", "f"}`` pair and each side is mapped through ``table.mapping``.

        Raises:
            RuntimeError: if the source is not a load, contains no SSA variable,
                the variable cannot be resolved, or the keys are not in the table.
        """
        if not isinstance(def_site.src,MediumLevelILLoadSsa):
            raise RuntimeError("load calc fail not a load {} {}".format(hex(def_site.address),def_site))
        # traverse() with an identity callback yields the sub-expressions of the load.
        var_token=next(
            (tok for tok in def_site.src.traverse(lambda x: x) if isinstance(tok,MediumLevelILVarSsa)),
            None,
        )
        table=def_site.get_possible_reg_values_after(def_site.dest.var.storage)
        if var_token is None:
            raise RuntimeError("load calc fail no var token {} {}".format(hex(def_site.address),def_site))
        mp=two_direct_indirect_jump_handler.search_var_value_ssa(var_token.var,mlil_ssa,cond)
        if mp is None:
            raise RuntimeError("load calc fail no mp {} {}".format(hex(def_site.address),def_site))

        try:
            return {"t":table.mapping[mp["t"]],"f":table.mapping[mp["f"]]}
        except KeyError:
            pass
        try:
            # Second attempt with the keys divided by 8.
            # NOTE(review): presumably the table is keyed by entry index rather than
            # byte offset for 8-byte entries - confirm.
            return {"t":table.mapping[mp["t"]//8],"f":table.mapping[mp["f"]//8]}
        except KeyError as err:
            print(mp,var_token.var,table.mapping)
            print(hex(def_site.address),def_site)
            print("key error",mp,table.mapping)
            raise RuntimeError("maybe not two direct indirect jump {}".format(hex(def_site.address))) from err

    @staticmethod
    def _calc_math(def_site,mlil_ssa:MediumLevelILFunction,cond):
        """Evaluate an xor / add / sub assignment over resolved operand values.

        Returns an ``int`` when both operands are ints, a ``{"t", "f"}`` dict when
        at least one operand is a dict, and ``None`` when an operand resolved to
        neither.

        Raises:
            RuntimeError: if the source expression is not xor, add or sub.
        """
        src=def_site.src
        if isinstance(src,MediumLevelILXor):
            op=lambda x,y: x^y
        elif isinstance(src,MediumLevelILAdd):
            op=lambda x,y: x+y
        elif isinstance(src,MediumLevelILSub):
            op=lambda x,y: x-y
        else:
            raise RuntimeError("not support math calc {}".format(hex(def_site.address)))

        def operand_value(expr):
            # Constants carry their value; anything else is resolved through its SSA variable.
            if isinstance(expr,mlil_const):
                return expr.value.value
            return two_direct_indirect_jump_handler.search_var_value_ssa(expr.var,mlil_ssa,cond)

        def side(value,key):
            # An int applies to both sides; a dict holds one value per side.
            return value[key] if isinstance(value,dict) else value

        lv=operand_value(src.left)
        rv=operand_value(src.right)
        if not (isinstance(lv,(int,dict)) and isinstance(rv,(int,dict))):
            return None
        if isinstance(lv,int) and isinstance(rv,int):
            return op(lv,rv)
        t=op(side(lv,"t"),side(rv,"t"))
        f=op(side(lv,"f"),side(rv,"f"))
        return {"t":t,"f":f}

    @staticmethod
    def search_var_value_ssa(var:SSAVariable,mlil_ssa:MediumLevelILFunction,cond):
        """Resolve the value of an SSA variable from its definition site.

        Args:
            var: SSA variable to resolve.
            mlil_ssa: SSA MLIL function used to look up the controlling ``if``.
            cond: mutable ``{"value": ...}`` holder; the first controlling
                condition's non-SSA expression index is stored in it.

        Returns:
            An ``int``, a partial or full ``{"t", "f"}`` dict, or ``None`` when
            the value type at the definition is not handled.

        Raises:
            RuntimeError: if a set-of-values / undetermined definition is neither
                a phi nor an xor/add/sub assignment.
        """
        handler=two_direct_indirect_jump_handler
        def_site=var.def_site
        cur_value=def_site.get_possible_reg_values_after(var.var.storage)
        value_type=cur_value.type

        if value_type in (
            RegisterValueType.ConstantValue,
            RegisterValueType.ConstantPointerValue,
            RegisterValueType.ConstantDataValue,
        ):
            for key in def_site.branch_dependence:
                if_expr:MediumLevelILIf=mlil_ssa[key]
                if if_expr.true==def_site.instr_index:
                    branch_key="t"
                elif if_expr.false==def_site.instr_index:
                    branch_key="f"
                else:
                    continue
                if cond["value"] is None:
                    cond["value"]=if_expr.non_ssa_form.condition.expr_index
                return {branch_key:cur_value.value}
            return cur_value.value

        if value_type==RegisterValueType.LookupTableValue:
            return handler._calc_load(def_site,mlil_ssa,cond)

        if value_type in (RegisterValueType.InSetOfValues,RegisterValueType.UndeterminedValue):
            if isinstance(def_site,MediumLevelILVarPhi):
                return handler._calc_phi(def_site,mlil_ssa,cond)
            if isinstance(def_site.src,mlil_calc):
                return handler._calc_math(def_site,mlil_ssa,cond)
            kind="InSetOfValues" if value_type==RegisterValueType.InSetOfValues else "UndeterminedValue"
            raise RuntimeError("not support {} calc {}".format(kind,hex(def_site.address)))

        return None

    def check_manual_update(self,func:Function,addr:int,res:dict):
        """Return True when the two branches recorded at ``addr`` disagree with ``res``.

        Returns False when the function does not have exactly two indirect
        branches at ``addr`` or when both destinations are already ``res["t"]``
        or ``res["f"]``.
        """
        auto_branch=func.get_indirect_branches_at(addr)
        if len(auto_branch)!=2:
            return False
        expected=(res["t"],res["f"])
        return any(branch.dest_addr not in expected for branch in auto_branch)

    def check_addr_vaild(self,addr:int,bv:BinaryView):
        """Return True when ``addr`` is 4-byte aligned and inside an executable segment."""
        if addr%4!=0:
            return False
        # A segment's end address is exclusive, so use a half-open range.
        return any(seg.executable and seg.start<=addr<seg.end for seg in bv.segments)

    def connect_basic_block(self,func:Function):
        """Resolve every unresolved indirect branch of ``func`` and record its targets.

        Failures for a single branch are printed and do not stop the loop.
        """
        handler=two_direct_indirect_jump_handler
        mlil=func.mlil

        for branch in func.unresolved_indirect_branches:
            # Each entry is indexed as a sequence; element 1 is the branch address.
            unsolved_addr=branch[1]
            try:
                jump=mlil[mlil.get_instruction_start(unsolved_addr)].ssa_form
                if not isinstance(jump,(MediumLevelILJump,MediumLevelILJumpTo)):
                    raise RuntimeError("not jump {}".format(hex(jump.address)))

                var=jump.dest.ssa_form.var
                cond={"value":None}
                res=handler.search_var_value_ssa(var,mlil.ssa_form,cond)
                if res is None or cond["value"] is None:
                    raise RuntimeError("calc fail {}".format(hex(jump.address)))

                # Plain jumps always get user branches; jump_to only when the
                # recorded branches disagree with the computed targets.
                if isinstance(jump,MediumLevelILJump) or self.check_manual_update(func,unsolved_addr,res):
                    func.set_user_indirect_branches(unsolved_addr,[(func.arch,res["t"]),(func.arch,res["f"])],func.arch)
                self.resolved[unsolved_addr]=handler.resolved_data(cond=cond["value"],trueAddr=res["t"],falseAddr=res["f"])
            except Exception as e:
                # Best effort: report and move on to the next branch.
                print(hex(unsolved_addr),e)
                continue

    def convert_jump_to_if(self,ctx:AnalysisContext):
        """Copy the function's MLIL, replacing resolved ``jump_to`` with ``if``.

        Variable-destination ``jump_to`` instructions that cannot be resolved to
        two valid addresses get an auto "Bugs" tag. The rebuilt function is
        finalized and stored in ``ctx.mlil``.
        """
        handler=two_direct_indirect_jump_handler
        new_func=MediumLevelILFunction(ctx.function.arch,low_level_il=ctx.llil)
        old_mlil=ctx.function.mlil
        source_func=old_mlil.source_function
        new_func.prepare_to_copy_function(old_mlil)
        for old_block in old_mlil:
            new_func.prepare_to_copy_block(old_block)
            for instr_idx in range(old_block.start,old_block.end):
                instr=old_mlil[instr_idx]
                is_jump_to=isinstance(instr,MediumLevelILJumpTo)

                if is_jump_to and isinstance(instr.dest,MediumLevelILVar) and instr.get_possible_reg_values(instr.dest.var.storage).type!=RegisterValueType.ConstantValue:
                    # Drop stale auto tags before re-evaluating this jump.
                    for tag in source_func.get_tags_at(instr.address,auto=True):
                        source_func.remove_auto_address_tags_of_type(instr.address,tag.type.name)
                    cond={"value":None}
                    try:
                        res=handler.search_var_value_ssa(instr.dest.ssa_form.var,old_mlil.ssa_form,cond)
                        if res is None or cond["value"] is None or not self.check_addr_vaild(res["t"],ctx.function.view) or not self.check_addr_vaild(res["f"],ctx.function.view):
                            source_func.add_tag("Bugs","need manual analyze jump",instr.address,auto=True)
                            print("add bug tag at {}".format(hex(instr.address)))
                    except Exception as e:
                        source_func.add_tag("Bugs","need manual analyze jump",instr.address,auto=True)
                        print("add bug tag at {}".format(hex(instr.address)))
                        print(e)

                if is_jump_to and instr.address in self.resolved:
                    try:
                        resolved=self.resolved[instr.address]
                        label_t=MediumLevelILLabel()
                        label_f=MediumLevelILLabel()
                        indirect_branches=ctx.function.get_indirect_branches_at(instr.address)
                        if len(indirect_branches)!=2:
                            raise RuntimeError("indirect branches len!=2 {} {}".format(hex(instr.address),indirect_branches))
                        matched_t=matched_f=False
                        for branch in indirect_branches:
                            if branch.dest_addr==resolved.trueAddr:
                                label_t.operand=instr.targets[branch.dest_addr]
                                matched_t=True
                            if branch.dest_addr==resolved.falseAddr:
                                label_f.operand=instr.targets[branch.dest_addr]
                                matched_f=True
                        # Never emit an ``if`` whose label was not bound to a target;
                        # fall back to copying the original jump instead.
                        if not (matched_t and matched_f):
                            raise RuntimeError("indirect branches do not match resolved targets {} {}".format(hex(instr.address),indirect_branches))
                        if_expr=new_func.if_expr(old_mlil.get_expr(resolved.cond).copy_to(new_func),
                                                 label_t,label_f,instr.source_location)
                        new_func.append(if_expr,instr.source_location)
                    except Exception as e:
                        print(e)
                        new_func.append(instr.copy_to(new_func),instr.source_location)
                else:
                    new_func.append(instr.copy_to(new_func),instr.source_location)
        new_func.finalize()
        new_func.generate_ssa_form()
        ctx.mlil=new_func

    def re_run_calc_check(self,var:SSAVariable):
        """Return True when ``var`` resolves to a value with a controlling condition."""
        cond={"value":None}
        try:
            res=self.search_var_value_ssa(var,self.ctx.function.mlil.ssa_form,cond)
        except Exception as e:
            print(e)
            return False
        return res is not None and cond["value"] is not None

    def run(self):
        """Run both passes, provided ``.data`` is marked as read-only data."""
        from binaryninja import SectionSemantics
        # A binary without a .data section used to raise KeyError here.
        data_section=self.ctx.function.view.sections.get(".data")
        if data_section is None:
            print("!!! no .data section found, two direct indirect jump handler skipped !!!")
            return
        if data_section.semantics!=SectionSemantics.ReadOnlyDataSectionSemantics:
            print("!!! please set .data section to read only data section semantics !!!")
            return
        self.connect_basic_block(self.ctx.function)
        self.convert_jump_to_if(self.ctx)
        
                

def install_two_direct_indirect_jump_handler(ctx:AnalysisContext):
    """Activity callback: build a jump handler for ``ctx`` and run it."""
    two_direct_indirect_jump_handler(ctx).run()


# Clone Workflow("") as "satori.function.deobf_test", register the jump-handler
# activity on the clone and insert it at core.function.analyzeReturns.
_activity_name="satori.function.handle_two_direct_indirect_jump.activity"
_activity_config={
    "name":_activity_name,
    "title":"handle_two_direct_indirect_jump",
    "description":"handle_two_direct_indirect_jump",
    "eligibility":{
        "auto":{
            "default":True
        }
    }
}

wf=Workflow("").clone("satori.function.deobf_test")
wf.register_activity(
    Activity(
        configuration=json.dumps(_activity_config),
        action=lambda context: install_two_direct_indirect_jump_handler(context),
    )
)
wf.insert("core.function.analyzeReturns",[_activity_name])

wf.register()

SGSGsama avatar Dec 11 '25 11:12 SGSGsama

I turned off guided analysis mode and the bug seems to disappear, so I guess something goes wrong when guided analysis mode is used.

SGSGsama avatar Dec 11 '25 11:12 SGSGsama