起因是在实习过程中遇到了很多apt攻击场景,很多我都是从来没遇见过的,所以在最开始为了理解一个攻击链,我要花大量的时间去查阅资料,丰富攻击场景,还得需要确认某个场景和样本实际功能吻合。于是我想到了利用AI解读代码并分析出大概的攻击手段,在github上了一些开源项目,感觉都不是挺合适的,大多都是就是将当前函数丢给GPT,然后分析、注释等。这对于我分析一个样本而言,仅仅一个函数的数据难以让AI识别出一个攻击场景,加上之前也有魔改一些IDA插件的经验,于是有此文。
目前功能实现
自定义 prompt
自动递归当前函数,收集所有XrefsFrom,结合apt分析 需要定义prompt
提供函数递归深度设置接口,考虑某些函数分析的难度不是很高,减少tokens消耗 。
ComprehendAIPlugin类->框架搭建 开发IDA的插件有一个固定框架,主要是通过继承idaapi.plugin_t
插件类来实现,需要实现的几个成员函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class MyPlugin (idaapi.plugin_t): flags = idaapi.PLUGIN_UNL comment = "这是一个简单的 IDA 插件示例" help = "使用这个插件来完成一些操作" wanted_name = "MyPlugin" wanted_hotkey = "Alt-F1" def init (self ): pass def run (self, arg ): pass def term (self ): pass def PLUGIN_ENTRY (): return MyPlugin()
框架搭建好了,还需要提供UI界面支持,我们需要的功能目前有三个,函数分析,设置函数递归深度,自定义提问等,所以我需要在右键函数的时候弹出菜单中有我的菜单项,注册菜单项。注册菜单项需要对UI接口进行hook,hook的方式是实现一个继承自UI_Hooks的类,并在init类里注册该hook,于是有MenuHook类:
1 2 3 4 5 6 7 8 9 10 ACTION_DEFINITIONS = [ ("AI_analysis:Analysis" , "Non-blocking analysis" , "执行非阻塞型AI分析" ), ("AI_analysis:SetDepth" , "Set analysis depth" , "设置分析深度" ), ("AI_analysis:CustomQuery" , "Ask AI" , "自定义提问" ) ] class MenuHook (UI_Hooks ): def finish_populating_widget_popup (self, form, popup ): if idaapi.get_widget_type(form) in (idaapi.BWN_DISASM, idaapi.BWN_PSEUDOCODE): for action_id, _, _ in ComprehendAIPlugin.ACTION_DEFINITIONS: idaapi.attach_action_to_popup(form, popup, action_id, "ComprehendAI/" , idaapi.SETMENU_APP)
有了菜单项,我需要完善处理函数,类似QT的信号和槽机制,注册了action,需要有对应的槽函数,在IDApython提供的类中,action_handler_t是负责处理action handler的类,继承并实现activate函数即可,于是有
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 def _register_actions (self ): for action_id, label, tooltip in self .ACTION_DEFINITIONS: action_desc = idaapi.action_desc_t( action_id, label, self .MenuCommandHandler(action_id,self .handler), None , tooltip, 0 ) idaapi.register_action(action_desc) def _unregister_actions (self ): for action_id, _, _ in self .ACTION_DEFINITIONS: idaapi.unregister_action(action_id) class MenuCommandHandler (action_handler_t ): def __init__ (self, action_id,handler:AnalysisHandler ): super ().__init__() self .action_id = action_id self .handler = handler def activate (self, ctx ): if self .action_id == "AI_analysis:Analysis" : self .handler.create_ai_task(TaskType.ANALYSIS) elif self .action_id == "AI_analysis:CustomQuery" : question = idaapi.ask_text(0 , "" , "输入问题" ) if question: self .handler.create_ai_task(TaskType.CUSTOM_QUERY,question) elif self .action_id == "AI_analysis:SetDepth" : new_depth = idaapi.ask_long(2 , "设置分析深度 (默认2):" ) if new_depth is not None : self .handler.set_analysis_depth(new_depth) return 1
OK,现在只需要在init
函数时hook菜单创建,注册action即可,于是有:
def init(self):
self.ui_hook = self.MenuHook()
self.ui_hook.hook()
self.handler = AnalysisHandler()
self._register_actions()
print("ComprehendAI initialized")
return idaapi.PLUGIN_KEEP
代码汇总实现ComprehendAIPlugin
类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 class ComprehendAIPlugin (idaapi.plugin_t): flags = idaapi.PLUGIN_HIDE comment = "AI-based Reverse Analysis Plugin" help = "Perform AI-based analysis on binary code" wanted_name = "ComprehendAI" wanted_hotkey = "Ctrl+Shift+A" ACTION_DEFINITIONS = [ ("AI_analysis:Analysis" , "Non-blocking analysis" , "执行非阻塞型AI分析" ), ("AI_analysis:SetDepth" , "Set analysis depth" , "设置分析深度" ), ("AI_analysis:CustomQuery" , "Ask AI" , "自定义提问" ) ] def init (self ): self .ui_hook = self .MenuHook() self .ui_hook.hook() self .handler = AnalysisHandler() self ._register_actions() print ("ComprehendAI initialized" ) return idaapi.PLUGIN_KEEP def run (self, arg ): pass def term (self ): self .ui_hook.unhook() self ._unregister_actions() print ("ComprehendAI unloaded" ) def _register_actions (self ): for action_id, label, tooltip in self .ACTION_DEFINITIONS: action_desc = idaapi.action_desc_t( action_id, label, self .MenuCommandHandler(action_id,self .handler), None , tooltip, 0 ) idaapi.register_action(action_desc) def _unregister_actions (self ): for action_id, _, _ in self .ACTION_DEFINITIONS: idaapi.unregister_action(action_id) class MenuHook (UI_Hooks ): def finish_populating_widget_popup (self, form, popup ): if idaapi.get_widget_type(form) in (idaapi.BWN_DISASM, idaapi.BWN_PSEUDOCODE): for action_id, _, _ in ComprehendAIPlugin.ACTION_DEFINITIONS: idaapi.attach_action_to_popup(form, popup, action_id, "ComprehendAI/" , idaapi.SETMENU_APP) class MenuCommandHandler (action_handler_t ): def __init__ (self, action_id,handler:AnalysisHandler ): super ().__init__() self .action_id = action_id self .handler = handler def activate (self, ctx ): if self .action_id == "AI_analysis:Analysis" : self .handler.create_ai_task(TaskType.ANALYSIS) elif self .action_id == "AI_analysis:CustomQuery" : question = idaapi.ask_text(0 , "" , "输入问题" ) if question: self .handler.create_ai_task(TaskType.CUSTOM_QUERY,question) elif self .action_id == "AI_analysis:SetDepth" : new_depth = idaapi.ask_long(2 , "设置分析深度 (默认2):" ) if new_depth is not None : self .handler.set_analysis_depth(new_depth) return 1 def update (self, ctx ): return idaapi.AST_ENABLE_ALWAYS
ConfigManager->配置文件处理 创建ConfigManager类以处理不同的模型、API-KEY、API来源等,需要创建一个配置文件,目前的config_sample.json如下:
1 2 3 4 5 6 7 { "openai" : { "model" : "qwq-32b" , "api_key" : "sk-" , "base_url" : "https://" } }
ConfigManager代码逻辑就不多赘述,就是读取配置文件,调用openai接口,连接模型并返回模型句柄。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class ConfigManager : _instance = None _lock = Lock() def __new__ (cls ): with cls._lock: if not cls._instance: cls._instance = super ().__new__(cls) cls._instance._initialize() return cls._instance def _initialize (self ): self .script_dir = os.path.dirname(os.path.abspath(__file__)) self .config_path = os.path.join(self .script_dir, 'config.json' ) self .config = self ._load_config() self .openai_client = self ._create_openai_client() def _load_config (self ): try : with open (self .config_path, "r" ) as f: return json.load(f) except Exception as e: raise RuntimeError(f"Failed to load config: {str (e)} " ) def _create_openai_client (self ): return OpenAI( api_key=self .config["openai" ]["api_key" ], base_url=self .config["openai" ]["base_url" ] ) @property def model_name (self ): return self .config["openai" ]["model" ] @property def client (self ): return self .openai_client
DisassemblyProcessor->反汇编数据提取 连接了模型,需要准备给他准备输入数据,这就需要提取反汇编。用一个递归来是实现:先获取当前函数的起始地址和终止地址,丢进处理函数中,处理函数将该起始地址和终止地址间的反汇编存储起来,并对该起始地址进行交叉引用找到所有子函数,将子函数依次遍历,将子函数的起始地址传入处理函数,从而实现递归提取数据。
代码如下,get_current_function_disasm
递归当前函数获取所有反汇编(包含子函数),_process_function
递归函数,_get_callees
XrefFrom获取子函数起始地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 class DisassemblyProcessor : def __init__ (self, max_depth=2 ): self .max_depth = max_depth self ._lock = Lock() self ._reset_state() def _reset_state (self ): with self ._lock: self .processed_funcs = set () self .func_disasm_list = [] def get_current_function_disasm (self ): self ._reset_state() current_ea = idc.get_screen_ea() func_start = idc.get_func_attr(current_ea, idc.FUNCATTR_START) if func_start == idaapi.BADADDR: raise ValueError("Failed to locate function start address" ) self ._process_function(func_start, self .max_depth) return "\n" .join(self .func_disasm_list) def _process_function (self, func_ea, depth ): if func_ea in self .processed_funcs or depth < 0 : return with self ._lock: self .processed_funcs.add(func_ea) try : decompiled = str (idaapi.decompile(func_ea)) with self ._lock: self .func_disasm_list.append(decompiled) except Exception as e: print (f"Decompilation failed for {hex (func_ea)} : {str (e)} " ) for callee in self ._get_callees(func_ea): self ._process_function(callee, depth - 1 ) def _get_callees (self, func_ea ): callees = set () for ea in range (func_ea, idc.get_func_attr(func_ea, idc.FUNCATTR_END)): for xref in idautils.XrefsFrom(ea): if xref.type in [ida_xref.fl_CN, ida_xref.fl_CF]: callee_ea = xref.to if idc.get_func_attr(callee_ea, idc.FUNCATTR_START) == callee_ea: callees.add(callee_ea) return callees
AIService->openai接口封装 为了让AI调用更加简单一些,我希望只需要传递一个参数Prompt,于是我写了一个openai接口的封装。其中ask_ai
函数接收一个参数prompt,然后使用互斥锁模拟等待过程;_request_openai
函数负责直接对接openai接口,实现流式输出(该功能由xgDebug实现)。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 class AIService : def __init__ (self ): self .config = ConfigManager() def ask_ai (self, prompt, ai_isRunning:Lock ): messages = [{"role" : "user" , "content" : prompt}] print ("ComprehendAI output:" ) result = self ._request_openai(messages) if result: ai_isRunning.release() print ("\r✅ 分析完成!" ) print (result) else : print ("\r❌ 分析失败,请重试" ) def _request_openai (self,messages ): reasoning_content = "" answer_content = "" is_answering = False try : completion = self .config.client.chat.completions.create( model=self .config.model_name, messages=messages, stream=True , ) for chunk in completion: if not chunk.choices: print ("\nUsage:" ) print (chunk.usage) else : delta = chunk.choices[0 ].delta if hasattr (delta, 'reasoning_content' ) and delta.reasoning_content != None : print (delta.reasoning_content, end='' , flush=True ) reasoning_content += delta.reasoning_content else : if delta.content != "" and is_answering is False : print ("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n" ) is_answering = True answer_content += delta.content return answer_content except Exception as e: print (f"Error occurred: {e} " ) traceback.print_exc() return None def _process_chunk (self, chunk ): if not chunk.choices: return None delta = chunk.choices[0 ].delta return getattr (delta, 'content' , '' ) or ""
AnalysisHandler->分析任务提交 AnalysisHandler负责用户接口处理,设置默认prompt,并创建线程,执行一个分析任务,使用互斥锁防止同时多个任务。_create_analysis_prompt
返回默认prompt;create_ai_task
作为async_task之上
的封装,负责分配不同任务请求(自定义提问还是函数分析);async_task
创建线程执行请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class AnalysisHandler : def __init__ (self ): self .disassembler = DisassemblyProcessor() self .ai_service = AIService() self .ai_isRunning = Lock() def set_analysis_depth (self, depth ): self .disassembler.max_depth = depth def _create_analysis_prompt (self, disassembly ): return f""" 你是一名人工智能逆向工程专家。 我会提供你一些反汇编代码,其中首个函数是你需要分析并总结成报告的函数, 其余函数是该函数调用的一些子函数。 分析要求: 重点描述主函数功能,并对核心行为进行推测; 简要描述子函数功能 输出要求: 主函数功能:... 行为推测:... 子函数功能:... 纯文本输出。 下面是你要分析的反汇编代码: {disassembly} """ def create_ai_task (self,taskType,question=None ): if taskType == TaskType.ANALYSIS: disassembly = self .disassembler.get_current_function_disasm() promt = self ._create_analysis_prompt(disassembly) self .async_task(promt) elif taskType == TaskType.CUSTOM_QUERY: self .async_task(question) def async_task (self,question ): print (question) promt = ',请用纯文本格式输出' if self .ai_isRunning.acquire(blocking=False ): question += promt task = Thread(target=self .ai_service.ask_ai,args=(question,self .ai_isRunning,)) task.start() else : print ("\r❌ 当前AI正在处理任务,请稍后尝试" )
github地址:https://github.com/Kvancy/ComprehendAI
结语: