网络可编程数据面——DPDK Graph Library

7470阅读 0评论2020-07-11 lvyilong316
分类:LINUX

网络可编程数据面——DPDK Graph Library

——lvyilong316

Graph libraryDPDK 20.05版本引入的新特性,最近抽时间把其中关键代码看了一遍,主要希望看下其实现思路是否有可以借鉴的东西。下面将其大概实现进行了总结。

DPDK中的Graph架构将数据的处理抽象成了nodelink构成的graph,这种思想并不是新东西,VPP就采用了类似的思路实现,只不过DPDK这里讲graph思想实现为了一个lib框架,提供了graph的创建,删除,查找,node cloneedge updateadge shrink等操作,可供上层应用基于此框架开发。

这种graph架构有什么优势呢?首先graph架构将报文处理的相似逻辑抽象同样到一个node中,这样减少了报文处理过程中的I cacheD cache miss;其次提高了报文处理逻辑的复用性,同样的处理逻辑抽象为node可以在graph路径中多次出现复用;其次提高了报文处理模块的灵活性,通过不同node的任意顺序组合完成不同的报文处理流程。

node的构成

   Graph lib中的node构成如下图所示,由以下几部分构成:

(1)process:这是一个callback function,是node的核心处理流程,包含了数据处理的实现,通过rte_graph_walk()函数遍历每个node,然后调用每个nodeprocess函数实现数据的处理,如果数据处理完成后需要交给下一个node处理,则需要在其中调用rte_node_enqueue*()函数。

(2)initnode的初始化函数,在rte_graph_create()函数创建graph时会调用各个nodeinit函数;

(3)fininode的析构函数,在rte_graph_destroy()函数销毁graph时会调用各个nodefini函数;

(4)Context memory:存放当前node所需的私有信息的内存空间,processinitfini等函数都可能会用到;

(5)nb_edges:和当前node关联的边数;

(6)next_node[]:存放当前node的邻居节点,由于graph是一个有向图,所以这里next_node[]其实存放是其下游的邻居节点;

内置node类型

用户可以根据自身需求实现注册自己的node节点,dpdk中的rte_node库中也实现了一些内置的基础node。下面我们以ethdev_tx node为例分析一下一个node的定义和注册过程。

ethdev_tx

   ethdev_tx node定义了网卡port的收包行为。在rte_node中每个node的定义是通过注册rte_node_register信息完成的,rte_node_register包含了node所需要的基本信息,包括其process处理函数, ethdev_txrte_node_register如下所示:

点击(此处)折叠或打开

  1. static struct rte_node_register ethdev_tx_node_base = {
  2.     .process = ethdev_tx_node_process,
  3.     .name = "ethdev_tx",

  4.     .init = ethdev_tx_node_init,

  5.     .nb_edges = ETHDEV_TX_NEXT_MAX,
  6.     .next_nodes = {
  7.         [ETHDEV_TX_NEXT_PKT_DROP] = "pkt_drop",
  8.     },
  9. };

   之后通过RTE_NODE_REGISTER进行注册:

RTE_NODE_REGISTER(ethdev_tx_node_base);

   RTE_NODE_REGISTER的实现如下,最终调用__rte_node_register

点击(此处)折叠或打开

  1. #define RTE_NODE_REGISTER(node) \
  2.     RTE_INIT(rte_node_register_##node) \
  3.     { \
  4.         node.parent_id = RTE_NODE_ID_INVALID; \
  5.         node.id = __rte_node_register(&node); \
  6.     }

__rte_node_register的主要工作就是分配struct node结构和对应的nodeid,并将其加入到全局链表node_list中。

点击(此处)折叠或打开

  1. rte_node_t
  2. __rte_node_register(const struct rte_node_register *reg)
  3. {
  4.     struct node *node;
  5.     rte_edge_t i;
  6.     size_t sz;

  7.     graph_spinlock_lock();

  8.     sz = sizeof(struct node) + (reg->nb_edges * RTE_NODE_NAMESIZE);
  9.     node = calloc(1, sz);
  10.     if (node == NULL) {
  11.         rte_errno = ENOMEM;
  12.         goto fail;
  13.     }

  14.     /* Initialize the node */
  15.     if (rte_strscpy(node->name, reg->name, RTE_NODE_NAMESIZE) < 0) {
  16.         rte_errno = E2BIG;
  17.         goto free;
  18.     }
  19.     node->flags = reg->flags;
  20.     node->process = reg->process;
  21.     node->init = reg->init;
  22.     node->fini = reg->fini;
  23.     node->nb_edges = reg->nb_edges;
  24.     node->parent_id = reg->parent_id;
  25.     for (i = 0; i < reg->nb_edges; i++) {
  26.         if (rte_strscpy(node->next_nodes[i], reg->next_nodes[i],
  27.                 RTE_NODE_NAMESIZE) < 0) {
  28.             rte_errno = E2BIG;
  29.             goto free;
  30.         }
  31.     }

  32.     node->id = node_id++;

  33.     /* Add the node at tail */
  34.     STAILQ_INSERT_TAIL(&node_list, node, next);
  35.     graph_spinlock_unlock();

  36.     return node->id;
  37. free:
  38.     free(node);
  39. fail:
  40.     graph_spinlock_unlock();
  41.     return RTE_NODE_ID_INVALID;
  42. }

对应数据结构如下所示:

下面我们看一下ethdev_tx nodeprocess函数,即ethdev_tx_node_process

点击(此处)折叠或打开

  1. static uint16_t
  2. ethdev_tx_node_process(struct rte_graph *graph, struct rte_node *node,
  3.          void **objs, uint16_t nb_objs)
  4. {
  5.     ethdev_tx_node_ctx_t *ctx = (ethdev_tx_node_ctx_t *)node->ctx;
  6.     uint16_t port, queue;
  7.     uint16_t count;

  8.     /* Get Tx port id */
  9.     port = ctx->port;
  10.     queue = ctx->queue;

  11.     count = rte_eth_tx_burst(port, queue, (struct rte_mbuf **)objs,
  12.                  nb_objs);

  13.     /* Redirect unsent pkts to drop node */
  14.     if (count != nb_objs) {
  15.         rte_node_enqueue(graph, node, ETHDEV_TX_NEXT_PKT_DROP,
  16.                  &objs[count], nb_objs - count);
  17.     }

  18.     return count;
  19. }

这里我们也可以清楚的看到node->ctx的作用,其中process所需的portqueue信息都存放在其中。

ethdev_rx

ethdev_rx是处理网卡port接收报文逻辑的node,其rte_node_register定义如下所示:

点击(此处)折叠或打开

  1. static struct rte_node_register ethdev_rx_node_base = {
  2.     .process = ethdev_rx_node_process,
  3.     .flags = RTE_NODE_SOURCE_F,
  4.     .name = "ethdev_rx",

  5.     .init = ethdev_rx_node_init,

  6.     .nb_edges = ETHDEV_RX_NEXT_MAX,
  7.     .next_nodes = {[ETHDEV_RX_NEXT_IP4_LOOKUP] = "ip4_lookup"},
  8. };

ethdev_tx不同的是这里多了一个 RTE_NODE_SOURCE_Fflag,这个flag标识了这是一个source node,在DPDK graph中,source node是一种特殊的node,作为这个graph的起点,当然一个graph可以有多个source node。另外注意这里静态定义这个node的邻居nodeip4_lookup

我们再来看一下其process处理函数ethdev_rx_node_process

点击(此处)折叠或打开

  1. static __rte_always_inline uint16_t
  2. ethdev_rx_node_process(struct rte_graph *graph, struct rte_node *node,
  3.          void **objs, uint16_t cnt)
  4. {
  5.     ethdev_rx_node_ctx_t *ctx = (ethdev_rx_node_ctx_t *)node->ctx;
  6.     uint16_t n_pkts = 0;

  7.     RTE_SET_USED(objs);
  8.     RTE_SET_USED(cnt);

  9.     n_pkts = ethdev_rx_node_process_inline(graph, node, ctx->port_id,
  10.                      ctx->queue_id);
  11.     return n_pkts;
  12. }

   其中主要调用了 ethdev_rx_node_process_inline

点击(此处)折叠或打开

  1. static __rte_always_inline uint16_t
  2. ethdev_rx_node_process_inline(struct rte_graph *graph, struct rte_node *node,
  3.              uint16_t port, uint16_t queue)
  4. {
  5.     uint16_t count, next_index = ETHDEV_RX_NEXT_IP4_LOOKUP;

  6.     /* Get pkts from port */
  7.     count = rte_eth_rx_burst(port, queue, (struct rte_mbuf **)node->objs,
  8.                  RTE_GRAPH_BURST_SIZE);

  9.     if (!count)
  10.         return 0;
  11.     node->idx = count;
  12.     /* Enqueue to next node */
  13.     rte_node_next_stream_move(graph, node, next_index);

  14.     return count;
  15. }

   通过rte_eth_rx_burst接收到报文后,再通过rte_node_next_stream_move将报文交给下一个节点处理,而下一个节点已经被指定好为next_index = ETHDEV_RX_NEXT_IP4_LOOKUP,即ip4_lookup node

ip4_rewrite

这个nodeIPv4重写节点:包含IPv4和以太网报文头的重写功能,可以通过rte_node_ip4_rewrite_add 函数进行配置。

ip4_lookup

IPv4查找节点:由IPv4提取和LPM查找节点组成。路由表可以由应用程序通过rte_node_ip4_route_add 函数进行配置。

点击(此处)折叠或打开

  1. static struct rte_node_register ip4_lookup_node = {
  2.     .process = ip4_lookup_node_process,
  3.     .name = "ip4_lookup",

  4.     .init = ip4_lookup_node_init,

  5.     .nb_edges = RTE_NODE_IP4_LOOKUP_NEXT_MAX,
  6.     .next_nodes = {
  7.         [RTE_NODE_IP4_LOOKUP_NEXT_REWRITE] = "ip4_rewrite",
  8.         [RTE_NODE_IP4_LOOKUP_NEXT_PKT_DROP] = "pkt_drop",
  9.     },
  10. };

   其邻居节点有ip4_rewritepkt_drop

null

空节点:定义了节点通用结构的skeleton节点。

pkt_drop

数据包丢弃节点:将各自mempool中接受到的数据包释放。

node添加到graph的几种方法

将一个node加入图中link起来有以下几种方法:

1. node注册的时候提供并初始化next_node[]数组,上文提过next_node[]中用来存放当前节点的下游节点对象的,所以也体现了节点间的link关系,这种方法也称为静态方法;

2. 使用rte_node_edge_get(), rte_node_edge_update(), rte_node_edge_shrink() 几个函数来在运行时更新 next_nodes[],但是这种方式需要在graph创建之前;

3. 使用rte_node_clone()方法clone一个已有节点,这种方法适用于将系统中更多CPU加入到转发逻辑的扩展操作,clone操作会复制原有节点的所有属性,但是不会复制"context memory"context memory中包含了portqueue pair信息。

创建graph对象

graph是通过rte_graph_create函数创建的,这个函数也是理解DPDK graph架构的核心。

点击(此处)折叠或打开

  1. rte_graph_t
  2. rte_graph_create(const char *name, struct rte_graph_param *prm)
  3. {
  4.     rte_node_t src_node_count;
  5.     struct graph *graph;
  6.     const char *pattern;
  7.     uint16_t i;

  8.     graph_spinlock_lock();

  9.     /* Check arguments sanity */
  10.     if (prm == NULL)
  11.         SET_ERR_JMP(EINVAL, fail, "Param should not be NULL");

  12.     if (name == NULL)
  13.         SET_ERR_JMP(EINVAL, fail, "Graph name should not be NULL");

  14.     /* Check for existence of duplicate graph */
  15.     /* 在graph全局链表中查找看是否已有同名的graph */
  16.     STAILQ_FOREACH(graph, &graph_list, next)
  17.         if (strncmp(name, graph->name, RTE_GRAPH_NAMESIZE) == 0)
  18.             SET_ERR_JMP(EEXIST, fail, "Found duplicate graph %s",
  19.                  name);

  20.     /* Create graph object */
  21.     /* 分配graph结构 */
  22.     graph = calloc(1, sizeof(*graph));
  23.     if (graph == NULL)
  24.         SET_ERR_JMP(ENOMEM, fail, "Failed to calloc graph object");

  25.     /* Initialize the graph object */
  26.     STAILQ_INIT(&graph->node_list);
  27.     if (rte_strscpy(graph->name, name, RTE_GRAPH_NAMESIZE) < 0)
  28.         SET_ERR_JMP(E2BIG, free, "Too big name=%s", name);

  29.     /* Expand node pattern and add the nodes to the graph */
  30.     for (i = 0; i < prm->nb_node_patterns; i++) {
  31.         pattern = prm->node_patterns[i];
  32.         if (expand_pattern_to_node(graph, pattern))
  33.             goto graph_cleanup;
  34.     }

  35.     /* Go over all the nodes edges and add them to the graph */
  36.     if (graph_node_edges_add(graph))
  37.         goto graph_cleanup;

  38.     /* Update adjacency list of all nodes in the graph */
  39.     if (graph_adjacency_list_update(graph))
  40.         goto graph_cleanup;

  41.     /* Make sure at least a source node present in the graph */
  42.     src_node_count = graph_src_nodes_count(graph);
  43.     if (src_node_count == 0)
  44.         goto graph_cleanup;

  45.     /* Make sure no node is pointing to source node */
  46.     if (graph_node_has_edge_to_src_node(graph))
  47.         goto graph_cleanup;

  48.     /* Don't allow node has loop to self */
  49.     if (graph_node_has_loop_edge(graph))
  50.         goto graph_cleanup;

  51.     /* Do BFS from src nodes on the graph to find isolated nodes */
  52.     if (graph_has_isolated_node(graph))
  53.         goto graph_cleanup;

  54.     /* Initialize graph object */
  55.     graph->socket = prm->socket_id;
  56.     graph->src_node_count = src_node_count;
  57.     graph->node_count = graph_nodes_count(graph);
  58.     graph->id = graph_id;

  59.     /* Allocate the Graph fast path memory and populate the data */
  60.     if (graph_fp_mem_create(graph))
  61.         goto graph_cleanup;

  62.     /* Call init() of the all the nodes in the graph */
  63.     if (graph_node_init(graph))
  64.         goto graph_mem_destroy;

  65.     /* All good, Lets add the graph to the list */
  66.     graph_id++;
  67.     STAILQ_INSERT_TAIL(&graph_list, graph, next);

  68.     graph_spinlock_unlock();
  69.     return graph->id;

  70. graph_mem_destroy:
  71.     graph_fp_mem_destroy(graph);
  72. graph_cleanup:
  73.     graph_cleanup(graph);
  74. free:
  75.     free(graph);
  76. fail:
  77.     graph_spinlock_unlock();
  78.     return RTE_GRAPH_ID_INVALID;
  79. }

   我将此函数的实现展开并添加注释得到下图。内嵌函数不再单独说明。

   这个函数的核心是创建并初始化struct graph结构,包括其中的node和邻居关系,然后将struct graph添加到全局链表graph_list中。相关数据结构如下图所示。注意struct graph底层有对应的struct rte_graphstruct node底层有对应的struct rte_node

graph的遍历

    graph创建后需要让整个graph运行起来,这个工作主要是由rte_graph_walk函数完成的。

点击(此处)折叠或打开

  1. /**
  2.  * Perform graph walk on the circular buffer and invoke the process function
  3.  * of the nodes and collect the stats.
  4.  *
  5.  * @param graph
  6.  * Graph pointer returned from rte_graph_lookup function.
  7.  *
  8.  * @see rte_graph_lookup()
  9.  */
  10. __rte_experimental
  11. static inline void
  12. rte_graph_walk(struct rte_graph *graph)
  13. {
  14.     const rte_graph_off_t *cir_start = graph->cir_start;
  15.     const rte_node_t mask = graph->cir_mask;
  16.     uint32_t head = graph->head;
  17.     struct rte_node *node;
  18.     uint64_t start;
  19.     uint16_t rc;
  20.     void **objs;

  21.     /*
  22.      * Walk on the source node(s) ((cir_start - head) -> cir_start) and then
  23.      * on the pending streams (cir_start -> (cir_start + mask) -> cir_start)
  24.      * in a circular buffer fashion.
  25.      *
  26.      *    +-----+ <= cir_start - head [number of source nodes]
  27.      *    | |
  28.      *    | ... | <= source nodes
  29.      *    | |
  30.      *    +-----+ <= cir_start [head = 0] [tail = 0]
  31.      *    | |
  32.      *    | ... | <= pending streams
  33.      *    | |
  34.      *    +-----+ <= cir_start + mask
  35.      */
  36.     while (likely(head != graph->tail)) {
  37.         node = RTE_PTR_ADD(graph, cir_start[(int32_t)head++]);
  38.         RTE_ASSERT(node->fence == RTE_GRAPH_FENCE);
  39.         objs = node->objs;
  40.         rte_prefetch0(objs);

  41.         if (rte_graph_has_stats_feature()) { /*如果graph开启了统计功能*/
  42.             start = rte_rdtsc();
  43.             rc = node->process(graph, node, objs, node->idx);
  44.             node->total_cycles += rte_rdtsc() - start;
  45.             node->total_calls++;
  46.             node->total_objs += rc;
  47.         } else {
  48.             node->process(graph, node, objs, node->idx);
  49.         }
  50.         node->idx = 0;
  51.         head = likely((int32_t)head > 0) ? head & mask : head;
  52.     }
  53.     graph->tail = 0;
  54. }

   要理解这个过程需要再次拿出来rte_graph的结构,如下图所示。该函数从graph->head开始遍历graph的每个node,并调用其process函数。而graph->headgraph创建时被做了如下初始化:

graph->head = (int32_t)-_graph->src_node_count; 

也就是负的src_node_count,根据rte_graph的结构图可知cir_start[head]指向的就是graphsource node的起始位置。所以函数的整个过程是先从cir_start[-src_node_count]遍历到cir_start[0]将所有的source node遍历一遍,然后再从cir_start[0]遍历到cir_start[mask]graph中的所有node遍历一遍。为什么要先遍历source node呢?这也很容易理解,source node一般负责原始数据的处理,如ethdev_rx负责将报文从网卡收上来,如果没有source node后续node也就没有报文可处理。

graph的统计

DPDKgraph也带了统计功能,可以统计graph的每个node被调用了多少次,每个node被调用消耗了多少个cycle。可以使用rte_graph_cluster_stats_get()函数获取,获取信息如下图所示。

l3fwd-graph示例分析

l3fwd-graphDPDK使用graph架构对l3fwd example的重实现,也是我们学习使用graph的很好的例子。

l3fwd-graph通过DPDK中的ethdev_rx, ip4_lookup, ip4_rewrite, ethdev_tx,pkt_drop等内置node,在每个转发core上创建一个graph来实现三层的转发功能。

启动参数如下所示:

点击(此处)折叠或打开

  1. ./build/l3fwd-graph -l 1,2 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)"

其中--config (port,queue,lcore)[,(port,queue,lcore)]表示queueportcore三者的绑定关系。下面分段介绍这个例子的实现。

Graph Node Pre-Init Configuration

   和正常的DPDK程序一样,l3fwd-graph启动会先调用rte_eth_dev_configure对每个port进行配置,然后调用rte_eth_tx_queue_setup配置tx queue,调用rte_eth_rx_queue_setup配置每个队列的tx queue,然后调用rte_eth_dev_start对每个port进行start。这些通用逻辑我们就不再展开,这里只关注和graph相关的。

   其中第一点是在rte_eth_tx_queue_setuprte_eth_rx_queue_setup后调用rte_node_eth_config。这个函数分配对ethdev_rxethdev_tx 两个node进行 clone ,其clone出来的节点name分别为 ethdev_rx-X-Yethdev_tx-X,其中XY分别代表port idqueue id,所以ethdev_tx是每个port一个,其queue id通过graph id指定,而ethdev_rx是每个port-queue映射对应一个。

Graph Initialization

在每个转发面上创建一个graph对象,每个core根据配置txrx的能力包含对应的ethdev_rxethdev_tx node

点击(此处)折叠或打开

  1. static const char *const default_patterns[] = {
  2.     "ip4*",
  3.     "ethdev_tx-*",
  4.     "pkt_drop",};const char **node_patterns;uint16_t nb_pattern;
  5. /* ... */
  6. /* Create a graph object per lcore with common nodes and * lcore specific nodes based on application arguments */
  7. nb_patterns = RTE_DIM(default_patterns);
  8. node_patterns = malloc((MAX_RX_QUEUE_PER_LCORE + nb_patterns) *sizeof(*node_patterns));
  9. memcpy(node_patterns, default_patternsnb_patterns * sizeof(*node_patterns));
  10. memset(&graph_conf, 0, sizeof(graph_conf));

  11. /* Common set of nodes in every lcore's graph object */
  12. graph_conf.node_patterns = node_patterns;
  13. for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
  14.     /* ... */

  15.     /* Skip graph creation if no source exists */
  16.     if (!qconf->n_rx_queue)
  17.         continue;

  18.     /* Add rx node patterns of this lcore based on --config */
  19.     for (i = 0; i < qconf->n_rx_queue; i++) {
  20.         graph_conf.node_patterns[nb_patterns + i] =
  21.                             qconf->rx_queue_list[i].node_name;
  22.     }

  23.     graph_conf.nb_node_patterns = nb_patterns + i;
  24.     graph_conf.socket_id = rte_lcore_to_socket_id(lcore_id);

  25.     snprintf(qconf->name, sizeof(qconf->name), "worker_%u", lcore_id);

  26.     graph_id = rte_graph_create(qconf->name, &graph_conf);

  27.     /* ... */

  28.     qconf->graph = rte_graph_lookup(qconf->name);

  29.     /* ... */
  30. }

注意:如果通过shell传递的一组节点pattern不满足他们的相互依赖关系或给定的正则表达式node pattern找不到对应nodegraph将会创建失败。

Forwarding data(Route, Next-Hop) addition

graph创建好之后就可以通过rte_node_ip4_route_add() rte_node_ip4_rewrite_add() 函数分别向ipv4_lookupipv4_rewrite node添加配置规则。

点击(此处)折叠或打开

  1. /* Add route to ip4 graph infra */for (i = 0; i < IPV4_L3FWD_LPM_NUM_ROUTES; i++) {
  2.     /* ... */

  3.     dst_port = ipv4_l3fwd_lpm_route_array[i].if_out;
  4.     next_hop = i;

  5.     /* ... */
  6.     ret = rte_node_ip4_route_add(ipv4_l3fwd_lpm_route_array[i].ip,
  7.                                  ipv4_l3fwd_lpm_route_array[i].depth, next_hop,
  8.                                  RTE_NODE_IP4_LOOKUP_NEXT_REWRITE);

  9.     /* ... */

  10.     memcpy(rewrite_data, val_eth + dst_port, rewrite_len);

  11.     /* Add next hop for a given destination */
  12.     ret = rte_node_ip4_rewrite_add(next_hop, rewrite_data,
  13.                                    rewrite_len, dst_port);

  14.     RTE_LOG(INFO, L3FWD_GRAPH, "Added route %s, next_hop %u\n",
  15.             route_str, next_hop);
  16. }

Packet Forwarding using Graph Walk

graph配置完成后就可以启动graph的转发了,其核心函数为graph_main_loop.

点击(此处)折叠或打开

  1. /* Main processing loop */
  2. static intgraph_main_loop(void *conf){
  3.     // ...

  4.     lcore_id = rte_lcore_id();
  5.     qconf = &lcore_conf[lcore_id];
  6.     graph = qconf->graph;

  7.     RTE_LOG(INFO, L3FWD_GRAPH,
  8.             "Entering main loop on lcore %u, graph %s(%p)\n", lcore_id,
  9.             qconf->name, graph);

  10.     /* Walk over graph until signal to quit */
  11.     while (likely(!force_quit))
  12.         rte_graph_walk(graph);
  13. return 0;
  14. }

即使用rte_graph_walk遍历当前core上的graph,其中rte_graph_walk我们前面已经分析过,其内部会先遍历指向graphsource node,也就是 ethdev_rx-X-Y,然后遍历graph中的其他node,这个执行nodeprocess函数。

参考文档:

上一篇:DPDK中的内存特点和IOVA
下一篇:IOMMU和VFIO概述