初窥ARM平坦化还原

WriteUp 3个月前 admin
53 0 0



前言


上周在看DASCTF的題發現難得有一道安卓( 題目名:RealeazyRealeazy ),興致勃勃地打開IDA卻發現了這可悲的控制流平坦化,當場直接自闭。


之後分析了下發現這ollvm應該算是比較簡單的那類( 只有間接跳轉 + 最普通的平坦化,貌似沒有虛假分支/虛假塊 ),於是決定好好地學習下怎麼還原。


一開始是想按「使用unidbg还原标准ollvm的fla控制流程平坦化」(https://missking.cc/2021/05/14/ollvm3/)一樣使用Unidbg來還原,後面發現分支的情況用Unidbg不太好處理。


最後還是決定用Unicorn的模擬執行來還原,具體思路&實現完全參考「[原创]ARM64 OLLVM反混淆」(https://bbs.kanxue.com/thread-252321.htm)。

初窥ARM平坦化还原





還原思路


利用Unicorn來模擬執行,從而獲取程序的執行流程,主要有以下步驟:


1.識別&保存函數所有的真實塊,有兩種識別思路,要麼通過真實塊的特徵,要麼通過非真實塊的特徵,看哪種特徵比較明顯,對本例來說真實塊有個明顯的特徵就是mov pc, r0這樣的間接跳轉。


2.模擬執行並保存執行路徑,遇到分支時就手動修改寄存器的值來遍歷( 本例沒有虛假分支,不用考慮太多,直接2條分支都執行就可以 )。


模擬執行過程中遇到
blblx這樣的函數調用時可以直接跳過( 通過修改PC來實現 ),因為我們關注的只有執行流程,同理遇到一些Unicorn無法解析的指令時也是直接跳過就可以( 只要不是真實塊的頭/尾指令就可以 )。


3.根據上述得出的執行流來patch。





具體實現


collect_blocks


首先是收集各種塊的邏輯,通過capstone來遍歷so文件,offsetend分別是某函數的起始、結束地址,block_list用來保存所有塊( 以塊的起始地址作為鍵,而值是包含當前塊各種信息的block_item)。


如何判斷塊的結尾?通過觀察so文件可以知道,一個塊要麼以mov pc, r0結束,要麼以b XXX結束,而前者更是真實塊的特徵。


初窥ARM平坦化还原

初窥ARM平坦化还原


除了真實塊外,預處理塊也是以mov pc,r0結尾,顯然需要將其排除在外,不能讓其保存在real_blocks中。這裡我使用了IDA Python來提前找出所有預處理器的起始地址,實現思路是遍歷找出那些入度為n且出度為1的塊( 很簡單但對本例很有效 )。


初窥ARM平坦化还原


◆IDA Python script for finding preprocessor

import ida_xrefimport idcimport ida_segment def get_all_cref_to(addr):    all_xref = []    ref = ida_xref.get_first_cref_to(addr)    while ref != 0xffffffff:        all_xref.append(ref)        ref = ida_xref.get_next_cref_to(addr, ref)    return all_xref def get_all_cref_from(addr):    all_xref = []    ref = ida_xref.get_first_cref_from(addr)    while ref != 0xffffffff:        all_xref.append(ref)        ref = ida_xref.get_next_cref_from(addr, ref)    return all_xref def is_preproc(addr):    all_xref_to = get_all_cref_to(addr)    all_xref_from = get_all_cref_from(addr)    # 多個入度(具體取值要看情況) && 一個出度    return len(all_xref_to) > 5 and len(all_xref_from) == 1 preproc_addr_list = [] def get_preprocessor(seg):    global preproc_addr_list    print("====================================")    addr = seg.start_ea    end_addr = seg.end_ea     while addr < end_addr:        if is_preproc(addr):            preproc_addr_list.append(addr)        addr = idc.next_head(addr) def main():    get_preprocessor(ida_segment.get_segm_by_name('.mytext'))    get_preprocessor(ida_segment.get_segm_by_name('.text'))    print(preproc_addr_list) main()


具體實現代碼如下:


def collect_blocks(offset, end, ret_addr):
global block_list # 保存所有塊
global real_blocks # 保存真實塊
global ret_blocks # 保存ret 塊
global md

block_list = {}
real_blocks = []
ret_blocks = []

md = Cs(CS_ARCH_ARM, CS_MODE_THUMB)
md.detail = True

preproc_flag = False

ins_str = ""
is_new = True
for dasm in md.disasm(sodata[offset:end], offset):

# 這裡是數據, 不用保存為block
if dasm.address >= 0x612E and dasm.address < 0x617C:
continue

ins_str += f"{hex(dasm.address)}:t{dasm.mnemonic}t{dasm.op_str}n"

# 在塊的起如地址保存對應信息
if is_new:
is_new = False
block_item = {}
block_item["start_addr"] = dasm.address
block_item["capstone"] = dasm

# 判斷當前地址是否預處理器的起始地址
if is_preproc(dasm.address):
preproc_flag = True

if is_block_end(dasm):
is_new = True
block_item["end_addr"] = dasm.address
block_item["ins_str"] = ins_str
ins_str = ""
block_list[block_item["start_addr"]] = block_item

# 初步獲取real_blocks, 這個特徵僅針對本例
if dasm.mnemonic == "mov" and dasm.op_str == "pc, r0":
if preproc_flag:
preproc_flag = False
else:
real_blocks.append(block_item["start_addr"])

# 手動添加arm的ret block (從IDA裡找出度為0的那塊)
ret_blocks.append(ret_addr)

def is_block_end(dasm):
if dasm.mnemonic == "mov" and dasm.op_str == "pc, r0":
return True

if dasm.mnemonic == "b":
return True

return False
def is_preproc(addr):
# 由IDA Python得出
preprocessor_addrs = [27514, 3200, 5784, 10490, 15456, 17002, 19836, 20578, 22440]

return addr in preprocessor_addrs


init_unicorn


初始化Unicorn的模擬執行環境,這裡我們不需要考慮傳參之類的東西。hook_code是指令hook的回調函數,也是最關鍵的邏輯所在,後面會重點介紹。hook_mem_access是內存訪問異常時的回調,對本例的用處不大。


這裡設置.data節是因為程序的間接條跳依賴於其中的數據,若不添加程序將無法執行。


def init_unicorn(filename):
global mu
global sodata

if isinstance(sodata, bytearray):
sodata = bytes(sodata)

mu = Uc(UC_ARCH_ARM, UC_MODE_THUMB)
mu.mem_map(0x80000000, 0x1000 * 8) # 初始化stack
mu.mem_map(0, 4 * 1024 * 1024)
mu.mem_write(0, sodata)
mu.reg_write(UC_ARM_REG_SP, 0x80000000 + 0x1000 * 4)
mu.hook_add(UC_HOOK_CODE, hook_code)
mu.hook_add(UC_HOOK_MEM_UNMAPPED, hook_mem_access)

# 設置.data節
data_section = get_section(filename, '.data')
DATA_MEM_OFFSET = data_section.header["sh_addr"]
DATA_FILE_OFFSET = data_section.header["sh_offset"]
DATA_SIZE = data_section.header["sh_size"]
mu.mem_write(DATA_MEM_OFFSET, sodata[DATA_FILE_OFFSET:DATA_FILE_OFFSET+DATA_SIZE])


start_emu


正式開始模擬執行,會模擬執行若干次,每次會從某真實塊出發,直到找到另一個真實塊/結束塊為至。


queue來保存模擬執行的順序以及對應的上下文,保存上下文是因為分支要走兩條路,當走完一條分支時通過恢復上下文來達到一種回溯的效果,從而繼續走第二條分支。通過這種方法就能遍歷完所有可能的真實塊,但對於有虛假分支的情況,這樣做可能會導致死循環,但反過來想也是一種檢測虛假分支的思路。


itt是本例的真實塊裡的分支指令,具體處理的邏輯在hook_code中,這裡提前判斷的目的是為了手動控制分支的走向。


def start_emu(offset):
global is_success
global flow

if offset in real_blocks:
real_blocks.remove(offset)

queue = [(offset, None)]

# 保存執行流, 最終patch就是靠它
flow = {}

is_success = False
while len(queue) != 0:
env = queue.pop()
pc = env[0]
set_context(env[1])

item = block_list[pc]

# 代表對應的路徑已被記錄, 無需重複
if pc in flow:
continue

flow[pc] = []

# 分支, 例如 0x62D6, 0x5FA2
if item["ins_str"].find("itt") != -1:
ctx = get_context()

p1 = find_path(pc, 0)
if p1 != None:
queue.append((p1, get_context()))
flow[pc].append(p1)
set_context(ctx)
p2 = find_path(pc, 1)

if p1 != p2:
queue.append((p2, get_context()))
flow[pc].append(p2)

else:
p = find_path(pc)
if p != None:
queue.append((p, get_context()))

flow[pc].append(p)


find_path


find_path的返回值是某真實塊的起始地址( 通過block_list能獲取該塊的所有信息 )。


在報錯時直接跳過導致報錯的指令( 通常是由於Unicorn不兼容某些指令所導致的報錯 ),可以直接跳過的原因是我們只關注會影響執行流的指令,其他無需理會。


要特別注意的是,本例是Thumb Mode,因此模擬執行的地址必須|1,否則會導致一連串奇怪的事情( 一開始沒留意被坑了很久…… )。


def find_path(start_addr, branch = None):
global real_blocks
global mu
global g_start_addr
global branch_control
global list_trace
global dst_addr
global is_success

branch_control = branch
g_start_addr = start_addr
list_trace = {}
dst_addr = 0
is_success = False

try:
mu.emu_start(start_addr | 1, start_addr + 0x10000)
print("============= emu end =============")

except UcError as e:
pc = mu.reg_read(UC_ARM_REG_PC)
if pc != 0:
#mu.reg_write(UC_ARM64_REG_PC, pc + 4)
# Thumb指令, 長度可能是2/4, 因此要動態獲取
_size = get_ins_size(pc)
return find_path(pc + _size, branch)
else:
print("ERROR: %s pc:%x" % (e,pc))

if is_success:
return dst_addr

return None


hook_code


最關鍵的邏輯,主要分成以下幾部分:


1.判斷當前地址是否屬於real_blocksret_blocks,是則將該地址保存到dst_addr然後停止本次模擬執行。


2.判斷當前指令是否函數調用,是則修改PC的值以此跳過該函數調用,記得要|1,這點特別坑!


3.判斷branch_control是否有值,是則代表需要處理分支的情況,當branch_control0時執行False分支( 根據條件修改對應寄存器 ),以下圖為例,False分支是不執行紅框部分的那條分支,這點在最後的patch時會用到。


初窥ARM平坦化还原

def hook_code(uc, address, size, user_data):
global is_success
global mu
global branch_control
global g_start_addr
global list_trace
global dst_addr
global end
global md

if is_success or address > end:
mu.emu_stop()
return

ban_ins = ["bl", "blx"]
if address in [0x5ED6]:
# for debug
print("debug addr: ", hex(address))

for ins in md.disasm(sodata[address:address+size], address):
print(">>> Tracing instruction at 0x%x, instruction size = 0x%x" % (address, size))
print(">>> 0x%x:t%st%st%d" % (ins.address, ins.mnemonic, ins.op_str, ins.size))

print_regs()

if address in real_blocks:
if address in list_trace:
print("have fake block?")
uc.emu_stop()
else:
list_trace[address] = 1


if address != g_start_addr:
is_success = True
dst_addr = address
print(f"find: {hex(address)}")
uc.emu_stop()
return

if address in ret_blocks:
print(f"end_block: {hex(address)} ")
mu.emu_stop()
return

flag_pass = False

for b in ban_ins:
if ins.mnemonic.find(b) != -1:
flag_pass = True
break

# 內存操作 (對本例用處不大)
if ins.op_str.find("[") != -1:

# R7是棧底寄存器, 通常用來取局部變量
if ins.op_str.find("[r7") != -1 and ins.op_str.find("0xf4") != -1:
print()

if ins.op_str.find("[sp") != -1:
flag_pass = True
for op in ins.operands:
if op.type == ARM_OP_MEM:
reg_name = ins.reg_name(op.value.base)
addr = mu.reg_read(reg_ctou(reg_name))

if addr >= 0x80000000 and addr < 0x80000000 + 0x10000 * 8:
flag_pass = False

if flag_pass:
print(f"[pass] addr: {hex(ins.address)} size: {ins.size}")
# 關鍵點: 要 |1
uc.reg_write(UC_ARM_REG_PC, (ins.address + ins.size) | 1)
# uc.reg_write(UC_ARM_REG_PC, address + size)
return


if branch_control == None:
return

# ollvm 分支
# branch_control == 1代表條件成立, 如 cmp r1,r2、beq XXX -> r1 == r2 (目的是方便後續的patch)
next_dasm = get_dasm(ins.address + ins.size, 4)
if next_dasm.mnemonic == "itt" and next_dasm.op_str == "ne":
if ins.mnemonic == "cmp":
ops = ins.op_str.split(", ")
reg = reg_ctou(ops[0])
if ops[1].startswith("#"):
cmp_num = int(ops[1][1:], 16)
else:
assert ops[1].startswith("r")
cmp_num = mu.reg_read(reg_ctou(ops[1]))

if branch_control == 0:
mu.reg_write(reg, cmp_num)
else:
mu.reg_write(reg, cmp_num + 1)

elif ins.mnemonic == "tst": # tst r2, r3 ---> r2 & r3 , 改變z標誌
regs = [reg_ctou(x) for x in ins.op_str.split(", ")]
if branch_control == 0:
mu.reg_write(regs[0], 0)
mu.reg_write(regs[1], 0)
else:
mu.reg_write(regs[0], 1)
mu.reg_write(regs[1], 1)

# 例: cmp r2, r3, cmp是r2 - r3, 當結果<0時, blt、bmi都會執行
elif next_dasm.mnemonic == "itt" and next_dasm.op_str == "lt" or
next_dasm.mnemonic == "itt" and next_dasm.op_str == "mi" or
next_dasm.mnemonic == "itt" and next_dasm.op_str == "lo":
if ins.mnemonic == "cmp":
ops = ins.op_str.split(", ")
reg = reg_ctou(ops[0])
if ops[1].startswith("#"):
cmp_num = int(ops[1][1:], 16)
else:
assert ops[1].startswith("r")
cmp_num = mu.reg_read(reg_ctou(ops[1]))

if branch_control == 0:
mu.reg_write(reg, cmp_num + 1)
else:
mu.reg_write(reg, cmp_num - 1)
# gt 是大於
elif next_dasm.mnemonic == "itt" and next_dasm.op_str == "gt":
if ins.mnemonic == "cmp":
ops = ins.op_str.split(", ")
reg = reg_ctou(ops[0])
if ops[1].startswith("#"):
cmp_num = int(ops[1][1:], 16)
else:
assert ops[1].startswith("r")
cmp_num = mu.reg_read(reg_ctou(ops[1]))

if branch_control == 0:
mu.reg_write(reg, cmp_num - 1)
else:
mu.reg_write(reg, cmp_num + 1)
# eq 是等於
elif next_dasm.mnemonic == "itt" and next_dasm.op_str == "eq":
if ins.mnemonic == "cmp":
ops = ins.op_str.split(", ")
reg = reg_ctou(ops[0])
if ops[1].startswith("#"):
cmp_num = int(ops[1][1:], 16)
else:
assert ops[1].startswith("r")
cmp_num = mu.reg_read(reg_ctou(ops[1]))

if branch_control == 0:
mu.reg_write(reg, cmp_num - 1)
else:
mu.reg_write(reg, cmp_num)

elif next_dasm.mnemonic == "itt":
raise Exception("ollvm branch new type")



start_patch


start_emu結束後的flow如下圖所示,很容易可以看到其中的規律,利用BFS來遍歷簡直再合適不過。


初窥ARM平坦化还原


對於無分支的真實塊,可以直接通過b指令來跳轉,也可像我這樣修改r0來跳轉,因為本例本身就是將r0賦給pc來實現間接跳轉的,同時需要將一些字節patch為nop以防被干擾。


def start_patch(flow, start_addr):
global block_list
global sodata

sodata = bytearray(sodata)

visited = {}
queue = [start_addr]
while len(queue) != 0:
current_addr = queue.pop()
if current_addr in visited or current_addr == None:
continue

visited[current_addr] = 1

next_blocks = flow[current_addr]

rb_end_addr = block_list[current_addr]["end_addr"]
rb_start_addr = block_list[current_addr]["start_addr"]

# 1. 無分支的情況
if len(next_blocks) == 1:
queue.append(next_blocks[0])
# 對本例, patch地址固定為真實塊最後指令地址 - 10
patch_addr = rb_end_addr - 10

next_start_addr = queue[-1]
patch_nop(patch_addr, 10)

if next_start_addr != None:
asm_bytes = ks_asm(KS_ARCH_ARM, KS_MODE_THUMB, f"mov r0, #{hex(next_start_addr)}")
patch_bytes(patch_addr, asm_bytes)
print(f"{hex(patch_addr)} ---> {hex(next_start_addr)}")
else:

ret_block_addr = ret_blocks[0]
asm_bytes = ks_asm(KS_ARCH_ARM, KS_MODE_THUMB, f"mov r0, #{hex(ret_block_addr)}")
patch_bytes(patch_addr, asm_bytes)
print(f"{hex(patch_addr)} ---> ret block: {hex(ret_block_addr)}")

# 2. 有分支的情況
else:
queue.append(next_blocks[0])
queue.append(next_blocks[1])
b0 = next_blocks[0] # branch_control為0的分支, False分支
b1 = next_blocks[1] # branch_control為1的分支, True分支

# patch_addr = rb_end_addr - 28
patch_addr = get_branch_pa(rb_start_addr) # find "itt" addr
patch_branch(patch_addr, b0, b1)
print(f"{hex(patch_addr)} --->nt1. {hex(b0)}nt2. {hex(b1)}")


分支是像cmp r0,0; itt ne這樣的組合,只需將itt nepatch成bne addr1; b addr2就可以。傳入的addr就是itt ne的地址。


對於條件跳轉如beqbne等,其跳轉的地址 = 目標地址 - 當前地址 - 4;而無條件跳轉直接跳到對應的絕對地址就可以。


條件跳轉後面跟的地址是b1_addr,這是True分支的真實塊的地址,這樣的處理是為了對應上述hook_code對分支的處理,兩者是有聯系的,不能隨便來。


def patch_branch(addr, b0, b1):
dasm = get_dasm(addr, 4)
assert dasm.mnemonic == "itt"

print(f"{hex(addr)}:t{dasm.mnemonic}t{dasm.op_str}")

patch_nop(addr, 28 + 2)

b1_addr = b1 - addr - 4
b0_addr = b0

branch_true_bytes = ks_asm(KS_ARCH_ARM, KS_MODE_THUMB, f"b{dasm.op_str} #{hex(b1_addr)}", addr)
branch_false_bytes = ks_asm(KS_ARCH_ARM, KS_MODE_THUMB, f"b {hex(b0_addr)}", addr + len(branch_true_bytes))

patch_bytes(addr, branch_true_bytes)
patch_bytes(addr + len(branch_true_bytes), branch_false_bytes)





完整腳本


from capstone import *
from capstone.arm import *
from unicorn import *
from unicorn.arm_const import *
from keystone import *

from elftools.elf.elffile import ELFFile

def get_section(filename, sectionName):
file = open(filename, 'rb')

elf_file = ELFFile(file)
for section in elf_file.iter_sections():
if section.name == sectionName:
return section

# capstone寄存器 -> unicorn寄存器
def reg_ctou(reg_name):#
if reg_name == "sp":
return UC_ARM_REG_SP

if reg_name == "pc":
return UC_ARM_REG_PC

reg_idx = int(reg_name[1:])
if reg_idx >= 0 and reg_idx <= 12:
return UC_ARM_REG_R0 + reg_idx

raise Exception("reg_ctou: have new type?")

def is_block_end(dasm):
if dasm.mnemonic == "mov" and dasm.op_str == "pc, r0":
return True

if dasm.mnemonic == "b":
return True

return False

def hook_mem_access(uc, type, address,size,value,userdata):
pc = uc.reg_read(UC_ARM_REG_PC)
print('pc:%x type:%d addr:%x size:%x' % (pc, type, address, size))
#uc.emu_stop()
return True

def is_preproc(addr):
# 由IDA Python得出
preprocessor_addrs = [27514, 3200, 5784, 10490, 15456, 17002, 19836, 20578, 22440]

return addr in preprocessor_addrs

def get_context():
global mu

return mu.context_save()

def set_context(context):
global mu
if context == None:
return

mu.context_restore(context)

def print_regs():
global mu
msg = ""
for i in range(8):
msg += f"r{i}:{hex(mu.reg_read(UC_ARM_REG_R0 + i))}t"
msg += f"sp:{hex(mu.reg_read(UC_ARM_REG_SP))}tpc:{hex(mu.reg_read(UC_ARM_REG_PC))}"
print(msg)

def get_ins_size(addr):
for ins in md.disasm(sodata[addr:addr+4], addr):
return ins.size

def get_dasm(addr, size):
global md
global sodata

for dasm in md.disasm(sodata[addr:addr+size], addr):
return dasm

def hook_code(uc, address, size, user_data):
global is_success
global mu
global branch_control
global g_start_addr
global list_trace
global dst_addr
global end
global md

if is_success or address > end:
mu.emu_stop()
return

ban_ins = ["bl", "blx"]
if address in [0x5ED6]:
# for debug
print("debug addr: ", hex(address))

for ins in md.disasm(sodata[address:address+size], address):
print(">>> Tracing instruction at 0x%x, instruction size = 0x%x" % (address, size))
print(">>> 0x%x:t%st%st%d" % (ins.address, ins.mnemonic, ins.op_str, ins.size))

print_regs()

if address in real_blocks:
if address in list_trace:
print("have fake block?")
uc.emu_stop()
else:
list_trace[address] = 1


if address != g_start_addr:
is_success = True
dst_addr = address
print(f"find: {hex(address)}")
uc.emu_stop()
return

if address in ret_blocks:
print(f"end_block: {hex(address)} ")
mu.emu_stop()
return

flag_pass = False

for b in ban_ins:
if ins.mnemonic.find(b) != -1:
flag_pass = True
break

# 內存操作 (對本例用處不大)
if ins.op_str.find("[") != -1:

# R7是棧底寄存器, 通常用來取局部變量
if ins.op_str.find("[r7") != -1 and ins.op_str.find("0xf4") != -1:
print()

if ins.op_str.find("[sp") != -1:
flag_pass = True
for op in ins.operands:
if op.type == ARM_OP_MEM:
reg_name = ins.reg_name(op.value.base)
addr = mu.reg_read(reg_ctou(reg_name))

if addr >= 0x80000000 and addr < 0x80000000 + 0x10000 * 8:
flag_pass = False

if flag_pass:
print(f"[pass] addr: {hex(ins.address)} size: {ins.size}")
# 關鍵點: 要 |1
uc.reg_write(UC_ARM_REG_PC, (ins.address + ins.size) | 1)
# uc.reg_write(UC_ARM_REG_PC, address + size)
return


if branch_control == None:
return

# ollvm 分支
# branch_control == 1代表條件成立, 如 cmp r1,r2、beq XXX -> r1 == r2 (目的是方便後續的patch)
next_dasm = get_dasm(ins.address + ins.size, 4)
if next_dasm.mnemonic == "itt" and next_dasm.op_str == "ne":
if ins.mnemonic == "cmp":
ops = ins.op_str.split(", ")
reg = reg_ctou(ops[0])
if ops[1].startswith("#"):
cmp_num = int(ops[1][1:], 16)
else:
assert ops[1].startswith("r")
cmp_num = mu.reg_read(reg_ctou(ops[1]))

if branch_control == 0:
mu.reg_write(reg, cmp_num)
else:
mu.reg_write(reg, cmp_num + 1)

elif ins.mnemonic == "tst": # tst r2, r3 ---> r2 & r3 , 改變z標誌
regs = [reg_ctou(x) for x in ins.op_str.split(", ")]
if branch_control == 0:
mu.reg_write(regs[0], 0)
mu.reg_write(regs[1], 0)
else:
mu.reg_write(regs[0], 1)
mu.reg_write(regs[1], 1)

# 例: cmp r2, r3, cmp是r2 - r3, 當結果<0時, blt、bmi都會執行
elif next_dasm.mnemonic == "itt" and next_dasm.op_str == "lt" or
next_dasm.mnemonic == "itt" and next_dasm.op_str == "mi" or
next_dasm.mnemonic == "itt" and next_dasm.op_str == "lo":
if ins.mnemonic == "cmp":
ops = ins.op_str.split(", ")
reg = reg_ctou(ops[0])
if ops[1].startswith("#"):
cmp_num = int(ops[1][1:], 16)
else:
assert ops[1].startswith("r")
cmp_num = mu.reg_read(reg_ctou(ops[1]))

if branch_control == 0:
mu.reg_write(reg, cmp_num + 1)
else:
mu.reg_write(reg, cmp_num - 1)
# gt 是大於
elif next_dasm.mnemonic == "itt" and next_dasm.op_str == "gt":
if ins.mnemonic == "cmp":
ops = ins.op_str.split(", ")
reg = reg_ctou(ops[0])
if ops[1].startswith("#"):
cmp_num = int(ops[1][1:], 16)
else:
assert ops[1].startswith("r")
cmp_num = mu.reg_read(reg_ctou(ops[1]))

if branch_control == 0:
mu.reg_write(reg, cmp_num - 1)
else:
mu.reg_write(reg, cmp_num + 1)
# eq 是等於
elif next_dasm.mnemonic == "itt" and next_dasm.op_str == "eq":
if ins.mnemonic == "cmp":
ops = ins.op_str.split(", ")
reg = reg_ctou(ops[0])
if ops[1].startswith("#"):
cmp_num = int(ops[1][1:], 16)
else:
assert ops[1].startswith("r")
cmp_num = mu.reg_read(reg_ctou(ops[1]))

if branch_control == 0:
mu.reg_write(reg, cmp_num - 1)
else:
mu.reg_write(reg, cmp_num)

elif next_dasm.mnemonic == "itt":
raise Exception("ollvm branch new type")


def find_path(start_addr, branch = None):
global real_blocks
global mu
global g_start_addr
global branch_control
global list_trace
global dst_addr
global is_success

branch_control = branch
g_start_addr = start_addr
list_trace = {}
dst_addr = 0
is_success = False

try:
mu.emu_start(start_addr | 1, start_addr + 0x10000)
print("============= emu end =============")

except UcError as e:
pc = mu.reg_read(UC_ARM_REG_PC)
if pc != 0:
#mu.reg_write(UC_ARM64_REG_PC, pc + 4)
# Thumb指令, 長度可能是2/4, 因此要動態獲取
_size = get_ins_size(pc)
return find_path(pc + _size, branch)
else:
print("ERROR: %s pc:%x" % (e,pc))

if is_success:
return dst_addr

return None

def patch_nop(addr, size):
global sodata
nop_bytes = bytearray(b'x00xbf')

for i in range(size):
sodata[addr + i] = nop_bytes[i % 2]

def patch_bytes(addr, bytes: bytearray):
for i in range(len(bytes)):
sodata[addr + i] = bytes[i]

def ks_asm(arch, mode, code, addr = 0):
ks = Ks(arch, mode)

encoding, count = ks.asm(code, addr)
return bytearray(encoding)

def patch_branch(addr, b0, b1):
dasm = get_dasm(addr, 4)
assert dasm.mnemonic == "itt"

print(f"{hex(addr)}:t{dasm.mnemonic}t{dasm.op_str}")

patch_nop(addr, 28 + 2)

b1_addr = b1 - addr - 4
b0_addr = b0

branch_true_bytes = ks_asm(KS_ARCH_ARM, KS_MODE_THUMB, f"b{dasm.op_str} #{hex(b1_addr)}", addr)
branch_false_bytes = ks_asm(KS_ARCH_ARM, KS_MODE_THUMB, f"b {hex(b0_addr)}", addr + len(branch_true_bytes))

patch_bytes(addr, branch_true_bytes)
patch_bytes(addr + len(branch_true_bytes), branch_false_bytes)

# 獲取分支的patch address (從addr向下查找, 直到itt指令)
def get_branch_pa(addr):
global md
global sodata

for dasm in md.disasm(sodata[addr:end], addr):
# print(f"{hex(addr)}:t{dasm.mnemonic}t{dasm.op_str}")
if dasm.mnemonic == "itt":
return dasm.address


raise Exception("cant find itt????")

def start_patch(flow, start_addr):
global block_list
global sodata

sodata = bytearray(sodata)

visited = {}
queue = [start_addr]
while len(queue) != 0:
current_addr = queue.pop()
if current_addr in visited or current_addr == None:
continue

visited[current_addr] = 1

next_blocks = flow[current_addr]

rb_end_addr = block_list[current_addr]["end_addr"]
rb_start_addr = block_list[current_addr]["start_addr"]

# 1. 無分支的情況
if len(next_blocks) == 1:
queue.append(next_blocks[0])
# 對本例, patch地址固定為真實塊最後指令地址 - 10
patch_addr = rb_end_addr - 10

next_start_addr = queue[-1]
patch_nop(patch_addr, 10)

if next_start_addr != None:
asm_bytes = ks_asm(KS_ARCH_ARM, KS_MODE_THUMB, f"mov r0, #{hex(next_start_addr)}")
patch_bytes(patch_addr, asm_bytes)
print(f"{hex(patch_addr)} ---> {hex(next_start_addr)}")
else:

ret_block_addr = ret_blocks[0]
asm_bytes = ks_asm(KS_ARCH_ARM, KS_MODE_THUMB, f"mov r0, #{hex(ret_block_addr)}")
patch_bytes(patch_addr, asm_bytes)
print(f"{hex(patch_addr)} ---> ret block: {hex(ret_block_addr)}")
# 2. 有分支的情況
else:
queue.append(next_blocks[0])
queue.append(next_blocks[1])
b0 = next_blocks[0] # branch_control為0的分支, False分支
b1 = next_blocks[1] # branch_control為1的分支, True分支

# patch_addr = rb_end_addr - 28
patch_addr = get_branch_pa(rb_start_addr)
patch_branch(patch_addr, b0, b1)
print(f"{hex(patch_addr)} --->nt1. {hex(b0)}nt2. {hex(b1)}")

def load_file(filename):
global sodata
with open(filename, mode = "rb") as f:
sodata = f.read()

def save_file(filename):
with open(filename, mode="wb") as f:
f.write(sodata)

def collect_blocks(offset, end, ret_addr):
global block_list # 保存所有塊
global real_blocks # 保存真實塊
global ret_blocks # 保存ret 塊
global md

block_list = {}
real_blocks = []
ret_blocks = []

md = Cs(CS_ARCH_ARM, CS_MODE_THUMB)
md.detail = True

preproc_flag = False

ins_str = ""
is_new = True
for dasm in md.disasm(sodata[offset:end], offset):

# 這裡是數據, 不用保存為block
if dasm.address >= 0x612E and dasm.address < 0x617C:
continue

ins_str += f"{hex(dasm.address)}:t{dasm.mnemonic}t{dasm.op_str}n"

# 在塊的起如地址保存對應信息
if is_new:
is_new = False
block_item = {}
block_item["start_addr"] = dasm.address
block_item["capstone"] = dasm

# 判斷當前地址是否預處理器的起始地址
if is_preproc(dasm.address):
preproc_flag = True

if is_block_end(dasm):
is_new = True
block_item["end_addr"] = dasm.address
block_item["ins_str"] = ins_str
ins_str = ""
block_list[block_item["start_addr"]] = block_item

# 初步獲取real_blocks, 這個特徵僅針對本例
if dasm.mnemonic == "mov" and dasm.op_str == "pc, r0":
if preproc_flag:
preproc_flag = False
else:
real_blocks.append(block_item["start_addr"])

# 手動添加arm的ret block (從IDA裡找出度為0的那塊)
ret_blocks.append(ret_addr)

def init_unicorn(filename):
global mu
global sodata

if isinstance(sodata, bytearray):
sodata = bytes(sodata)

mu = Uc(UC_ARCH_ARM, UC_MODE_THUMB)
mu.mem_map(0x80000000, 0x1000 * 8) # 初始化stack
mu.mem_map(0, 4 * 1024 * 1024)
mu.mem_write(0, sodata)
mu.reg_write(UC_ARM_REG_SP, 0x80000000 + 0x1000 * 4)
mu.hook_add(UC_HOOK_CODE, hook_code)
mu.hook_add(UC_HOOK_MEM_UNMAPPED, hook_mem_access)

# 設置.data節
data_section = get_section(filename, '.data')
DATA_MEM_OFFSET = data_section.header["sh_addr"]
DATA_FILE_OFFSET = data_section.header["sh_offset"]
DATA_SIZE = data_section.header["sh_size"]
mu.mem_write(DATA_MEM_OFFSET, sodata[DATA_FILE_OFFSET:DATA_FILE_OFFSET+DATA_SIZE])

def start_emu(offset):
global is_success
global flow

if offset in real_blocks:
real_blocks.remove(offset)

queue = [(offset, None)]

# 保存執行流, 最終patch就是靠它
flow = {}

is_success = False
while len(queue) != 0:
env = queue.pop()
pc = env[0]
set_context(env[1])

item = block_list[pc]

# 代表對應的路徑已被記錄, 無需重複
if pc in flow:
continue

flow[pc] = []

# 分支, 例如 0x62D6, 0x5FA2
if item["ins_str"].find("itt") != -1:
ctx = get_context()

p1 = find_path(pc, 0)
if p1 != None:
queue.append((p1, get_context()))
flow[pc].append(p1)
set_context(ctx)
p2 = find_path(pc, 1)

if p1 != p2:
queue.append((p2, get_context()))
flow[pc].append(p2)

else:
p = find_path(pc)
if p != None:
queue.append((p, get_context()))

flow[pc].append(p)

def run(offset, end_addr, ret_addr):
global end
global flow

end = end_addr

collect_blocks(offset = offset, end = end, ret_addr = ret_addr)

init_unicorn("libBlackMamBa.so")

start_emu(offset)

start_patch(flow, offset)

if __name__ == "__main__":

offset_list = [0x584c, 0xC90, 0x1700, 0x2968, 0x50A4, 0x42DC] # 0x3CD0
end_list = [0x6b90, 0x16AE, 0x2910, 0x3C76, 0x57BE, 0x4D92] # 0x4280
ret_list = [0x6B58, 0x1666, 0x28E0, 0x3C3C, 0x578E, 0x4D54] # 0x4250
load_file("libBlackMamBa.so")

for i in range(len(offset_list)):
run(offset_list[i], end_list[i], ret_list[i])

save_file("patch3.so")


最後附上一張還原的效果圖,以及上述如果有誤的話還望指出!


初窥ARM平坦化还原



參考

◆https://bbs.kanxue.com/thread-252321.htm

◆https://www.jianshu.com/p/0355a67b8762




初窥ARM平坦化还原


看雪ID:ngiokweng

https://bbs.kanxue.com/user-home-946537.htm

*本文为看雪论坛优秀文章,由 ngiokweng 原创,转载请注明来自看雪社区

初窥ARM平坦化还原



# 往期推荐

1、CTF php .so pwn 题型分析

2、病毒分析:低危病毒请当心-两次文件释放&加载.net远控

3、免杀动态对抗之syscall[源码分析]

4、Windows主机入侵检测与防御内核技术深入解析

5、基于 Frida 对单字节加密验证程序侧信道爆破


初窥ARM平坦化还原


初窥ARM平坦化还原

球分享

初窥ARM平坦化还原

球点赞

初窥ARM平坦化还原

球在看



初窥ARM平坦化还原

点击阅读原文查看更多

原文始发于微信公众号(看雪学苑):初窥ARM平坦化还原

版权声明:admin 发表于 2024年6月19日 下午6:00。
转载请注明:初窥ARM平坦化还原 | CTF导航

相关文章