IDA Thread Analysis Script

In a recent post, I talked about renaming subroutine blocks and identifying them in IDA. This technique is very helpful for identifying large blocks of functions where the parent and child functions are self-contained. Library code is a good example of self-contained code. Take a zip library, for example. The zip library is linked into an executable. To call it, we pass a buffer to the zip function, the buffer gets passed to the child functions, the zip functions do their magic, and a compressed buffer is returned (simplified version). All of the child functions responsible for the zip magic would be considered a subroutine block. They are all related and self-contained. What about a set of functions that are related but not self-contained? A perfect example of this would be a thread.

Usually threads are designed to serve one purpose. A thread could contain functions that are also called from other functions, which means it is not self-contained. Because of the single purpose, though, all of its functions are related. By identifying the code path of a specific thread, we may be able to enumerate the thread's functionality. First we need the address of each call to CreateThread. This can be done using CodeRefsTo(LocByName("CreateThread"), 0).
def retlistofCreateThreadAddr():
    addr = []
    for x in CodeRefsTo(LocByName("CreateThread"),0):
        addr.append(x)
    return addr
We then need the offset that is pushed onto the stack for the thread's function start address (lpStartAddr). This is the third argument, so it is the third push counting back from the call.
push    eax             ; lpThreadId
push    ebx             ; dwCreationFlags
push    esi             ; lpParameter
push    offset StartAddress_SearchFiles ; lpStartAddress
push    ebx             ; dwStackSize
push    ebx             ; lpThreadAttributes
call    ds:CreateThread
cmp     eax, ebx
jz      short loc_1000341E
From MSDN
HANDLE CreateThread(
  LPSECURITY_ATTRIBUTES lpsa,
  DWORD cbStack,
  LPTHREAD_START_ROUTINE lpStartAddr,
  LPVOID lpvThreadParam,
  DWORD fdwCreate,
  LPDWORD lpIDThread
);

 // lpStartAddr: [in] Long pointer to the application-defined function of type
 // LPTHREAD_START_ROUTINE to be executed by the thread; represents the starting 
 // address of the thread. For more information on the thread function, see ThreadProc.

IDA is usually good at identifying lpStartAddr. If we rely on IDA, we can back-trace a number of instructions from each address returned by retlistofCreateThreadAddr() until we find the string "lpStartAddress" in the comments. Once we have that address, we just need to read a Dword to get the thread's function start address. There are a couple of flaws in this approach. One is that we are relying on IDA for comments; another is that we are relying on lpStartAddr being pushed as an immediate Dword address. The function address could be chosen at runtime. If that is the case, we won't be able to find lpStartAddr, and the code will need to be analyzed manually. An easy way to determine whether we were able to retrieve lpStartAddr is to check whether it is a valid function address using GetFunctionName(lpStartAddr).

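def getStartAddr(ct_addr):
    # back-trace up to 10 instructions to find the comment "lpStartAddress"
    count = 0
    addr = PrevHead(ct_addr,minea=0)
    while count < 10:
        if 'lpStartAddress' in str(Comment(addr)):
            # assumes "push offset" (opcode 0x68); the Dword follows the opcode byte
            return Dword(addr+1)
        count = count + 1
        addr = PrevHead(addr,minea=0)
    # comment not found; BADADDR makes GetFunctionName() return ''
    return BADADDR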
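
The back trace above depends on IDA's comment. As a rough sketch of a comment-independent alternative, the idea can be modelled outside IDA: walk back from the call, count push instructions, and take the third one, since the arguments are pushed right to left. If that push is encoded as push imm32 (opcode 0x68), the start address is the little-endian Dword after the opcode byte, which is why the function above reads Dword(addr+1). The listing tuples, the addresses, and the helper name find_start_addr are hypothetical stand-ins for what IDA would provide; this is plain Python, not IDAPython.

```python
import struct

# Hypothetical stand-in for a disassembly listing: (address, raw bytes, mnemonic).
# Addresses and operand values are invented for illustration.
LISTING = [
    (0x10003400, b"\x50", "push"),                      # push eax  ; lpThreadId
    (0x10003401, b"\x53", "push"),                      # push ebx  ; dwCreationFlags
    (0x10003402, b"\x56", "push"),                      # push esi  ; lpParameter
    (0x10003403, b"\x68\x00\x20\x00\x10", "push"),      # push offset ... ; lpStartAddress
    (0x10003408, b"\x53", "push"),                      # push ebx  ; dwStackSize
    (0x10003409, b"\x53", "push"),                      # push ebx  ; lpThreadAttributes
    (0x1000340A, b"\xff\x15\x00\x50\x00\x10", "call"),  # call ds:CreateThread
]

def find_start_addr(listing, call_index, arg_number=3, max_back=10):
    """Return the immediate pushed as argument `arg_number`, or None.

    Counts pushes backwards from the call. None means the argument was not
    found within `max_back` instructions or was not a push imm32, for
    example a register push whose value is chosen at runtime.
    """
    pushes_seen = 0
    lowest = max(call_index - max_back, 0)
    for i in range(call_index - 1, lowest - 1, -1):
        _, raw, mnem = listing[i]
        if mnem != "push":
            continue
        pushes_seen += 1
        if pushes_seen == arg_number:
            # push imm32 is opcode 0x68 followed by a 4-byte little-endian value
            if raw[0:1] == b"\x68" and len(raw) == 5:
                return struct.unpack("<I", raw[1:5])[0]
            return None
    return None

start = find_start_addr(LISTING, call_index=6)
print(hex(start))
```

This sketch only counts push instructions. Code that sets up arguments with mov [esp+...] or that interleaves pushes for a different call would need more careful handling, so the result should still be checked with GetFunctionName() as described above.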
 
Once we have lpStartAddr, we just need to get all of its child functions. For this we can use a modified version of the graph_down function borrowed from Carlos G. Prado. The modification returns a list of pairs: the function that contains a call and the address being called. This list can be used to compute the depth of each function. Once we have the depth, we can display each thread and all of its child functions as structured output in IDA.
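
One way the depth could be recorded while walking down, sketched in plain Python rather than IDAPython: pass the current depth into the recursive call and store it with each parent and child pair. The call graph dictionary, the function names in it, and the helper walk_down are made up for illustration; in IDA the children would come from the call xrefs that graph_down collects.

```python
# Hypothetical call graph: function name -> functions it calls, in call order.
CALLS = {
    "StartAddress_SearchFiles": ["FindFiles", "SendResults"],
    "FindFiles": ["MatchName", "FindFiles"],   # calls itself on purpose
    "SendResults": ["MatchName"],
    "MatchName": [],
}

def walk_down(func, calls, depth=0, rows=None, path=None):
    """Depth-first walk that returns [parent, child, depth] rows."""
    if rows is None:
        rows = [[func, None, 0]]   # root row, same shape as the script uses
    if path is None:
        path = set()               # functions already visited
    path.add(func)
    for child in calls.get(func, []):
        if child in path:          # skips recursion and repeat visits
            continue
        rows.append([func, child, depth + 1])
        walk_down(child, calls, depth + 1, rows, path)
    return rows

for parent, child, depth in walk_down("StartAddress_SearchFiles", CALLS):
    print("   " * depth + (child or parent + " (lpStartAddr)"))
```

In this sketch a function that is reachable from two parents is listed only under the first parent that reaches it, because the visited set is shared across the whole walk.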




This script will be included in the upcoming release of IDAScope. In the previous post we used a script to rename all functions in a subroutine block. The same script can be used to rename all child functions in a thread. In the IDAScope release, this script will have its own window. The script will also have an option to add a repeatable comment to all child functions in a thread, because constantly appending to the function name starts to clutter it up. Dan has been doing some awesome work on IDAScope (makes my updates look like sophomore programming examples).

For anyone who wants the code now, the script can be found below.

Note: Calculating the depth is probably the slowest part of the code. I tried to figure out a way to get the depth from inside the graph_down function, but I had no luck. I spent a good amount of time reviewing others' code for graphing down and graphing up. Cody Pierce has some great code on TippingPoint's blog, but it's for graphing up. If anyone has any thoughts, please shoot me an email. My address is in the comments of the script.

Source code of an_threads.py:

## an_threads.py is a script that can be used to help with analyzing threads
## and their child functions. Usage IDA > File > Script file.. > Select an_threads.py
## The output will be displayed to the Output Window. IDA > View > Output Window
## Created by alexander.hanel@gmail.com, version 0.01 

# The script calls the IDA helpers unqualified, so import them by name
from idaapi import *
from idautils import *
from idc import *

def format_depth(x):
    # Gets the depth of each function relative to the parent/root function
    for index in range(0, len(x)):
        if x[index][1] is None:
            x[index].append(0)
            continue
        if index == 1:
            x[index].append(1)
            continue
        # Indent Child Function 
        if x[index][0] == x[index-1][1]:
            x[index].append(x[index-1][2]+1)
            continue
        # No Indent same function 
        if x[index][0] == x[index-1][0]:
            x[index].append(x[index-1][2])
            continue
        if x[index][0] != x[index-1][1] or x[index][0] != x[index-1][0]:
            for v in range(1, index):
                if len(x[index]) == 3: continue 
                if x[index][0] == x[v][0]:
                    x[index].append(x[v][2])
                    continue
                
        if len(x[index]) == 3:
                continue
    # returns list
    # format parent, child, depth 
    return x
    
def print_dep(dep):
    # prints the output
    for line in dep:
        if line[1] == None:
            print GetFunctionName(int(line[0],16)), "(lpStartAddr)"
            
        else:
            space = ' ' * 3 * line[2]
            func_string = GetFunctionName(int(line[1],16))
            if func_string == '':
                 func_string = '* Call ' + GetDisasm(int(line[1],16))[6:-6]
            print space , func_string

    return 
    
def graph_down(ea, depth, graph = None, path = None):
    # This function was borrowed from Carlos G. Prado. Check out his Milf-Plugin for IDA on Google Code. 
    # Use fresh containers per top-level call; mutable defaults would be shared between threads
    if graph is None:
        graph = {}
    if path is None:
        path = set()
    graph[ea] = list()    # Create a new entry on the graph dictionary {node: [child1, child2, ...], ...}
    path.add(ea)        # This is a set, therefore the add() method

    # Iterate through all function instructions and take only call instructions
    for x in [x for x in FuncItems(ea) if is_call_insn(x)]:        # Take the call elements
            for xref in XrefsFrom(x, XREF_FAR):                                   
                    if not xref.iscode:
                            continue

                    if xref.to not in path or 'extrn' in GetDisasm(xref.to):
                        depth.append([hex(LocByName(GetFunctionName(x))), hex(xref.to)])
                    
                    if xref.to not in path:        # Eliminates recursions
                            graph[ea].append(xref.to)
                            graph_down(xref.to, depth, graph, path)
    return depth

def retlistofCreateThreadAddr():
    # returns a list of all addresses that call CreateThread
    addr = []
    for x in CodeRefsTo(LocByName("CreateThread"),0):
        addr.append(x)
    return addr

def getStartAddr(ct_addr):
    # back-trace up to 10 instructions to find the comment "lpStartAddress",
    # then read and return the Dword that follows the push opcode
    count = 0
    addr = PrevHead(ct_addr,minea=0)
    while count < 10:
        if 'lpStartAddress' in str(Comment(addr)):
            return Dword(addr+1) 
        count = count + 1
        addr = PrevHead(addr,minea=0)
    # comment not found; BADADDR makes GetFunctionName() return ''
    return BADADDR


## Main()
threads = []   
for x in retlistofCreateThreadAddr():
    # store (CreateThread call address, thread start address)
    threads.append((x, getStartAddr(x)))
                   
print "Number of Threads %s" % (len(threads))
for addr in threads:
    print "CreateThread Call %s" % hex(addr[0])
    if GetFunctionName(addr[1]) == '':
        print "[Warning] Could Not Get lpStartAddr [Warning]"
        print 
        continue
    x = graph_down(addr[1], depth=[[hex(LocByName(GetFunctionName(addr[1]))),None]])
    print_dep(format_depth(x))
    print
