AC 自动机在 Tengine 中的应用

在Web应用中我们常常需要判断用户的浏览器, 然后针对不同浏览器做出不同的响应. 识别一个浏览器的类别主要通过HTTP协议头中的User-Agent字段. 比如在我机器上浏览器的User-Agent的标识为:

1
2
Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) \
        Chrome/28.0.1500.72 Safari/537.36

而主流的浏览器无非就是Chrome, Firefox, IE, Safari, 所以我们可以在User-Agent中搜索这些关键字去区分浏览器的类别. 恩, 一个典型的多模匹配问题. 解决该问题的方法很多, 百度的搜索官方博客的这篇文章例举了一些解决这类问题的相关算法, 有兴趣的同学可以看一下.

本人是一个算法小菜鸟, 同时也是一个初级的nginx使用者, 本着解决的问题的精神, 找到了一个现成的用于判断User-Agent的nginx模块: HttpUserAgentModule. 大家可以在这里找到它的源码. 这个模块是由淘宝Tengine团队开发的.

处于好奇, 浏览了下代码(主要也是因为代码量比较少), 发现核心的算法就是AC自动机. 那就趁这个机会好好学习一下;)

AC 自动机

AC 自动机算法(Aho–Corasick string matching algorithm)是一个很高效的字符串匹配算法, 主要是在Trie中引入了类似KMP算法的滑动机制. 很久很久以前(–_–!!!), 当在还在做ACM的时候接触过, 但后来发现自己实在不是搞算法和做题的料, 因此决定还是用自动饮料机买个水, 用KMP看个电影吧;)

有关这个算法的学习, 这里推荐三份学习资料:

个人认为最关键的有两点:

  1. 构建Trie
  2. 构建失败线索

接下来我们来看看Tengine的大大们是如何实现的吧.

Tengine 中的应用

因为是nginx模块, 因此涉及很多nginx的数据结构, 可以先浏览一下这儿(严重感谢Tengine团队的分享)

先看下nginx-http-user-agent的目录结构:

1
2
3
4
5
6
7
8
9

nginx-http-user-agent/
|-- README
|-- config
|-- ngx_http_user_agent_module.c
|-- ngx_trie.c
`-- ngx_trie.h

0 directories, 5 files

ngx_trie.cngx_trie.h主要负责实现自动机算法, 先看一下核心数据结构(加了点自己的注释):

ngx_trie.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/* node */
struct ngx_trie_node_s {
    void                           *value;          /* 可以忽略;) */

    ngx_trie_node_t                *search_clue;    /* 匹配失败 */
    ngx_trie_node_t               **next;           /* 子节点   */

    unsigned                        key:31;         /* 标明是否是一个匹配串的结尾       */
    unsigned                        greedy:1;       /* 标明如果已经匹配了, 是否继续查找 */
};


struct ngx_trie_s {
    ngx_trie_node_t                *root;
    ngx_pool_t                     *pool;           /* nginx 自己的内存池 */

    /* 操作, 分别对应ngx_trie.c中实现的3个函数 */
    ngx_trie_insert_pt              insert;
    ngx_trie_query_pt               query;
    ngx_trie_build_clue_pt          build_clue;
};

来看一下创建Trie的函数(初始化数据结构):

ngx_trie.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
ngx_trie_t *
ngx_trie_create(ngx_pool_t *pool)
{
    ngx_trie_t *trie;

    trie = ngx_palloc(pool, sizeof(ngx_trie_t));
    if (trie == NULL) {
        return NULL;
    }

    /* 初始化根节点 */
    trie->root = ngx_trie_node_create(pool);
    if (trie->root == NULL) {
        return NULL;
    }

    trie->pool = pool;

    /* 设置操作函数 */
    trie->insert = ngx_trie_insert;
    trie->query = ngx_trie_query;
    trie->build_clue = ngx_trie_build_clue;

    return trie;
}


ngx_trie_node_t *
ngx_trie_node_create(ngx_pool_t *pool)
{
    ngx_trie_node_t *node;

    node = ngx_pcalloc(pool, sizeof(ngx_trie_node_t));
    if (node == NULL) {
        return NULL;
    }

    return node;
}

接下来看下用于把关键字加入Trie树中的插入函数:

ngx_trie.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
ngx_trie_node_t *
ngx_trie_insert(ngx_trie_t *trie, ngx_str_t *str, ngx_uint_t mode)
{
    size_t           i;
    ngx_int_t        pos, step, index;
    ngx_trie_node_t *p, *root;

    root = trie->root;
    i = 0;

    /* 判断是使用正序还是倒序来进行插入 */
    if (mode & NGX_TRIE_REVERSE) {
        pos = str->len;
        step = -1;
    } else {
        pos = -1;
        step = 1;
    }

    p = root;

    /* 依次将str中的字符插入到树中 */
    while (i < str->len) {
        pos = pos + step;
        index = str->data[pos];

        if (index < 0 || index >= NGX_TRIE_KIND) {    // NGX_TRIE_KIND 为 256
            continue;
        }

        if (p->next == NULL) {
            p->next = ngx_pcalloc(trie->pool,
                    NGX_TRIE_KIND * sizeof(ngx_trie_node_t *));

            if (p->next == NULL) {
                return NULL;
            }
        }
        if (p->next[index] == NULL) {
            p->next[index] = ngx_trie_node_create(trie->pool);
        }

        p = p->next[index];
        i++;
    }

    /* 这里是将字符串的长度给节点 */
    p->key = str->len;
    if (mode & NGX_TRIE_CONTINUE) {
        p->greedy = 1;
    }

    return p;
}

接下来是最精华的用于构建查找失败后的查找线索函数. 这里的主要思想套用下上面第二个资料的博主的话(稍微做了点修改):

构造失败指针的过程概括起来就一句话: 设节点T上的字母为C, 沿着它父节点的失败指针走, 直到走到一个节点Y: 它的儿子中也有字母为C节点J, 然后把当前节点T的失败指针指向节点J. 如果一直走到了root都没找到,那就把失败指针指向root.

具体操作起来只需要: 先把root加入队列(root的失败指针指向自己或者NULL), 之后每处理一个点, 就把它的所有子节点加入队列, 直到队列为空

下面的实现方法与这个思想基本吻合:

ngx_trie.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
ngx_int_t
ngx_trie_build_clue(ngx_trie_t *trie)
{
    ngx_int_t        i, head, tail;
    ngx_trie_node_t *q[NGX_TRIE_MAX_QUEUE_SIZE], *p, *t, *root;

    head = tail = 0;
    root = trie->root;
    q[head++] = root;
    root->search_clue = NULL;

    /* q 是辅助队列 */

    /* 注意: 这里都是为节点的子节点构建线索的 */
    while (head != tail) {
        t = q[tail++];
        tail %= NGX_TRIE_MAX_QUEUE_SIZE;

        if (t->next == NULL) {
            continue;
        }

        p = NULL;

        /* 为该节点的所有子节点构建线索 */
        for (i = 0; i < NGX_TRIE_KIND; i++) {
            if (t->next[i] == NULL) {
                continue;
            }

            /* 若节点是root, 则它的字节点的线索都指向root */
            if (t == root) {
                t->next[i]->search_clue = root;

                /* 将子节点加入队列 */
                q[head++] = t->next[i];
                head %= NGX_TRIE_MAX_QUEUE_SIZE;

                continue;
            }

            /* 若节点不是root */
            p = t->search_clue;

            while (p != NULL) {
                /* 找到一个有相同字母的节点 */
                if (p->next != NULL && p->next[i] != NULL) {
                    t->next[i]->search_clue = p->next[i];
                    break;
                }
                p = p->search_clue;
            }

            /* 找不到 */
            if (p == NULL) {
                t->next[i]->search_clue = root;
            }

            /* 将子节点加入队列 */
            q[head++] = t->next[i];
            head %= NGX_TRIE_MAX_QUEUE_SIZE;
        }
    }

    return NGX_OK;
}

接下来是查找函数:

ngx_trie.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/* 
 * 查找str中包含的关键字 
 *
 * 失败时返回NULL
 */
void *
ngx_trie_query(ngx_trie_t *trie, ngx_str_t *str, ngx_int_t *version_pos,
        ngx_uint_t mode)
{
    void            *value;
    size_t           i;
    ngx_int_t        step, pos, index;
    ngx_trie_node_t *p, *root;

    value = NULL;
    root = trie->root;
    p = root;
    i = 0;

    if (mode & NGX_TRIE_REVERSE) {
        pos = str->len;
        step = -1;
    } else {
        pos = -1;
        step = 1;
    }

    if (p->next == NULL) {
        return NULL;
    }

    while (i < str->len) {
        pos += step;
        index = str->data[pos];
        if (index < 0 || index >= NGX_TRIE_KIND) {
            index = 0;
        }

        /* 如果查找失败 */
        while (p->next[index] == NULL) {
            if (p == root) {
                break;
            }
            p = p->search_clue;    /* 走线索 */
        }

        p = p->next[index];
        p = p == NULL ? root : p;

        /* 匹配完成 */
        if (p->key) {
            value = p->value;
            *version_pos = pos + p->key;
            if (!p->greedy) {
                return value;
            }
            p = root;
        }

        i++;  /* 从str的下一个字符开始匹配 */
    }

    return value;
}

介绍到这里基本就都完结了呢. 真心学习到了很多东西

Comments