ComprehendAI插件开发

起因是在实习过程中遇到了很多APT攻击场景,其中很多我从来没有遇见过,所以最开始为了理解一条攻击链,我要花大量的时间去查阅资料、补充对攻击场景的了解,还需要确认某个场景和样本的实际功能是否吻合。于是我想到利用AI解读代码并分析出大概的攻击手段。我在github上找了一些开源项目,感觉都不太合适,大多只是将当前函数丢给GPT,然后做分析、注释等。对于分析一个样本而言,仅凭一个函数的数据难以让AI识别出一个攻击场景,加上之前也有魔改一些IDA插件的经验,于是有此文。

目前功能实现

  1. 自定义prompt
  2. 自动递归当前函数,收集所有XrefsFrom,结合apt分析需要定义prompt
  3. 提供函数递归深度设置接口,考虑某些函数分析的难度不是很高,减少tokens消耗

ComprehendAIPlugin类->框架搭建

开发IDA的插件有一个固定框架,主要是通过继承idaapi.plugin_t插件类来实现,需要实现的几个成员函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MyPlugin(idaapi.plugin_t):
    """Minimal IDA plugin skeleton listing the members a plugin class provides."""

    flags = idaapi.PLUGIN_UNL
    comment = "这是一个简单的 IDA 插件示例"
    help = "使用这个插件来完成一些操作"
    wanted_name = "MyPlugin"
    wanted_hotkey = "Alt-F1"

    def init(self):
        """Plugin initialization hook.

        Returns:
            One of the idaapi.PLUGIN_* load codes. A bare ``pass`` would
            return None, which is not a valid load code, so the skeleton
            returns PLUGIN_OK explicitly.
        """
        return idaapi.PLUGIN_OK

    def run(self, arg):
        """Plugin run hook; ``arg`` can be used to select different features."""
        pass

    def term(self):
        """Plugin termination hook; a place to clean up installed hooks."""
        pass

def PLUGIN_ENTRY():
    """Create and return the MyPlugin instance for IDA's plugin loader."""
    plugin = MyPlugin()
    return plugin

框架搭建好了,还需要提供UI界面支持。目前需要的功能有三个:函数分析、设置函数递归深度、自定义提问,所以我需要在右键函数时弹出的菜单中加入我的菜单项,也就是注册菜单项。注册菜单项需要对UI接口进行hook,hook的方式是实现一个继承自UI_Hooks的类,并在init函数里注册该hook,于是有MenuHook类:

1
2
3
4
5
6
7
8
9
10
# (action id, menu label, tooltip) for every action the plugin offers.
ACTION_DEFINITIONS = [
    ("AI_analysis:Analysis", "Non-blocking analysis", "执行非阻塞型AI分析"),
    ("AI_analysis:SetDepth", "Set analysis depth", "设置分析深度"),
    ("AI_analysis:CustomQuery", "Ask AI", "自定义提问")
]


class MenuHook(UI_Hooks):
    """UI hook that adds the plugin's actions to widget context menus."""

    def finish_populating_widget_popup(self, form, popup):
        """Attach all plugin actions when the popup belongs to a code view.

        Only disassembly and pseudocode widgets get the "ComprehendAI/"
        submenu; popups of every other widget type are left untouched.
        """
        code_views = (idaapi.BWN_DISASM, idaapi.BWN_PSEUDOCODE)
        if idaapi.get_widget_type(form) not in code_views:
            return

        for name, _label, _tooltip in ComprehendAIPlugin.ACTION_DEFINITIONS:
            idaapi.attach_action_to_popup(
                form, popup, name, "ComprehendAI/", idaapi.SETMENU_APP
            )

有了菜单项,我需要完善处理函数,类似QT的信号和槽机制,注册了action,需要有对应的槽函数,在IDApython提供的类中,action_handler_t是负责处理action handler的类,继承并实现activate函数即可,于是有

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 def _register_actions(self):
     """Register one IDA action for each entry of ACTION_DEFINITIONS."""
     for name, text, hint in self.ACTION_DEFINITIONS:
         callback = self.MenuCommandHandler(name, self.handler)
         # After the handler the positional arguments are:
         # shortcut (None), tooltip, icon (0).
         descriptor = idaapi.action_desc_t(name, text, callback, None, hint, 0)
         idaapi.register_action(descriptor)

def _unregister_actions(self):
    """Unregister every action listed in ACTION_DEFINITIONS."""
    for definition in self.ACTION_DEFINITIONS:
        name, _label, _hint = definition
        idaapi.unregister_action(name)

class MenuCommandHandler(action_handler_t):
    """Route a triggered menu action to the shared AnalysisHandler."""

    # The annotation is a string so that AnalysisHandler does not have to be
    # defined before this class body is executed.
    def __init__(self, action_id, handler: "AnalysisHandler"):
        """Remember which action this handler serves and where to send it."""
        super().__init__()
        self.action_id = action_id
        self.handler = handler

    def activate(self, ctx):
        """Run the feature that belongs to ``self.action_id``.

        Returns:
            Always 1.
        """
        if self.action_id == "AI_analysis:Analysis":
            self.handler.create_ai_task(TaskType.ANALYSIS)

        elif self.action_id == "AI_analysis:CustomQuery":
            question = idaapi.ask_text(0, "", "输入问题")
            if question:
                self.handler.create_ai_task(TaskType.CUSTOM_QUERY, question)

        elif self.action_id == "AI_analysis:SetDepth":
            new_depth = idaapi.ask_long(2, "设置分析深度 (默认2):")
            if new_depth is None:
                # Dialog was cancelled; keep the current depth.
                pass
            elif new_depth < 0:
                # A negative depth would make the analysis collect nothing.
                print("Analysis depth must be >= 0; keeping the current value")
            else:
                self.handler.set_analysis_depth(new_depth)

        return 1

    def update(self, ctx):
        """Keep the action enabled in every context."""
        return idaapi.AST_ENABLE_ALWAYS

OK,现在只需要在init函数时hook菜单创建,注册action即可,于是有:

def init(self):
    """Create the analysis handler, register the actions and hook the UI.

    The handler is built first: constructing it loads the configuration and
    may raise. Doing so before anything is hooked or registered means a
    failure leaves no half-installed state behind.

    Returns:
        idaapi.PLUGIN_KEEP on success, idaapi.PLUGIN_SKIP when the handler
        could not be created.
    """
    try:
        self.handler = AnalysisHandler()
    except Exception as exc:
        # Plugin-load boundary: report the problem instead of propagating it.
        print(f"ComprehendAI failed to initialize: {exc}")
        return idaapi.PLUGIN_SKIP

    self._register_actions()

    self.ui_hook = self.MenuHook()
    self.ui_hook.hook()

    print("ComprehendAI initialized")
    return idaapi.PLUGIN_KEEP

代码汇总实现ComprehendAIPlugin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# Plugin framework glue
class ComprehendAIPlugin(idaapi.plugin_t):
    """IDA plugin that exposes the AI analysis features through popup menus."""

    flags = idaapi.PLUGIN_HIDE
    comment = "AI-based Reverse Analysis Plugin"
    help = "Perform AI-based analysis on binary code"
    wanted_name = "ComprehendAI"
    wanted_hotkey = "Ctrl+Shift+A"

    # (action id, menu label, tooltip) for every action the plugin offers.
    ACTION_DEFINITIONS = [
        ("AI_analysis:Analysis", "Non-blocking analysis", "执行非阻塞型AI分析"),
        ("AI_analysis:SetDepth", "Set analysis depth", "设置分析深度"),
        ("AI_analysis:CustomQuery", "Ask AI", "自定义提问")
    ]

    def init(self):
        """Create the analysis handler, register the actions and hook the UI.

        The handler is built first: constructing it loads the configuration
        and may raise. Doing so before anything is hooked or registered means
        a failure leaves no half-installed state behind.

        Returns:
            idaapi.PLUGIN_KEEP on success, idaapi.PLUGIN_SKIP when the
            handler could not be created.
        """
        self.ui_hook = None
        try:
            self.handler = AnalysisHandler()
        except Exception as exc:
            # Plugin-load boundary: report the problem instead of propagating.
            print(f"ComprehendAI failed to initialize: {exc}")
            return idaapi.PLUGIN_SKIP

        self._register_actions()

        self.ui_hook = self.MenuHook()
        self.ui_hook.hook()

        print("ComprehendAI initialized")
        return idaapi.PLUGIN_KEEP

    def run(self, arg):
        """No-op: all functionality is reached through the popup actions."""
        pass

    def term(self):
        """Remove the UI hook (if one was installed) and unregister the actions."""
        ui_hook = getattr(self, "ui_hook", None)
        if ui_hook is not None:
            ui_hook.unhook()
        self._unregister_actions()
        print("ComprehendAI unloaded")

    def _register_actions(self):
        """Register one IDA action for each entry of ACTION_DEFINITIONS."""
        for action_id, label, tooltip in self.ACTION_DEFINITIONS:
            action_desc = idaapi.action_desc_t(
                action_id,
                label,
                self.MenuCommandHandler(action_id, self.handler),
                None,  # no shortcut
                tooltip,
                0,
            )
            idaapi.register_action(action_desc)

    def _unregister_actions(self):
        """Unregister every action listed in ACTION_DEFINITIONS."""
        for action_id, _, _ in self.ACTION_DEFINITIONS:
            idaapi.unregister_action(action_id)

    class MenuHook(UI_Hooks):
        """UI hook that adds the plugin's actions to widget context menus."""

        def finish_populating_widget_popup(self, form, popup):
            """Attach all actions to popups of disassembly/pseudocode views."""
            if idaapi.get_widget_type(form) in (idaapi.BWN_DISASM, idaapi.BWN_PSEUDOCODE):
                for action_id, _, _ in ComprehendAIPlugin.ACTION_DEFINITIONS:
                    idaapi.attach_action_to_popup(
                        form, popup, action_id, "ComprehendAI/", idaapi.SETMENU_APP
                    )

    class MenuCommandHandler(action_handler_t):
        """Route a triggered menu action to the shared AnalysisHandler."""

        # The annotation is a string so that AnalysisHandler does not have to
        # be defined before this class body is executed.
        def __init__(self, action_id, handler: "AnalysisHandler"):
            """Remember which action this handler serves and where to send it."""
            super().__init__()
            self.action_id = action_id
            self.handler = handler

        def activate(self, ctx):
            """Run the feature that belongs to ``self.action_id``.

            Returns:
                Always 1.
            """
            if self.action_id == "AI_analysis:Analysis":
                self.handler.create_ai_task(TaskType.ANALYSIS)

            elif self.action_id == "AI_analysis:CustomQuery":
                question = idaapi.ask_text(0, "", "输入问题")
                if question:
                    self.handler.create_ai_task(TaskType.CUSTOM_QUERY, question)

            elif self.action_id == "AI_analysis:SetDepth":
                new_depth = idaapi.ask_long(2, "设置分析深度 (默认2):")
                if new_depth is None:
                    # Dialog was cancelled; keep the current depth.
                    pass
                elif new_depth < 0:
                    # A negative depth would make the analysis collect nothing.
                    print("Analysis depth must be >= 0; keeping the current value")
                else:
                    self.handler.set_analysis_depth(new_depth)

            return 1

        def update(self, ctx):
            """Keep the action enabled in every context."""
            return idaapi.AST_ENABLE_ALWAYS

ConfigManager->配置文件处理

创建ConfigManager类以处理不同的模型、API-KEY、API来源等,需要创建一个配置文件,目前的config_sample.json如下:

1
2
3
4
5
6
7
{
"openai": {
"model": "qwq-32b",
"api_key": "sk-",
"base_url": "https://"
}
}

ConfigManager代码逻辑就不多赘述,就是读取配置文件,调用openai接口,连接模型并返回模型句柄。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# Configuration file handling
class ConfigManager:
    """Singleton that loads config.json and builds the OpenAI client.

    The configuration file is expected next to this script and must contain
    an "openai" section with "model", "api_key" and "base_url" entries.
    """

    _instance = None
    _lock = Lock()

    def __new__(cls):
        """Return the shared instance, creating and initializing it on first use.

        Raises:
            RuntimeError: if the configuration file cannot be loaded. In that
                case nothing is cached, so a later call retries from scratch.
        """
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                instance._initialize()
                # Publish only after initialization succeeded; otherwise a
                # failed first call would leave a half-built singleton that
                # every later call silently returns.
                cls._instance = instance
            return cls._instance

    def _initialize(self):
        """Locate config.json, parse it and create the API client."""
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.config_path = os.path.join(self.script_dir, 'config.json')
        self.config = self._load_config()
        self.openai_client = self._create_openai_client()

    def _load_config(self):
        """Parse the JSON configuration file.

        Raises:
            RuntimeError: if the file cannot be read or is not valid JSON;
                the original error is chained as the cause.
        """
        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception as e:
            raise RuntimeError(f"Failed to load config: {str(e)}") from e

    def _create_openai_client(self):
        """Build the OpenAI client from the "openai" section of the config."""
        settings = self.config["openai"]
        return OpenAI(
            api_key=settings["api_key"],
            base_url=settings["base_url"],
        )

    @property
    def model_name(self):
        """Model identifier configured under openai.model."""
        return self.config["openai"]["model"]

    @property
    def client(self):
        """The OpenAI client created during initialization."""
        return self.openai_client

DisassemblyProcessor->反汇编数据提取

连接了模型,还需要给它准备输入数据,这就需要提取函数的代码(实现中使用的是Hex-Rays反编译得到的伪代码)。用一个递归来实现:先获取当前函数的起始地址,丢进处理函数中;处理函数将该函数的反编译结果存储起来,并对函数起始地址到终止地址之间的地址查询交叉引用(XrefsFrom)以找到所有子函数,再依次遍历子函数,将子函数的起始地址传入处理函数,从而实现递归提取数据。

代码如下:get_current_function_disasm从当前函数开始递归,获取所有代码(包含子函数);_process_function是递归处理函数;_get_callees通过XrefsFrom获取子函数起始地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# Extraction of the code that is sent to the model
class DisassemblyProcessor:
    """Collect decompiled text of a function and of the functions it calls.

    Despite the "disasm" naming, the collected text is the string form of
    ``idaapi.decompile`` results. Recursion into callees is limited by
    ``max_depth`` (0 means only the starting function).
    """

    def __init__(self, max_depth=2):
        self.max_depth = max_depth
        self._lock = Lock()
        self._reset_state()

    def _reset_state(self):
        """Forget everything collected by a previous run."""
        with self._lock:
            self.processed_funcs = set()
            self.func_disasm_list = []

    def get_current_function_disasm(self):
        """Return the decompiled text for the function under the cursor.

        The starting function comes first, followed by its callees down to
        ``max_depth`` levels, joined with newlines.

        Raises:
            ValueError: if the cursor is not inside a function.
        """
        self._reset_state()

        current_ea = idc.get_screen_ea()
        func_start = idc.get_func_attr(current_ea, idc.FUNCATTR_START)

        if func_start == idaapi.BADADDR:
            raise ValueError("Failed to locate function start address")

        self._process_function(func_start, self.max_depth)
        return "\n".join(self.func_disasm_list)

    def _process_function(self, func_ea, depth):
        """Record ``func_ea`` and recurse into its callees while depth allows.

        A function whose decompilation fails is reported and skipped, but its
        callees are still visited.
        """
        if depth < 0 or func_ea in self.processed_funcs:
            return

        with self._lock:
            self.processed_funcs.add(func_ea)

        text = None
        try:
            cfunc = idaapi.decompile(func_ea)
            # Guard against a None result so the literal text "None" never
            # ends up in the prompt.
            if cfunc is not None:
                text = str(cfunc)
        except Exception as e:
            print(f"Decompilation failed for {hex(func_ea)}: {str(e)}")

        if text is not None:
            with self._lock:
                self.func_disasm_list.append(text)

        if depth == 0:
            # Every callee would be rejected by the depth check above, so the
            # cross-reference scan can be skipped entirely.
            return

        for callee in self._get_callees(func_ea):
            self._process_function(callee, depth - 1)

    def _get_callees(self, func_ea):
        """Return start addresses of functions called from ``func_ea``.

        Walks the function's items via ``idautils.FuncItems`` instead of
        probing every byte address between start and end. The result is a
        list without duplicates, in order of first appearance.
        """
        call_types = (ida_xref.fl_CN, ida_xref.fl_CF)
        callees = {}  # dict used as an insertion-ordered set
        for item_ea in idautils.FuncItems(func_ea):
            for xref in idautils.XrefsFrom(item_ea):
                if xref.type not in call_types:
                    continue
                callee_ea = xref.to
                # Keep only targets that are the entry point of a function.
                if idc.get_func_attr(callee_ea, idc.FUNCATTR_START) == callee_ea:
                    callees[callee_ea] = None
        return list(callees)

AIService->openai接口封装

为了让AI调用更加简单一些,我希望只需要传递一个参数Prompt,于是我写了一个openai接口的封装。其中ask_ai函数接收一个参数prompt,然后使用互斥锁模拟等待过程;_request_openai函数负责直接对接openai接口,实现流式输出(该功能由xgDebug实现)。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# Wrapper around the OpenAI chat-completions interface
class AIService:
    """Send a prompt to the configured model and print the streamed answer."""

    def __init__(self):
        self.config = ConfigManager()

    def ask_ai(self, prompt, ai_isRunning: Lock):
        """Query the model with ``prompt`` and print the outcome.

        Args:
            prompt: text sent as the single user message.
            ai_isRunning: lock already held by the caller to mark a running
                task. It is released here when the request ends, whether it
                succeeded or failed, so a failed request cannot block every
                later task.
        """
        messages = [{"role": "user", "content": prompt}]
        print("ComprehendAI output:")
        try:
            result = self._request_openai(messages)
            if result:
                print("\r✅ 分析完成!")
                print(result)
            else:
                print("\r❌ 分析失败,请重试")
        finally:
            ai_isRunning.release()

    def _request_openai(self, messages):
        """Stream a chat completion and return the concatenated answer text.

        Reasoning text, when a chunk carries any, is printed as it arrives;
        the answer itself is only accumulated and returned.

        Returns:
            The answer as a string (possibly empty), or None if any
            exception occurred while talking to the API.
        """
        answer_parts = []
        is_answering = False
        try:
            completion = self.config.client.chat.completions.create(
                model=self.config.model_name,
                messages=messages,
                stream=True,
            )
            for chunk in completion:
                # A chunk without choices carries the usage statistics.
                if not chunk.choices:
                    print("\nUsage:")
                    print(getattr(chunk, "usage", None))
                    continue

                delta = chunk.choices[0].delta

                # Print the reasoning process as it is streamed.
                reasoning = getattr(delta, "reasoning_content", None)
                if reasoning:
                    print(reasoning, end='', flush=True)

                # Content may be None or empty on some chunks; skip those
                # instead of failing on string concatenation.
                content = getattr(delta, "content", None)
                if not content:
                    continue

                if not is_answering:
                    print("\n" + "=" * 20 + "完整回复" + "=" * 20 + "\n")
                    is_answering = True
                answer_parts.append(content)
            return "".join(answer_parts)

        except Exception as e:
            print(f"Error occurred: {e}")
            traceback.print_exc()
            return None

    def _process_chunk(self, chunk):
        """Return the content text of ``chunk``.

        Returns None when the chunk has no choices and "" when the delta has
        no content. Not called by the other methods of this class.
        """
        if not chunk.choices:
            return None
        delta = chunk.choices[0].delta
        return getattr(delta, 'content', '') or ""

AnalysisHandler->分析任务提交

AnalysisHandler负责用户接口处理,设置默认prompt,并创建线程,执行一个分析任务,使用互斥锁防止同时多个任务。_create_analysis_prompt返回默认prompt;create_ai_task作为async_task之上的封装,负责分配不同任务请求(自定义提问还是函数分析);async_task创建线程执行请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# User-facing task handling
class AnalysisHandler:
    """Build prompts and run AI requests on a background thread.

    ``ai_isRunning`` is a lock used as a busy flag: it is acquired here before
    a worker thread is started and handed to ``AIService.ask_ai``, which is
    expected to release it when the request is finished.
    """

    def __init__(self):
        self.disassembler = DisassemblyProcessor()
        self.ai_service = AIService()
        self.ai_isRunning = Lock()

    def set_analysis_depth(self, depth):
        """Set how many callee levels are collected; invalid values are ignored.

        A negative depth would make the collector return nothing, so it is
        rejected with a message and the current setting is kept.
        """
        if not isinstance(depth, int) or depth < 0:
            print(f"Invalid analysis depth: {depth!r} (expected an integer >= 0)")
            return
        self.disassembler.max_depth = depth

    def _create_analysis_prompt(self, disassembly):
        """Return the default analysis prompt with ``disassembly`` embedded."""
        return f"""
你是一名人工智能逆向工程专家。
我会提供你一些反汇编代码,其中首个函数是你需要分析并总结成报告的函数,
其余函数是该函数调用的一些子函数。
分析要求:
重点描述主函数功能,并对核心行为进行推测;
简要描述子函数功能

输出要求:
主函数功能:...
行为推测:...
子函数功能:...
纯文本输出。

下面是你要分析的反汇编代码:
{disassembly}
"""

    def create_ai_task(self, taskType, question=None):
        """Start an analysis of the current function or a custom query.

        Args:
            taskType: TaskType.ANALYSIS or TaskType.CUSTOM_QUERY.
            question: text of the custom query; unused for analysis tasks.
        """
        if taskType == TaskType.ANALYSIS:
            if self.ai_isRunning.locked():
                # Do not spend time collecting code for a request that would
                # be rejected anyway.
                print("\r❌ 当前AI正在处理任务,请稍后尝试")
                return
            try:
                disassembly = self.disassembler.get_current_function_disasm()
            except ValueError as exc:
                # Raised when the cursor is not inside a function.
                print(f"\r❌ {exc}")
                return
            self.async_task(self._create_analysis_prompt(disassembly))

        elif taskType == TaskType.CUSTOM_QUERY:
            self.async_task(question)

    def async_task(self, question):
        """Send ``question`` to the AI on a worker thread unless one is running.

        The plain-text suffix is appended to every question. An empty or
        missing question is rejected before the busy lock is taken, so it
        cannot leave the lock held.
        """
        if not question:
            print("\r❌ Empty question, nothing to send")
            return

        print(question)
        if not self.ai_isRunning.acquire(blocking=False):
            print("\r❌ 当前AI正在处理任务,请稍后尝试")
            return

        prompt = question + ',请用纯文本格式输出'
        try:
            task = Thread(
                target=self.ai_service.ask_ai,
                args=(prompt, self.ai_isRunning),
            )
            task.start()
        except Exception:
            # The worker never started, so nobody else will release the lock.
            self.ai_isRunning.release()
            raise

github地址:https://github.com/Kvancy/ComprehendAI

结语:

image-20250330161948176