Created
May 9, 2021 19:22
-
-
Save tin-z/c8920a66a8791ea8f7d54f4304c65656 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ctypes
import math
import os
import struct
import sys
from collections import Counter
#### Ignore #### | |
# ANSI SGR escape sequences used to colourise terminal output.
EC = "\x1b[0m"    # reset every attribute
BOLD = "\x1b[1m"

# Base SGR codes: foreground, background and their high-intensity variants.
INIT = {"f": 30, "b": 40, "hf": 90, "hb": 100}
COLORS = (
    ("BLACK", 0),
    ("RED", 1),
    ("GREEN", 2),
    ("YELLOW", 3),
    ("BLUE", 4),
    ("CYAN", 6),
)

# Publish one module-level dict per colour (BLACK, RED, ...), each mapping the
# INIT keys to the matching escape sequence, e.g. RED['f'] == "\x1b[31m".
for x, y in COLORS:
    globals()[x] = {
        variant: "\x1b[{}m".format(base + y) for variant, base in INIT.items()
    }


def FAIL(x):
    """Return *x* rendered in bold red."""
    return BOLD + RED['f'] + x + EC


def WARNING(x):
    """Return *x* rendered in bold black on a yellow background."""
    return BOLD + YELLOW['b'] + BLACK['f'] + x + EC


def _painter(prefix):
    """Build a callable that wraps its argument in *prefix* ... reset."""
    return lambda x: prefix + x + EC


def _boxed_painter(prefix):
    """Like _painter, but surround the argument with '|' bars."""
    return lambda x: prefix + "|{}|".format(x) + EC


# Colouring callables keyed by pointer category (H/S/L and their long
# aliases), heading level (H1..H3) and flag state (F1 / F2).
PTR = {
    "H": _painter(BLUE['f']),
    "S": _painter(YELLOW['hf']),
    "L": _painter(RED['f']),
}
PTR.update(HEAP=PTR["H"], STACK=PTR["S"], LIBC=PTR["L"])
PTR.update(
    H1=_painter(BLUE['hf']),
    H2=_painter(CYAN['f']),
    H3=_painter(CYAN['hf']),
)
PTR.update(F1=_boxed_painter(BOLD + RED['f'] + BLACK['b']))
PTR.update(F2=_boxed_painter(BOLD + GREEN['f'] + BLACK['b']))
# ptrace(2) request numbers, as passed as the first argument of ptrace().
PTRACE_PEEKTEXT = 1
PTRACE_PEEKDATA = 2
PTRACE_POKETEXT = 4
PTRACE_POKEDATA = 5
# Added: referenced by the single-step helper but previously undefined.
PTRACE_SINGLESTEP = 9
PTRACE_GETREGS = 12
PTRACE_SETREGS = 13
PTRACE_ATTACH = 16
PTRACE_DETACH = 17
# Banner printed together with the usage line. The credit URL is restored to
# github.com; the previous host was a mirror/proxy rewrite of that address.
DESCR = """
scheappes - A simple tool that prints out the status of the memory heap of a process in execution
Some inspiration was taken from here:https://github.com/ancat/gremlin/blob/master/heap_scan.py
"""
#### UTILS #### | |
def checkZ(conds, msg):
    """
    checkZ(callable, str) -> None

    Abort the program when a failure predicate holds.

    conds is called with no arguments. If it returns a truthy value, msg is
    printed through FAIL and the process exits with status 1; otherwise
    control returns to the caller.
    """
    if not conds():
        return
    print(FAIL(msg))
    sys.exit(1)
def fmtR(num, data_size):
    """
    fmtR(int, int) -> str

    Return hex(num) right-aligned with spaces. The column is
    data_size*2 - 2 characters wide when data_size is 8, and
    data_size*2 + 2 characters wide for any other data_size.
    """
    adjust = -2 if data_size == 8 else 2
    width = data_size * 2 + adjust
    return hex(num).rjust(width, " ")
def fmtL(num, data_size):
    """
    fmtL(int, int) -> str

    Return hex(num) left-aligned and space-padded to data_size characters.
    """
    text = hex(num)
    return text.ljust(data_size)
def ent(data, unit='natural'):
    """
    ent(b'', unit='natural') -> float

    Return the entropy of the symbols in data.

    data is any sized iterable of hashable symbols (for bytes, the symbols
    are the integer byte values). unit selects the logarithm base:
    'shannon' for base 2 (bits) or 'natural' for base e (nats).

    Inputs with at most one element yield 0.0. Any other unit raises
    KeyError once the input has more than one element.
    """
    rets = .0
    if len(data) <= 1:
        return rets
    base = {'shannon': 2., 'natural': math.exp(1)}[unit]
    total = float(len(data))
    # Counter starts every symbol at 1 on first sight; the previous hand-rolled
    # tally started at 0 and so under-counted each symbol by one.
    for occurrences in Counter(data).values():
        p = occurrences / total
        rets -= p * math.log(p, base)
    return rets
#### Ctypes.Structure #### | |
class iovec(ctypes.Structure):
    """
    ctypes layout of a scatter/gather descriptor: a base pointer followed by
    a length. Instances are handed to process_vm_readv / process_vm_writev
    elsewhere in this file.
    """
    _fields_ = [
        ("iov_base", ctypes.c_void_p),   # start address of the buffer
        ("iov_len", ctypes.c_ulong),     # number of bytes at iov_base
    ]
class user_regs_structX64(ctypes.Structure):
    """
    Register block for a 64-bit target: 27 consecutive unsigned 64-bit slots.

    The field order follows the layout found with
    grep "struct user_regs_struct" /usr/include/sys/user.h
    """
    _fields_ = [
        (reg, ctypes.c_ulonglong)
        for reg in (
            "r15", "r14", "r13", "r12", "rbp", "rbx", "r11", "r10", "r9",
            "r8", "rax", "rcx", "rdx", "rsi", "rdi", "orig_rax", "rip", "cs",
            "eflags", "rsp", "ss", "fs_base", "gs_base", "ds", "es", "fs",
            "gs",
        )
    ]
class user_regs_struct(ctypes.Structure):
    """
    Register block for a 32-bit target: 17 consecutive unsigned 32-bit slots.

    The field order follows the layout found with
    grep "struct user_regs_struct" /usr/include/sys/user.h
    """
    _fields_ = [
        (reg, ctypes.c_uint32)
        for reg in (
            "ebx", "ecx", "edx", "esi", "edi", "ebp", "eax", "xds", "xes",
            "xfs", "xgs", "orig_eax", "eip", "xcs", "eflags", "esp", "xss",
        )
    ]
#### CORE #### | |
class Chunk():
    """
    One heap chunk header decoded from a raw memory dump.

    The first chunk_range machine words of data are unpacked and exposed as
    attributes: PREVSIZE, SIZE (low three bits cleared), FLAGS (those low
    three bits), FD, BK, plus the individual flag bits PREV_INUSE,
    IS_MMAPPED and NOT_MAIN_ARENA. Every other name in cflags is initialised
    to False.
    """
    cflags = ["TCACHABLE", "FASTBIN", "SMALLBIN", "UNSORTEDBIN", "LARGEBIN", "TOPCHUNK",
              "FLAGS", "PREV_INUSE", "IS_MMAPPED", "NOT_MAIN_ARENA", "SIZE", "PREVSIZE", "FD", "BK"]

    def __init__(self, idz, addr, xy86, data_size, chunk_range, data):
        """
        idz         -- ordinal of the chunk in the walk
        addr        -- address of the chunk in the inspected process
        xy86        -- struct format character for one word ("Q" or "I")
        data_size   -- size of one word in bytes
        chunk_range -- number of words to decode (words 0..5 are consumed)
        data        -- raw bytes starting at the chunk header
        """
        self.index = idz
        self.addr = addr
        self.xy86 = xy86
        self.data_size = data_size
        # Mask applied to the size fields: 47 bits for 8-byte words, 31 bits otherwise.
        self.align = 2**47-1 if self.data_size == 8 else 2**31-1
        self.chunk_range = chunk_range
        # Validate before unpacking: the previous check compared the length of
        # an already-built dict and therefore could never fire.
        checkZ(lambda: len(data) < chunk_range * data_size, "Invalid size chunk")
        self.chunk = {
            cc: struct.unpack(self.xy86, data[cc * self.data_size:(cc + 1) * self.data_size])[0]
            for cc in range(self.chunk_range)
        }
        self.chunk.update({
            "prev_size": self.chunk[0] & self.align,
            "curr_size": (self.chunk[1] & ~7) & self.align,
            "curr_flag": self.chunk[1] & 7,
            "fd": self.chunk[2],
            "bk": self.chunk[3],
            "fdB": self.chunk[4],
            "bkB": self.chunk[5],
        })
        self.__set_flags()

    def __set_flags(self):
        """Populate the upper-case attributes from the decoded header."""
        # Give every known flag a default on the instance. The previous code
        # wrote keys named "self.<flag>" into globals() instead.
        for name in Chunk.cflags:
            setattr(self, name, False)
        self.SIZE = self.chunk['curr_size']
        self.PREVSIZE = self.chunk['prev_size']
        self.FD = self.chunk['fd']
        self.BK = self.chunk['bk']
        self.FLAGS = self.chunk['curr_flag']
        # The flag attributes keep the masked value (1, 2 or 4), not a bool.
        self.PREV_INUSE = self.FLAGS & 1
        self.IS_MMAPPED = self.FLAGS & 2
        self.NOT_MAIN_ARENA = self.FLAGS & 4

    def __repr__(self):
        """Return a compact, uncoloured description (always a str)."""
        return "Chunk[{}](prev_size:0x{:x}, size:0x{:x}, fd:0x{:x}, bk:0x{:x})".format(
            self.index, self.PREVSIZE, self.SIZE, self.FD, self.BK)

    def __str__(self):
        """Return the coloured one-line rendering used when listing the heap."""
        tmp_flags = ""
        for flag in ["NOT_MAIN_ARENA", "IS_MMAPPED", "PREV_INUSE"]:
            # Truthiness, not "== 1": the masked values are 4, 2 and 1, so an
            # equality test only ever matched PREV_INUSE.
            painter = PTR["F2"] if getattr(self, flag) else PTR["F1"]
            tmp_flags += painter(flag)
        return " @{} Chunk[{}] -> [prev_size:{}| size:{}| fd:{}| bk:{}] {}".format(
            fmtL(self.index, self.data_size),
            PTR['H'](fmtR(self.addr, self.data_size)),
            fmtR(self.PREVSIZE, self.data_size),
            fmtR(self.SIZE, self.data_size),
            fmtR(self.FD, self.data_size),
            fmtR(self.BK, self.data_size),
            tmp_flags)
class SGremlin:
    """
    Inspector for the heap and registers of a running process.

    Construction parses /proc/<pid>/maps, picks the word size, loads libc
    through ctypes, attaches with ptrace, snapshots the registers and
    detaches again. init_main_arena() then walks the [heap] mapping and
    print_arena() / print_regs() display the results.
    """

    # PTRACE_SINGLESTEP request number. Kept on the class so the single-step
    # helper does not depend on a module-level name that the original file
    # never defined.
    _PTRACE_SINGLESTEP = 9

    def __init__(self, pid):
        self.pid = pid
        self.__init_runtime()
        self.__init_ptrace()
        try:
            self.__init_register()
        finally:
            # Always release the target, even if reading the registers fails.
            self.ptrace_detach()

    def __init_ptrace(self):
        """Declare the ptrace() prototype for the chosen word size and attach."""
        if self.xy86 == "Q":
            self.libc.ptrace.restype = ctypes.c_uint64
            self.libc.ptrace.argtypes = [ctypes.c_uint64, ctypes.c_uint64, ctypes.c_void_p, ctypes.c_void_p]
        elif self.xy86 == "I":
            self.libc.ptrace.restype = ctypes.c_uint32
            self.libc.ptrace.argtypes = [ctypes.c_uint32, ctypes.c_uint32, ctypes.c_void_p, ctypes.c_void_p]
        self.ptrace_attach()

    def __init_register(self):
        """Snapshot the target's registers into self.regs_status."""
        self.ptrace_getregs()

    def __init_runtime(self):
        """
        init_runtime(self)
        Init the mapping address, architecture, element size in stack, libc CDLL struct, address heap, stack.
        """
        self.map_file = '/proc/{}/maps'.format(self.pid)
        self.exe_path = '/proc/{}/exe'.format(self.pid)
        self.parse_maps_file()
        self.setHeapAndArch()
        self.setLibc()
        checkZ(lambda: self.libc is None, "No libc found, wrong arch struct:{}.".format(self.xy86))
        self.executable = os.path.realpath(self.exe_path)
        vm_argtypes = [ctypes.c_uint64, ctypes.c_void_p, ctypes.c_uint64,
                       ctypes.c_void_p, ctypes.c_uint64, ctypes.c_uint64]
        # Both calls return ssize_t; declaring it lets callers see -1 on
        # failure and byte counts that do not fit in a C int.
        self.libc.process_vm_readv.argtypes = vm_argtypes
        self.libc.process_vm_readv.restype = ctypes.c_ssize_t
        self.libc.process_vm_writev.argtypes = vm_argtypes
        self.libc.process_vm_writev.restype = ctypes.c_ssize_t
        self.regs_struct = user_regs_structX64 if self.data_size == 8 else user_regs_struct

    def parse_maps_file(self):
        """
        parse_maps_file(self)
        Init dict mapping address

        self.maps maps each mapping name ('' for anonymous regions) to the
        list of regions carrying that name, in file order.
        """
        self.maps = dict()
        # "with" closes the file even when a line fails to parse.
        with open(self.map_file, "r") as map_file:
            for line in map_file:
                # Split at most five times so that a path containing spaces
                # stays in one piece as the sixth field.
                parts = line.strip().split(None, 5)
                if len(parts) < 5:
                    continue
                addr_start, addr_end = (int(x, 16) for x in parts[0].split('-'))
                map_name = parts[5] if len(parts) > 5 else ''
                mapping = {
                    'addr_start': addr_start,
                    'addr_end': addr_end,
                    'size': addr_end - addr_start,
                    'permissions': parts[1],
                    'offset': int(parts[2], 16),
                    'device_id': parts[3],
                    'inode': parts[4],
                    'map_name': map_name,
                }
                self.maps.setdefault(map_name, []).append(mapping)

    def setHeapAndArch(self):
        """
        getHeapAndArch(self)
        Set, in order, address heap, stack, architecture, element size in stack
        """
        checkZ(lambda: '[heap]' not in self.maps or '[stack]' not in self.maps, "No heap/stack found.")
        self.heap = self.maps['[heap]'][0]
        self.stack = self.maps['[stack]'][0]
        # Heuristic: a stack whose start address prints as 0x7f... selects
        # 8-byte words ("Q"); anything else selects 4-byte words ("I").
        if hex(self.stack['addr_start']).startswith("0x7f"):
            self.xy86, self.data_size = "Q", 8
        else:
            self.xy86, self.data_size = "I", 4

    def setLibc(self):
        """
        get_libc(self)
        Set self.libc ctypes.CDLL
        """
        # NOTE(review): the library is chosen from the target's word size but
        # is loaded into this interpreter - confirm this is intended when the
        # two differ.
        if self.xy86 == "Q":
            self.libc = ctypes.CDLL('/lib/x86_64-linux-gnu/libc.so.6')
        elif self.xy86 == "I":
            self.libc = ctypes.CDLL('/lib/i386-linux-gnu/libc.so.6')
        else:
            self.libc = None

    def read_process_memory(self, address, size):
        """
        read_process_memory(self, int, int) -> ctypes char buffer
        Return a dump of memory from address till address+size

        Exits through checkZ when process_vm_readv reports failure, instead
        of handing back a zero-filled buffer.
        """
        bytes_buffer = ctypes.create_string_buffer(size)
        local_iovec = iovec(ctypes.cast(ctypes.byref(bytes_buffer), ctypes.c_void_p), size)
        remote_iovec = iovec(ctypes.c_void_p(address), size)
        bytes_transferred = self.libc.process_vm_readv(
            self.pid, ctypes.byref(local_iovec), 1, ctypes.byref(remote_iovec), 1, 0)
        checkZ(lambda: bytes_transferred < 0, "Cannot read process memory.")
        return bytes_buffer

    def write_process_memory(self, address, size, data):
        """
        write_process_memory(self, int, int, bytes) -> int
        Overwrite some memory space from address till address+size

        data may be shorter than size; the local buffer is size bytes long, so
        the remainder is written as zero bytes. Returns the value reported by
        process_vm_writev.
        """
        # The test used to be inverted (size >= len(data)), which aborted on
        # every write that actually fit.
        checkZ(lambda: len(data) > size, "Data too large.")
        bytes_buffer = ctypes.create_string_buffer(size)
        bytes_buffer.raw = data
        local_iovec = iovec(ctypes.cast(ctypes.byref(bytes_buffer), ctypes.c_void_p), size)
        remote_iovec = iovec(ctypes.c_void_p(address), size)
        bytes_transferred = self.libc.process_vm_writev(
            self.pid, ctypes.byref(local_iovec), 1, ctypes.byref(remote_iovec), 1, 0)
        return bytes_transferred

    def init_main_arena(self):
        """
        Dump the [heap] mapping and split it into Chunk objects.

        Starting at the beginning of the mapping, each header is decoded and
        the cursor advances by that chunk's SIZE. The result is stored in
        self.chunk_list.
        """
        self.heap_range = (self.heap['addr_start'], self.heap['addr_end'])
        self.heap_data = self.read_process_memory(self.heap['addr_start'], self.heap['size'])
        self.chunk_range = 6
        self.chunk_list = []
        header_len = self.data_size * self.chunk_range
        limit = len(self.heap_data) - header_len
        offset = 0
        idZ = 0
        while offset < limit:
            tmp_chunk = Chunk(idZ, offset + self.heap['addr_start'], self.xy86, self.data_size,
                              self.chunk_range, self.heap_data[offset:offset + header_len])
            self.chunk_list.append(tmp_chunk)
            if tmp_chunk.SIZE == 0:
                # A zero size would never move the cursor; stop here rather
                # than loop forever. The offending chunk stays in the list.
                break
            offset += tmp_chunk.SIZE
            idZ += 1
        self.__arena_struct()
        self.__tcache_struct()
        self.__check()

    def print_arena(self):
        """Print every chunk collected by init_main_arena()."""
        print(PTR["H1"]("\n### HEAP:"))
        for chunk in self.chunk_list:
            print(chunk)

    def __arena_struct(self):
        #TODO: parse main arena, and identify binlists
        pass

    def __tcache_struct(self):
        #TODO: parse tcache_perthread_struct
        pass

    def __check(self):
        #TODO: check all is correct, e.g. find invalid chunk, prev_size corrupted, etc.
        pass

    def ptrace_attach(self):
        """Attach to the target and wait until it has stopped; return ptrace's result."""
        status = self.libc.ptrace(PTRACE_ATTACH, self.pid, None, None)
        if status == 0:
            # PTRACE_ATTACH only requests a stop; later requests such as
            # PTRACE_GETREGS fail until the stop has been observed.
            os.waitpid(self.pid, 0)
        return status

    def ptrace_detach(self):
        """Detach from the target; return ptrace's result."""
        return self.libc.ptrace(PTRACE_DETACH, self.pid, None, None)

    def ptrace_getregs(self):
        """Read the target's registers into a fresh self.regs_status."""
        self.regs_status = self.regs_struct()
        self.libc.ptrace(PTRACE_GETREGS, self.pid, None, ctypes.byref(self.regs_status))

    def ptrace_setregs(self):
        """Write self.regs_status back into the target."""
        self.libc.ptrace(PTRACE_SETREGS, self.pid, None, ctypes.byref(self.regs_status))

    def ptrace_singlestep(self):
        """Issue a PTRACE_SINGLESTEP request for the target."""
        # Previously referenced the undefined names "libc" and
        # "PTRACE_SINGLESTEP" and raised NameError on every call.
        self.libc.ptrace(self._PTRACE_SINGLESTEP, self.pid, None, None)

    def print_regs(self):
        """Print the register snapshot taken during construction."""
        print(PTR["H2"]("\n### Register:"))
        for field_name, _field_type in self.regs_status._fields_:
            print(" {} {}".format(field_name.ljust(10, " "),
                                  fmtR(getattr(self.regs_status, field_name), self.data_size + 1)))
if __name__ == "__main__":
    # Only a missing or non-numeric pid is a usage error.
    try:
        pid = int(sys.argv[1])
    except (IndexError, ValueError):
        print(DESCR)
        checkZ(lambda: True, "#Usage: python3 {} <pid>\n".format(sys.argv[0]))

    try:
        # parse maps, attach, snapshot the registers, detach
        sgremlin = SGremlin(pid)
        # parse main arena
        sgremlin.init_main_arena()
        # print main arena
        sgremlin.print_arena()
        # print the registers captured while the constructor was attached
        sgremlin.print_regs()
    except Exception as err:
        # Report the real cause; the previous blanket handler answered every
        # runtime failure with the usage banner.
        checkZ(lambda: True, "#Error: {}: {}\n".format(type(err).__name__, err))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment