静态分析框架从 01 到 10-数据篇
分析篇中我介绍了如何对程序做静态分析提取数据,在本篇中我将介绍如何处理这些数据,并且从中找到安全漏洞。
数据的处理分为两个部分,一个是数据存储的抽象层,用于将内存对象映射到数据库中(ORM);另一个是使用数据进行分析的规则引擎,我将说明如何基于 Neo4j 实现数据流跟踪。
在这之前,先来看看梦中情人长什么样子。
代码属性图
2014 年, 《Modeling and Discovering Vulnerabilities with Code Property Graphs》 里首次提出了代码属性图(Code Property Graph) 的概念以及其在漏洞挖掘领域的应用。代码属性图将多个层次的代码信息(比如语法、控制流、数据流等)存储到一个图形结构里,使得各个层次的信息可以相互关联。 并且可以使用领域专用语言(DSL,Domain Specific Language)来对图内的信息进行查询。 详细的描述可以见 https://docs.joern.io/code-property-graph。
提出代码属性图的作者现在是 ShiftLeft 的首席科学家, ShiftLeft 的产品都是围绕着代码属性图的概念来做的,包括其早期的开源产品 Joern 以及其兄弟商业产品 Ocular。Ocular 构造了一个由 AST、CFG、PDG 等多层级组成的 CPG:
并且提供过程间的、flow-sensitive、context-sensitive、field-sensitive 的数据流跟踪。 另外还配套了自研的图数据库 OverflowDB,以及专用语言 CPGQL。
ShiftLeft 是 CPG 的最佳实践以及标杆,配备的都属于顶配梦幻阵容,虽然很多内容都是开源的,但社区支持并不是那么友好,不容易上手。咱也没那么大实力单独做这些配件,所以目标是做一个底配版:
由 Hierarchy + Call Graph + Data Flow Graph 组成 CPG。
配备 Neo4J 图数据库,以及查询语言 CypherQL 配合 Java。
Hierarchy 指的是目标的代码结构,比如类之间的继承关系、类与方法/字段之间的从属关系、方法与指令的从属关系,这部分比较简单,就不具体展开了。承接分析篇的结果,目前组成 CPG 所需要的数据已经存在我们的内存里了,接下来要做的就是将这些数据映射存储到图数据库中。
数据抽象层
Neo4j 有一些被广泛使用的 ORM 框架, 比如官方的 Neo4j-OGM 以及 SpringBoot 的 Spring Data Neo4j,但是为了提供强大且便捷的功能,框架都进行了很多的业务封装,导致批量插入的性能比使用原生查询的性能慢了数十倍,而我的需求动辄是插入几千万条数据,这些偏业务侧的框架的性能远远无法达到我的接受范围。
还有一种做法是在另外一个静态分析项目 tabby 里面想到的,可以将内存模型导出为 CSV 然后利用 Neo4J 的导入功能批量插入数据。 但是自己写一个符合格式的 CSV 转换比较繁琐,并且不够优雅。
通过各个方案的对比之后我决定还是使用了嵌入式 Neo4j 以及裸查询的方式来实现数据入库。 主要就是对数据库操作进行封装,然后保留模型(节点或关系)的 Labels 、Type 以及 Properties 的抽象接口,需要入库的数据就继承这个抽象类,类似于:
AbstractModel.java:
public abstract class AbstractModel<T> {
private static final AtomicInteger counter = new AtomicInteger();
@Getter @Setter private T raw;
@Getter private long internalId = counter.getAndIncrement();
public void prepare(Transaction tx) {}
protected abstract Map<String, Object> getProperties();
public abstract T doCreate(Transaction tx);
public abstract T doQuery(Transaction tx);
}AbstractNode.java:
public abstract class AbstractNode extends AbstractModel<Node> {
protected abstract Set<Label> getLabels();
@Override
public Node doQuery(Transaction tx) {...}
@Override
public Node doCreate(Transaction tx) {
if (getRaw() != null) {
return getRaw();
}
Map<String, Object> prop = getProperties();
Node node = <CREATE NODE AS n RETURN n>
if (node != null) {
this.setRaw(node);
}
return null;
}
@Override
public String toString() {...}
}JavaTypeDecl.java
public class JavaTypeDecl extends AbstractNode {
String name;
@Override
protected Set<Label> getLabels() { return Sets.newHashSet(Labels.JavaTypeDecl); }
@Override
protected Map<String, Object> getProperties() {
Map<String, Object> prop = new HashMap<>();
prop.put("name", this.name);
return prop;
}
}同样道理的还有 AbstractEdge,边(Edge)对应的就是 Neo4J 里的关系(Relationship),
DatabaseService
所有的数据模型最终都将通过 DatabaseService 进行写入,但是每个分析产生的数据都各不相同,为了方便管理我还定义了 DataBucket 接口,每个分析产生的数据都在同一个桶里,桶里要求提供 Node 队列以及 Edge 队列,DatabaseService 每次以桶为单位建立多线程 Push 队列。
值得一提的是因为 push 时将会使用多个数据库事务连接(Transaction),而建立关系时使用了 Node.createRelationshipTo API, 这里面会使用创建节点时的事务来进行创建边,但此时这个事务可能早就被关闭了, 在查看 Neo4J 代码之后发现建立边实际上只需要使用 NodeId 即可,遂通过调用内部 API 来实现:
Relationship relationship;
try {
KernelTransaction ktx = DatabaseService.getKernelTransaction(tx);
int relationshipTypeId = ktx.tokenWrite().relationshipTypeGetOrCreateForName(getType().name());
long relationshipId = ktx.dataWrite().relationshipCreate(from.getId(), relationshipTypeId, to.getId());
relationship = ((InternalTransaction) tx).newRelationshipEntity(relationshipId, from.getId(), relationshipTypeId, to.getId());
} catch (Throwable e) {
relationship = tx.getNodeById(from.getId()).createRelationshipTo(to, getType());
}将数据都 Push 至数据库之后, 就可以使用 Neo4j Browser 来查看结果了,比如查询一个方法内的指令和数据流:
规则引擎
规则引擎里定义了几个概念: 节点查询、漏洞、漏洞类型以及漏洞模型, 其关系如下:
节点查询用于从图数据库里筛选节点,一般可以直接使用 Cypher 语句,无法满足的情况下可以配合 Java 进一步筛选,查询出来的节点可以用于定义 source、sink、sanitizes 等。
漏洞模型产出漏洞,并且提供污点跟踪的的模型模版,最简单的场景下仅需定义 source、sink 即可使用。
漏洞包含漏洞类型、优先级、代码位置、风险路径以及自定义标签。
漏洞类型包含漏洞的名称、描述以及风险等级。
数据流路径探索
在分析篇中, 我们已经实现了在内存里的数据流传播,但是其只是为了支撑其他精确的分析,覆盖到的仅有一部分数据,并且,想要将过程间数据流全部的传播结果都存储在数据库中也并不是那么现实。所以我在数据库里只会存储数据流节点和边的信息,然后利用 Neo4J 的路径查找来进行数据流跟踪。
首先 Cypher 查询是具备两个节点之间的路径搜索功能的,但是存在一些问题导致如果直接使用则无法满足我们的需求:
一是性能问题, Neo4J 默认采用单线程的深度遍历, 但实际情况下我经常需要遍历几百几千万条路径才能得到结果,这时的等待时间会让人怀疑人生。
二是过程间数据流传播时上下文的问题, 在内存中我们使用了一个栈来确定实际的返回边,但 Cypher 并没有这样的功能,有一些另辟蹊径的办法能在特定情况下做到,但是同时也会损耗巨大量的性能。
幸运的是,虽然 Cypher 默认的查询无法满足需求, 但是在 Neo4J 的代码里却有着一套健全的路径搜索框架,而正好我使用的是嵌入式数据库,可以完整的使用这些 API 来构建自己的路径探索策略。
Neo4J 为路径遍历提供了 MonoDirectionalTraversalDescription,在这个类里面可以看到几个支撑遍历路径策略的成员,我需要用到的是以下几个:
PathExpander,是用于确认当前路径可以继续展开哪些边(关系)。
PathEvaluator, 是用于评估对当前路径的决策,是继续展开或者截断?当前路径是否要包含在返回结果里?
UniquenessFactory,是用于确认避免重复遍历的单一性策略,是根据节点重复或者根据边重复来截断路径,而且提供了 Global、Path 等几个级别。
InitialBranchState,最妙的是这个,简单的说就是路径状态,而我需要的调用栈不正是路径状态吗?Neo4J 里面这个值一直都是空的,甚至不支持设置,目前只能通过反射使用,但功能是完善的。
路径展开(context-sensitive)
使用一个空的调用栈作为 InitialBranchState, 然后在 PathExpander 里获取到当前边的上下文信息,更新调用栈状态,并且根据栈顶的上下文信息来筛选准确的数据返回边,即可实现准确的上下文敏感过程间数据流传播。这一段逻辑代码比较长,就不贴了。
并且,这部分的工作是通过多线程完成的,虽然 Neo4J 整个路径遍历的框架就是单线程的,但是通过多线程处理好数据再喂给它显然也能提升不少 CPU 利用率。
单一性策略
Neo4J 里提供了九种单一性策略,分别对应四种层级(Global、Path、Level、Recent)上的节点和关系, 再加上一种 None 即无需单一性。
Global 代表在所有路径里每个节点/关系只访问一遍,后续其他路径到达此节点则将被截断;Path 则是对应每个路径里只访问一遍, Level 代表的是同一路径长度下只访问一次, Recent 大概是有个缓存,我也没看太明白,具体可以看官方文档。
选择单一性策略非常重要,主要是为了解决环的问题,这里我选择了以 RELATIONSHIP_PATH 为基础,可以避免陷入递归调用的环;然后对于 FieldNode 使用以 source 为级别的节点单一(每个 source 相关的路径只访问一次),对于变量则使用路径节点单一,这样一来就不会在递归调用或者循环控制流中产生的环里被一声声靓仔迷失了自己。
大致代码如下:
public class CycleUnique implements UniquenessFilter {
UniquenessFilter parent;
Map<Long, Set<Long>> pathFieldVisited = new HashMap<>();
public CycleUnique() { this(RELATIONSHIP_PATH.create(null)); }
public CycleUnique(UniquenessFilter parent) { this.parent = parent; }
@Override
public boolean checkFirst(TraversalBranch branch) { return parent.checkFirst(branch); }
@Override
public boolean check(TraversalBranch source) {
if (!parent.check(source)) return false;
long idToCompare = source.endNode().getId();
if (source.endNode().hasLabel(Labels.FieldNode)) {
Set<Long> visited = pathFieldVisited.computeIfAbsent(source.startNode().getId(), k -> new HashSet<>());
if (!visited.add(idToCompare)) return false;
}
while (source.length() > 0
&& source.lastRelationship().getType().name().equals(RelTypes.DataPointTo.name())
&& source.lastRelationship().getStartNode().hasLabel(Labels.VarNode)) {
source = source.parent();
if (source.endNode().getId() == idToCompare) {
return false;
}
}
return true;
}
}路径决策
通过 PathEvaluator 可以评估对一个路径实行怎样的决策,比如限制路径的长度,当路径长度达到 100 时丢弃当前路径并且阻止继续遍历(EXCLUDE_AND_PRUNE),或者当路径遇到污点跟踪的 sanitizer 节点,同样也是丢弃并且截断,而但路径遇到 sink 节点,那么将保留当前路径作为一个返回结果,并且决策是否继续探索(INCLUDE_AND_PRUNE 或 INCLUDE_AND_CONTINUE)
我的策略如下:
@Override
public Evaluation evaluate(Path path, BranchState<LinkedList<Integer>> state) {
if (maxDepth(path, depth)) return Evaluation.EXCLUDE_AND_PRUNE;
if (sanitizes != null && sanitizes.contains(path.endNode())) return Evaluation.EXCLUDE_AND_PRUNE;
if (end != null && end.contains(path.endNode())) return Evaluation.INCLUDE_AND_CONTINUE;
return end == null ? Evaluation.INCLUDE_AND_PRUNE : Evaluation.EXCLUDE_AND_CONTINUE;
}到此已经可以得到一个符合预期的路径探索策略,组合起来 MonoDirectionalTraversalDescription 即可进行污点跟踪的路径探索,相关代码:
@Builder
public PathFinder(int maxDepth, int maxStepExpanding, Direction direction, UniquenessFactory uniqueness, RelationshipType[] flows, PathBooster booster) {
this.maxDepth = maxDepth;
this.maxStepExpanding = maxStepExpanding;
this.direction = direction == null ? Direction.OUTGOING : direction;
this.uniqueness = uniqueness == null ? CycleUnique.CYCLE_PATH : uniqueness;
this.flows = flows == null ? DATA_FLOW : flows;
this.booster = booster;
}
private TraversalDescription traversal(PathEvaluator<LinkedList<Integer>> evaluator) {
TraversalDescription side = new MonoDirectionalTraversalDescription()
.breadthFirst()
.evaluator(evaluator)
.uniqueness(uniqueness)
.expand(PathExpander.builder()
.flows(flows)
.maxStepExpanding(maxStepExpanding)
.booster(booster)
.build());
if (isBackward()) {
side = side.reverse();
}
try {
Field field = MonoDirectionalTraversalDescription.class.getDeclaredField("initialState");
field.setAccessible(true);
field.set(side, STACK_EMPTY);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
return side;
}
private Iterable<Path> sideCollect(Set<Node> from, TraversalDescription side) {
HashSet<Path> paths = new HashSet<>();
for (org.neo4j.graphdb.Path path : side.traverse(from)) {
path = isBackward() ? reverse(path) : path;
paths.add(path);
}
return paths;
}breadthFirst 表示我是选择广度优先的遍历算法,使用 depthFirst 的话则代表使用深度优先。
到这里,数据流跟踪功能就基本实现了,封装一些业务代码之后就可以交给漏洞模型使用。
漏洞模型
RuleEngine 模块里提供了一个抽象类 DataTaintModel 作为模版以更方便的编写基于污点跟踪的模型,贴一下部分代码:
public abstract class DataTaintModel extends VulnerableDiscovering {
public static int DEFAULT_MAX_DEPTH = 0;
public static int DEFAULT_STEP_EXPANDING = 0;
private final RuleEngine engine;
Set<Path> foundPath = new HashSet<>();
Map<Node, String> nodeQueries = new HashMap<>();
public DataTaintModel(RuleEngine engine) {this.engine = engine;}
protected RuleEngine engine() {return engine;}
protected abstract Set<String> sourceQueries();
protected abstract Set<String> sinkQueries();
protected Set<Node> sources() {...}
protected Set<Node> sinks() {...}
protected RelationshipType[] flows() {return PathFinder.DATA_FLOW;}
protected Set<Node> sanitizes() {return null;}
protected Set<Node> passThrough() {return null;}
protected int depth() {return DEFAULT_MAX_DEPTH;}
protected int stepExpanding() {return DEFAULT_STEP_EXPANDING;}
protected Direction direction() {return Direction.OUTGOING;}
protected Path verifyPath(Path path) {...}
protected Vulnerable buildVulnerable(Path path) {...}
protected int importance(Path path) {return importance();}
protected Set<String> tags(Path path) {...}
@Override
public Set<Vulnerable> discover() {
Set<Vulnerable> result = new HashSet<>();
Set<Node> sources = sources();
Set<Node> sinks = sinks();
Set<Node> sanitizes = sanitizes();
if (sources == null || sources.isEmpty() || sinks == null || sinks.isEmpty()) {
return result;
}
PathFinder finder = PathFinder.builder()
.maxStepExpanding(stepExpanding())
.maxDepth(depth())
.flows(flows())
.direction(direction())
.booster(engine().getPathBooster())
.build();
for (Path path : finder.find(sources, sinks, sanitizes)) {
path = verifyPath(path);
if (path == null) continue;
foundPath.add(path);
Vulnerable vulnerable = buildVulnerable(path);
if (vulnerable != null) result.add(vulnerable);
}
return result;
}
public Set<Path> getFoundPath() {
return foundPath;
}
}再举一个 NestIntentBridge 模型的例子
public class NestIntentBridge extends DataTaintModel {
public NestIntentBridge(RuleEngine engine) { super(engine); }
@Override
protected Set<String> sourceQueries() { return Sets.newHashSet("SerializationExtraSource"); }
@Override
protected Set<String> sinkQueries() { return Sets.newHashSet("StartComponentSink"); }
@Override
protected Vulnerability type() { return AndroidVulnerabilities.IntentBridge.vulnerability(); }
@Override
protected RelationshipType[] flows() { return PathFinder.POINT_FLOW; }
@Override
protected int importance() { return 2; }
@Override
protected int importance(Path path) {
if (!EntryPointFinder.entryPoints(engine(), toMethodNode(path.startNode())).isEmpty()) {
return super.importance(path) + 5;
}
return super.importance(path);
}
@Override
protected Set<String> tags(Path path) {
Set<String> tags = super.tags(path);
tags.addAll(EntryPointFinder.entryPoints(engine(), toMethodNode(path.startNode())));
return tags;
}
}这里面用了一个 EntryPointFinder 是用于验证这条漏洞路径适用哪些导出组件的,因为具体实现有待优化,所以就先不说了。
总结
我对规则引擎的工作追求大部分都集中在提高路径探索效率以及深度上面,其他的很多业务功能依然在不断的改动,并不是一个比较稳定的状态,所以也没有太多可以分享的。路径探索目前依然存在很多需要改进的地方,比如现在只有固定的一种探索策略,如果遇到了路径爆炸那只能通过最大深度或者设置其他限制来完成探索,后续我可能会搞多策略混合的路径探索。
本篇完,《静态分析框架从 01 到 10》也暂时完结,后续有没有可以更新的得看缘分(饿没饿死)。。