It has been a while since I got to know angr. As an open-source academic project, it does quite a good job. In the coming days (or months), I will dive into the angr source code, with the aim of understanding its internals and the structure of this awesome framework, and of applying my own algorithms to it.

I selected the Reaching Definition Analysis (RDA) as my entry point. As angr describes itself:

angr is a python framework for analyzing binaries. It combines both static and dynamic symbolic (“concolic”) analysis, making it applicable to a variety of tasks.

The most significant advantages of angr are its combination of different types of analysis and its very good extensibility.

I used PyCharm for code exploration. Thanks to its convenient and powerful Forward/Backward navigation, I saved a lot of time. The code I’m referring to is angr-dev version (8, 20, 1, 7), as output by python -c "import angr; print(angr.__version__)".

The most obvious class of RDA is ReachingDefinitionAnalysis, which inherits from two parent classes, ForwardAnalysis and Analysis.

ForwardAnalysis can actually be regarded as the most important class when you implement your own analysis “pass”, as it provides a fundamental structure for forward analysis.

This is my very first attempt to build a static forward analysis framework that can serve as the base of multiple
static analyses in angr, including CFG analysis, VFG analysis, DDG, etc.
In short, ForwardAnalysis performs a forward data-flow analysis by traversing a graph, compute on abstract values,
and store results in abstract states. The user can specify what graph to traverse, how a graph should be traversed,
how abstract values and abstract states are defined, etc. – Fish

ForwardAnalysis provides the basic building blocks for implementing a forward analysis. The most crucial function is _analyze. Inside it we can see:

def _analyze(self):
    """
    The main analysis routine.

    :return: None
    """
    self._pre_analysis()
    if self._graph_visitor is None:
        # There is no base graph that we can rely on. The analysis itself should generate successors for the
        # current job.
        # An example is the CFG recovery.

        self._analysis_core_baremetal()
    else:
        # We have a base graph to follow. Just handle the current job.
        self._analysis_core_graph()
    self._post_analysis()

We can do some self-defined preparatory work in _pre_analysis(). After that, if we provide a graph to traverse (that is, self._graph_visitor is not None), control is handed to _analysis_core_graph(). Once the traversal of the graph is finished, we can do some final work in _post_analysis().
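
The hook structure described here can be sketched in plain Python. This is a toy illustration and not angr code: the classes ToyForwardAnalysis and CountingAnalysis, the nodes argument and the log attribute are all hypothetical, and only the hook names mirror the snippet above.

```python
class ToyForwardAnalysis:
    """Toy stand-in for the hook structure; this is NOT angr's ForwardAnalysis."""

    def __init__(self, nodes=None):
        self._nodes = nodes  # hypothetical: None plays the role of "no base graph"
        self.log = []

    def _pre_analysis(self):
        self.log.append("pre")

    def _post_analysis(self):
        self.log.append("post")

    def _analysis_core_baremetal(self):
        self.log.append("baremetal")

    def _analysis_core_graph(self):
        for node in self._nodes:
            self.log.append("visit %s" % node)

    def _analyze(self):
        # Same shape as the routine above: pre hook, one of two cores, post hook.
        self._pre_analysis()
        if self._nodes is None:
            self._analysis_core_baremetal()
        else:
            self._analysis_core_graph()
        self._post_analysis()


class CountingAnalysis(ToyForwardAnalysis):
    """A subclass only overrides the hooks it cares about."""

    def _pre_analysis(self):
        super()._pre_analysis()
        self.visited = 0

    def _analysis_core_graph(self):
        self.visited = len(self._nodes)
        super()._analysis_core_graph()


analysis = CountingAnalysis(nodes=["A", "B"])
analysis._analyze()
print(analysis.log)
print(analysis.visited)
```

The point of the design is that the driver (_analyze) never changes, while each concrete analysis plugs its own behaviour into the hooks.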

Diving into the _analysis_core_graph function, we can see that nodes are fetched in a loop by self._graph_visitor.next_node() until no node is left or should_abort is set. On each node, self._run_on_node() is executed. It returns a changed flag and a new state named output_state, and the output_state of the current node is used as the input state of its successors.

Note that the state here is not the SimState we work with when doing symbolic execution in angr. It’s just a kind of program abstraction, and you can even define your own.

def _analysis_core_graph(self):

    while not self.should_abort:

        self._intra_analysis()

        n = self._graph_visitor.next_node()

        if n is None:
            break

        job_state = self._get_input_state(n)
        if job_state is None:
            job_state = self._initial_abstract_state(n)

        changed, output_state = self._run_on_node(n, job_state)

        # output state of node n is input state for successors to node n
        successors_to_visit = self._add_input_state(n, output_state)

        if changed is False:
            # no change is detected
            continue
        elif changed is True:
            # changes detected
            # revisit all its successors
            self._graph_visitor.revisit_successors(n, include_self=False)
        else:
            # the change of states are determined during state merging (_add_input_state()) instead of during
            # simulated execution (_run_on_node()).
            # revisit all successors in the `successors_to_visit` list
            for succ in successors_to_visit:
                self._graph_visitor.revisit_node(succ)
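
To make the loop above more concrete, here is a minimal sketch of the same idea as a plain worklist algorithm, assuming a tiny hand-written graph. It is a simplification and not angr's implementation: SUCCESSORS, DEFINES, run_on_node and forward_fixpoint are hypothetical names, the abstract state is just a frozenset of (variable, defining node) pairs, and merging is set union.

```python
from collections import deque

# Hypothetical toy CFG: node -> successors. "loop" has a back edge to itself.
SUCCESSORS = {"entry": ["loop"], "loop": ["loop", "exit"], "exit": []}

# Hypothetical transfer information: the variable each node writes (or None).
DEFINES = {"entry": "x", "loop": "x", "exit": None}


def run_on_node(node, in_state):
    """Kill older definitions of the variable this node writes, then add ours."""
    var = DEFINES[node]
    if var is None:
        return in_state
    survivors = frozenset(d for d in in_state if d[0] != var)
    return survivors | {(var, node)}


def forward_fixpoint(entry):
    in_states = {n: frozenset() for n in SUCCESSORS}
    out_states = {}
    worklist = deque([entry])
    while worklist:
        node = worklist.popleft()
        out_state = run_on_node(node, in_states[node])
        out_states[node] = out_state
        # The output state of a node feeds the input state of its successors.
        for succ in SUCCESSORS[node]:
            merged = in_states[succ] | out_state
            # Revisit a successor if its input changed or it was never run.
            if merged != in_states[succ] or succ not in out_states:
                in_states[succ] = merged
                if succ not in worklist:
                    worklist.append(succ)
    return in_states, out_states


in_states, out_states = forward_fixpoint("entry")
print(sorted(in_states["loop"]))
print(sorted(out_states["exit"]))
```

The loop stops when no input state changes any more, which is the fixpoint the "changed" checks above are looking for.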

Now that we know the overall structure of ForwardAnalysis, let’s jump back to class ReachingDefinitionsAnalysis. Tracing from _analyze(), we reach self._run_on_node. Inside it we find two temporary variables, block and engine. block refers to the code block currently being processed, and engine, in our case, is an instance of SimEngineRDVEX, which will be dealt with later.

self._engine_vex = SimEngineRDVEX(self.project, self._current_local_call_depth, self._maximum_local_call_depth,
                                          self._function_handler)
...
block = self.project.factory.block(node.addr, node.size, opt_level=0)
block_key = node.addr
engine = self._engine_vex

After that we find a key statement, self.node_observe(node.addr, state, OP_BEFORE). angr’s RDA, like other similar analyses, provides a concept of observation: we can set as many observation points as we want in order to see what is happening there. The observation points can be “instructions” or “blocks”, and the time to observe can be “OP_BEFORE” or “OP_AFTER”. States that reach an observation point are stored in a Python dictionary, as follows:

key = 'node', node_addr, op_type
...
self.observed_results[key] = state
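
The bookkeeping behind observation points can be sketched as follows. This is a toy model under stated assumptions, not angr's code: ToyObserver is a hypothetical class, the numeric values of OP_BEFORE and OP_AFTER are made up for the example, and the state is a plain set that gets snapshotted with frozenset.

```python
# Toy constants; the concrete values are an assumption of this sketch.
OP_BEFORE, OP_AFTER = 0, 1


class ToyObserver:
    """Hypothetical helper that records states at requested observation points."""

    def __init__(self, observation_points):
        self._observation_points = set(observation_points)
        self.observed_results = {}

    def node_observe(self, node_addr, state, op_type):
        key = ("node", node_addr, op_type)
        if key in self._observation_points:
            # Store a snapshot so later mutations of `state` do not leak in.
            self.observed_results[key] = frozenset(state)


observer = ToyObserver([("node", 0x400000, OP_AFTER)])
state = set()
observer.node_observe(0x400000, state, OP_BEFORE)  # not requested, ignored
state.add("x defined at 0x400000")
observer.node_observe(0x400000, state, OP_AFTER)   # requested, recorded
state.add("y defined at 0x400004")                 # does not affect the snapshot
print(observer.observed_results)
```

Keying the dictionary by (kind, address, time) is what lets one analysis run answer many "what was defined here?" questions afterwards.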

Following it comes a very important statement:

state, self._visited_blocks = engine.process(
    state,
    block=block,
    fail_fast=self._fail_fast,
    visited_blocks=self._visited_blocks
)

To know how the engine processes a block, we need to dive into the code of class SimEngineRDVEX. From class SimEngineRDVEX(SimEngineLightVEXMixin, SimEngineLight) we can see that SimEngineRDVEX inherits from SimEngineLight and SimEngineLightVEXMixin.

def process(self, state, *args, **kwargs):
    self._def_use_graph = kwargs.pop('def_use_graph', None)
    self._visited_blocks = kwargs.pop('visited_blocks', None)

    # we are using a completely different state. Therefore, we directly call our _process() method before
    # SimEngine becomes flexible enough.
    try:
        self._process(
            state,
            None,
            block=kwargs.pop('block', None),
        )
    except SimEngineError as e:
        if kwargs.pop('fail_fast', False) is True:
            raise e
        l.error(e)
    return self.state, self._visited_blocks
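
The fail_fast handling in process() can be isolated in a small sketch. Everything here is hypothetical (ToyEngine, ToyEngineError, the list used as a state), and as a small variation on the code above the sketch reads fail_fast before the try block; it only illustrates the "re-raise or log and carry on" pattern.

```python
import logging

l = logging.getLogger("toy_engine")


class ToyEngineError(Exception):
    """Hypothetical stand-in for an engine error."""


class ToyEngine:
    def _process(self, state, block=None):
        if block is None:
            raise ToyEngineError("no block given")
        self.state = state + [block]

    def process(self, state, **kwargs):
        self.state = state
        fail_fast = kwargs.pop("fail_fast", False)
        try:
            self._process(state, block=kwargs.pop("block", None))
        except ToyEngineError as e:
            if fail_fast:
                raise  # strict mode: let the caller see the failure
            l.error(e)  # tolerant mode: log it and keep the last good state
        return self.state


ok_state = ToyEngine().process([], block="b1")
tolerant_state = ToyEngine().process([])  # error is logged, input state returned
try:
    ToyEngine().process([], fail_fast=True)
    raised = False
except ToyEngineError:
    raised = True
print(ok_state, tolerant_state, raised)
```

In tolerant mode a single problematic block does not abort the whole analysis, at the cost of possibly incomplete results for that block.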

The private function _process in SimEngineLightVEXMixin leads us to the _process_Stmt function.

def _process_Stmt(self, whitelist=None):
  ...
  for stmt_idx, stmt in enumerate(self.block.vex.statements):
      if whitelist is not None and stmt_idx not in whitelist:
          continue
      self.stmt_idx = stmt_idx

      if type(stmt) is pyvex.IRStmt.IMark:
          # Note that we cannot skip IMarks as they are used later to trigger observation events
          self.ins_addr = stmt.addr + stmt.delta

      self._handle_Stmt(stmt)
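
The statement loop can be mimicked without pyvex. In this sketch IMark and WrTmp are hypothetical dataclasses standing in for the real VEX statement types, and walk is a made-up helper that only records what the loop would see.

```python
from dataclasses import dataclass


@dataclass
class IMark:
    """Toy stand-in for an instruction marker statement."""
    addr: int
    delta: int = 0


@dataclass
class WrTmp:
    """Toy stand-in for a 'write temporary' statement."""
    tmp: int


def walk(statements, whitelist=None):
    ins_addr = None
    seen = []
    for stmt_idx, stmt in enumerate(statements):
        if whitelist is not None and stmt_idx not in whitelist:
            continue
        if type(stmt) is IMark:
            # Every following statement belongs to this instruction address.
            ins_addr = stmt.addr + stmt.delta
        seen.append((stmt_idx, ins_addr, type(stmt).__name__))
    return seen


statements = [IMark(0x1000), WrTmp(0), WrTmp(1), IMark(0x1004), WrTmp(2)]
full = walk(statements)
partial = walk(statements, whitelist={0, 4})
print(full)
print(partial)
```

In this toy loop the whitelist check runs before the IMark check, so leaving the second IMark out of the whitelist means statement 4 is still attributed to address 0x1000. That follows directly from the order of the two if statements.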

Here comes the exciting part, _handle_Stmt:

def _handle_Stmt(self, stmt):
    handler = "_handle_%s" % type(stmt).__name__
    if hasattr(self, handler):
        getattr(self, handler)(stmt)

We can see that it uses hasattr and getattr to invoke the specific “Stmt handlers” (much like Java reflection does). Developers who want to deal with particular Stmts in their own way can override these handlers with their own.

Back in SimEngineRDVEX, we find a lot of _handle_xxx routines. Take _handle_WrTmp as an example:
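
Here is a self-contained sketch of this name-based dispatch. The statement classes (WrTmp, Put, Exit) and the engines (BaseEngine, MyEngine) are hypothetical toys; only the _handle_Stmt body is copied from the snippet above.

```python
class WrTmp:
    def __init__(self, tmp):
        self.tmp = tmp


class Put:
    def __init__(self, offset):
        self.offset = offset


class Exit:
    pass


class BaseEngine:
    def __init__(self):
        self.trace = []

    def _handle_Stmt(self, stmt):
        # Build the handler name from the statement's class name.
        handler = "_handle_%s" % type(stmt).__name__
        if hasattr(self, handler):
            getattr(self, handler)(stmt)

    def _handle_WrTmp(self, stmt):
        self.trace.append("base WrTmp t%d" % stmt.tmp)


class MyEngine(BaseEngine):
    def _handle_WrTmp(self, stmt):
        super()._handle_WrTmp(stmt)  # keep the parent's behaviour
        self.trace.append("custom WrTmp t%d" % stmt.tmp)

    def _handle_Put(self, stmt):
        self.trace.append("Put offset %d" % stmt.offset)


engine = MyEngine()
for stmt in [WrTmp(3), Put(16), Exit()]:
    engine._handle_Stmt(stmt)
print(engine.trace)
```

Statements without a matching handler (Exit in this toy) are silently skipped, so an engine only has to implement the statement types it cares about.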

def _handle_WrTmp(self, stmt):
    super()._handle_WrTmp(stmt)
    self.state.kill_and_add_definition(Tmp(stmt.tmp), self._codeloc(), self.tmps[stmt.tmp])

After the parent’s _handle_WrTmp function runs, kill_and_add_definition is invoked: writing a new tmp always generates a new definition, while whether any old definitions are killed depends on the definitions already in the state.

As a purely static analysis, RDA traverses each VEX (Valgrind IR) statement and stores its effects into states. One other thing to mention: since angr works at the VEX level, its atoms differ from what you see in low-level assembly. If you want to know what an atom is (in our case, Tmp), you need to check how atoms are defined, for example in RDA’s angr.analyses.reaching_definitions.atoms module.
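
The kill-then-add idea can be shown with a toy state. This is a sketch under assumptions, not angr's data structures: Tmp, Register, Definition and ToyState are hypothetical dataclasses and classes written for this example, and a code location is just an (address, statement index) tuple.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tmp:
    tmp_idx: int


@dataclass(frozen=True)
class Register:
    offset: int


@dataclass(frozen=True)
class Definition:
    atom: object
    codeloc: tuple
    data: str


class ToyState:
    def __init__(self):
        self.definitions = set()

    def kill_definitions(self, atom):
        # A new write to `atom` makes every earlier definition of it unreachable.
        self.definitions = {d for d in self.definitions if d.atom != atom}

    def kill_and_add_definition(self, atom, codeloc, data):
        self.kill_definitions(atom)
        self.definitions.add(Definition(atom, codeloc, data))


state = ToyState()
state.kill_and_add_definition(Register(16), (0x1000, 1), "0x10")
state.kill_and_add_definition(Tmp(0), (0x1000, 2), "reg16 + 1")
state.kill_and_add_definition(Register(16), (0x1004, 5), "t0")
for d in sorted(state.definitions, key=lambda d: d.codeloc):
    print(d)
```

After the third call the first Register(16) definition is gone, while the Tmp(0) definition survives because nothing overwrote that atom.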