NCCL Source Code Analysis 5: Merging Channels Across Machines
Let's return to the code and prepare for the second AllGather. First, allGather3Data is allocated to aggregate channel information across ranks; the structures involved are defined as follows:
  struct allGatherInfo *allGather3Data;

  struct allGatherInfo {
    struct graphInfo graphInfo[NCCL_NUM_ALGORITHMS];
    struct ncclTopoRanks topoRanks;
    int cpuArch;
    int cpuVendor;
  };
The allGatherInfo structure holds the graph and channel information for every algorithm.
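Each rank fills its own slot of this array before the exchange. Below is a sketch of the graphInfo structure and the loop that populates it, reconstructed from the fields that are read back after the all-gather later in this section (approximate, not verbatim NCCL source):

  // Reconstruction based on the fields consumed after the all-gather.
  struct graphInfo {
    int pattern;       // e.g. the tree pattern, which depends on SM arch
    int nChannels;
    int sameChannels;
    float bwIntra;     // intra-node bandwidth
    float bwInter;     // inter-node bandwidth
    int typeIntra;     // intra-node link type
    int typeInter;     // inter-node link type
    int crossNic;
  };

  for (int a = 0; a < NCCL_NUM_ALGORITHMS; a++) {
    allGather3Data[rank].graphInfo[a].pattern = graphs[a]->pattern;
    allGather3Data[rank].graphInfo[a].nChannels = graphs[a]->nChannels;
    allGather3Data[rank].graphInfo[a].sameChannels = graphs[a]->sameChannels;
    allGather3Data[rank].graphInfo[a].bwIntra = graphs[a]->bwIntra;
    allGather3Data[rank].graphInfo[a].bwInter = graphs[a]->bwInter;
    allGather3Data[rank].graphInfo[a].typeIntra = graphs[a]->typeIntra;
    allGather3Data[rank].graphInfo[a].typeInter = graphs[a]->typeInter;
    allGather3Data[rank].graphInfo[a].crossNic = graphs[a]->crossNic;
  }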
The for loop above initializes allGather3Data[rank].graphInfo, while ncclTopoPreset initializes allGather3Data[rank].topoRanks. First, look at the ncclTopoRanks structure definition:
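A sketch of its ring-related fields, as they appear in NCCL's graph code (tree and NVLS fields elided here; the exact field set varies across NCCL versions):

  struct ncclTopoRanks {
    int ringRecv[MAXCHANNELS];  // head rank of this node's partial ring, per channel
    int ringSend[MAXCHANNELS];  // tail rank of this node's partial ring, per channel
    int ringPrev[MAXCHANNELS];  // rank before us in the partial ring (-1 at the head)
    int ringNext[MAXCHANNELS];  // rank after us in the partial ring (-1 at the tail)
    // ... tree/NVLS fields elided ...
  };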
It carries the following information:

- ringRecv[c]: the head rank of the partial channel c that this rank belongs to, i.e., the first rank of this machine's segment;
- ringSend[c]: the tail rank of the partial channel c, i.e., the last rank of this machine's segment;
- ringPrev[c]: the rank preceding this rank within partial channel c;
- ringNext[c]: the rank following this rank within partial channel c.

The first rank's prev and the last rank's next are -1. ncclTopoPreset also initializes comm->channels; we look only at the Ring-algorithm part and omit the rest (see the sketch below). Note that at the end the rings found by the search are duplicated, so the channel count doubles here; the explanation given in official issues is that this adds parallelism so the bandwidth can be fully utilized.
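The sketch below shows only the Ring logic of ncclTopoPreset, simplified from NCCL's graph/connect.cc (tree/NVLS handling and error checking omitted; details vary by version):

  // Sketch: Ring-only portion of ncclTopoPreset (simplified, not verbatim).
  ncclResult_t ncclTopoPreset(struct ncclComm* comm, struct ncclTopoGraph** graphs,
                              struct ncclTopoRanks* topoRanks) {
    int rank = comm->rank;
    int localRanks = comm->topo->nodes[GPU].count;  // GPUs on this machine
    int nChannels = comm->nChannels;

    for (int c = 0; c < nChannels; c++) {
      struct ncclChannel* channel = comm->channels + c;
      channel->ring.prev = channel->ring.next = -1;

      // graphs[NCCL_ALGO_RING]->intra holds, per channel, the intra-node ring
      // as an ordered list of localRanks ranks.
      int* ringIntra = graphs[NCCL_ALGO_RING]->intra + c * localRanks;
      for (int i = 0; i < localRanks; i++) {
        if (ringIntra[i] == rank) {
          topoRanks->ringRecv[c] = ringIntra[0];               // head of the partial ring
          topoRanks->ringSend[c] = ringIntra[localRanks - 1];  // tail of the partial ring
          channel->ring.prev = (i == 0) ? -1 : ringIntra[i - 1];
          channel->ring.next = (i == localRanks - 1) ? -1 : ringIntra[i + 1];
        }
      }
      topoRanks->ringPrev[c] = channel->ring.prev;
      topoRanks->ringNext[c] = channel->ring.next;
    }

    // Duplicate the channels found so far: entries [nChannels, 2*nChannels)
    // become copies of [0, nChannels), doubling the channel count.
    memcpy(comm->channels + nChannels, comm->channels, nChannels * sizeof(struct ncclChannel));
    return ncclSuccess;
  }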
Back in initTransportsRank, bootstrapAllGather then exchanges the graph information among all processes, and the following data structures are computed:

- nodesFirstRank: the first rank on each machine (node);
- nodesTreePatterns: the tree-topology pattern of each node;
- rankToNode: the node each rank belongs to;
- localRankToRank: the mapping from local_rank to global rank;
- localRank: this process's local_rank;
- localRanks: the number of local ranks on this node.

If the processes run on CPUs of different models or vendors, a warning is printed to aid debugging, because in past implementations such mixed configurations could deadlock collective operations.
  NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allGather3Data, sizeof(*allGather3Data)), ret, fail);

  // Determine nNodes, firstRanks, ...
  NCCLCHECKGOTO(ncclCalloc(&nodesFirstRank, nranks), ret, fail);
  NCCLCHECKGOTO(ncclCalloc(&nodesTreePatterns, nranks), ret, fail);
  NCCLCHECKGOTO(ncclCalloc(&comm->rankToNode, comm->nRanks), ret, fail);
  for (int r=0; r<nranks; r++) {
    int node;
    int firstRank = allGather3Data[r].topoRanks.ringRecv[0];
    for (node=0; node<comm->nNodes && nodesFirstRank[node] != firstRank; node++);
    if (node == comm->nNodes) {
      comm->nNodes++;
      nodesFirstRank[node] = firstRank;
      // Record tree pattern of each node as they can be different depending on sm arch
      nodesTreePatterns[node] = allGather3Data[r].graphInfo[NCCL_ALGO_TREE].pattern;
    }
    comm->rankToNode[r] = node;

    if (comm->cpuArch != allGather3Data[r].cpuArch &&
        comm->cpuArch != NCCL_TOPO_CPU_ARCH_MIXED) {
      comm->cpuArch = NCCL_TOPO_CPU_ARCH_MIXED;
    }
    if (comm->cpuVendor != allGather3Data[r].cpuVendor &&
        comm->cpuVendor != NCCL_TOPO_CPU_VENDOR_MIXED) {
      comm->cpuVendor = NCCL_TOPO_CPU_VENDOR_MIXED;
    }
  }

  // Alert the user to the presence of mixed CPUs. In the past this has caused
  // locks in some collective routines. This may help debug issues in the future.
  if (rank==0) {
    if (comm->cpuArch == NCCL_TOPO_CPU_ARCH_MIXED) {
      INFO(NCCL_GRAPH, "CPUs with mixed architecture were detected.");
    }
    if (comm->cpuVendor == NCCL_TOPO_CPU_VENDOR_MIXED) {
      INFO(NCCL_GRAPH, "CPUs with mixed vendors were detected.");
    }
  }

  // Now that we know nNodes, alloc nodeRanks and compute localRanks for each node
  NCCLCHECKGOTO(ncclCalloc(&comm->nodeRanks, comm->nNodes), ret, fail);
  NCCLCHECKGOTO(ncclCalloc(&comm->rankToLocalRank, comm->nRanks), ret, fail);
  for (int r=0; r<comm->nRanks; r++) {
    int node = comm->rankToNode[r];
    comm->rankToLocalRank[r] = comm->nodeRanks[node].localRanks;
    comm->nodeRanks[node].localRanks++;
  }
  // Allocate ranks arrays for each node
  for (int n=0; n<comm->nNodes; n++) {
    NCCLCHECKGOTO(ncclCalloc(&comm->nodeRanks[n].localRankToRank, comm->nodeRanks[n].localRanks), ret, fail);
    comm->maxLocalRanks = std::max(comm->maxLocalRanks, comm->nodeRanks[n].localRanks);
    comm->nodeRanks[n].localRanks = 0;
  }
  // And fill the ranks arrays
  for (int r=0; r<comm->nRanks; r++) {
    int node = comm->rankToNode[r];
    comm->nodeRanks[node].localRankToRank[comm->nodeRanks[node].localRanks++] = r;
  }
  comm->node = comm->rankToNode[rank];
  comm->localRankToRank = comm->nodeRanks[comm->node].localRankToRank;
  comm->localRank = comm->rankToLocalRank[rank];
  comm->localRanks = comm->nodeRanks[comm->node].localRanks;

  TRACE(NCCL_INIT,"hostHash[%d] %lx localRank %d localRanks %d localRank0 %d",
        rank, comm->peerInfo[rank].hostHash, comm->localRank, comm->localRanks, comm->localRankToRank[0]);
  if (comm->localRank == -1 || comm->localRankToRank[0] == -1 || comm->localRanks == 0) {
    WARN("Failed to determine local ranks rank %d hostHash %lx pidHash %lx localRank %d localRanks %d localRank0 %d",
         rank, comm->peerInfo[rank].hostHash, comm->peerInfo[rank].pidHash,
         comm->localRank, comm->localRanks, comm->localRankToRank[0]);
    ret = ncclInternalError;
    goto fail;
  }

  INFO(NCCL_INIT, "comm %p rank %d nRanks %d nNodes %d localRanks %d localRank %d MNNVL %d",
       comm, rank, comm->nRanks, comm->nNodes, comm->localRanks, comm->localRank, comm->MNNVL);
Next, the tuning configuration is aligned across all ranks: each rank takes the minimum of the channel counts and bandwidths and the maximum of the path types reported by all ranks, so every rank settles on the same, most conservative values.
  nChannelsOrig = comm->nChannels;
  NCCLCHECKGOTO(ncclCalloc(&allTopoRanks, comm->nRanks), ret, fail);
  for (int i=0; i<nranks; i++) {
    allTopoRanks[i] = &allGather3Data[i].topoRanks;
    // Make sure we align all ranks so that the tuning is consistent across ranks
    for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) {
      graphs[a]->nChannels = std::min(allGather3Data[i].graphInfo[a].nChannels, graphs[a]->nChannels);
      graphs[a]->sameChannels = std::min(allGather3Data[i].graphInfo[a].sameChannels, graphs[a]->sameChannels);
      graphs[a]->bwIntra = std::min(allGather3Data[i].graphInfo[a].bwIntra, graphs[a]->bwIntra);
      graphs[a]->bwInter = std::min(allGather3Data[i].graphInfo[a].bwInter, graphs[a]->bwInter);
      graphs[a]->typeIntra = std::max(allGather3Data[i].graphInfo[a].typeIntra, graphs[a]->typeIntra);
      graphs[a]->typeInter = std::max(allGather3Data[i].graphInfo[a].typeInter, graphs[a]->typeInter);
      graphs[a]->crossNic = std::max(allGather3Data[i].graphInfo[a].crossNic, graphs[a]->crossNic);
    }
  }
Then, because the alignment may have reduced the channel count, the duplicated channels created during Preset() must be moved into place. For example, if Preset() duplicated 4 channels into slots 4..7 and the alignment cut nChannels to 2, the copies of channels 0..1 (currently at slots 4..5) are moved to slots 2..3.
  if (comm->nChannels < nChannelsOrig) {
    // We started duplicating channels during Preset(), so we need to move the
    // duplicated channels since we have removed some.
    for (int i=0; i<comm->nChannels; i++)
      memcpy(comm->channels+comm->nChannels+i, comm->channels+nChannelsOrig+i, sizeof(struct ncclChannel));
  }
Next, determine whether CollNet is supported.
  // Determine CollNet support after all-gather now that we know nNodes and each node localRanks
  if (comm->collNetSupport == 1) {
    int collNetNodeThreshold = ncclParamCollNetNodeThreshold();
    if (comm->nNodes < collNetNodeThreshold) {
      INFO(NCCL_INIT, "Communicator has %d nodes which is less than CollNet node threshold %d, disabling CollNet", comm->nNodes, collNetNodeThreshold);
      comm->collNetSupport = 0;
    }
    comm->collNetRegSupport = true;
    for (int n=0; n<comm->nNodes; n++) {
      if (comm->nodeRanks[n].localRanks > NCCL_MAX_DIRECT_ARITY+1) {
        WARN("CollNet currently only supports up to %d GPUs per node, disabling CollNet", NCCL_MAX_DIRECT_ARITY+1);
        comm->collNetSupport = 0;
        break;
      }
      if (comm->nodeRanks[n].localRanks > 1) {
        // As long as there is more than 1 rank on any node, we need to disable collnet reg
        comm->collNetRegSupport = false;
      }
    }
  }
With that done, ncclTopoPostset merges all the Ring, Tree, and other topologies. For the Ring topology, for example, the per-machine sub-rings are connected one after another to form the complete ring. This concludes the second AllGather.
  NCCLCHECKGOTO(ncclCalloc(&rings, nranks*MAXCHANNELS), ret, fail);
  NCCLCHECKGOTO(ncclTopoPostset(comm, nodesFirstRank, nodesTreePatterns, allTopoRanks, rings, graphs, parent), ret, fail);
  // AllGather3 - end
As an example, take two machines with four GPUs each, ranks 0 through 7. The ranks on each machine first build that machine's partial ring, and ncclTopoPostset then joins the two partial rings head to tail into the complete ring.

ncclTopoPostset flattens the prev, next, send, and recv information of all channels into arrays, e.g., recv[0] holds the rank that rank 0 receives from in the first ring. It then computes the prev of each machine's first rank and the next of its last rank, as sketched below.
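The ring-connection step can be sketched after connectRings in NCCL's graph/connect.cc; firstRanks below is the nodesFirstRank array computed earlier, and each flattened array is indexed as array[c*nRanks + r], the value for rank r in channel c. This is a simplified model, not verbatim source:

  // For every channel, stitch the per-node partial rings into one global ring:
  // each node's head receives from the previous node's tail, and each node's
  // tail sends to the next node's head.
  static ncclResult_t connectRings(struct ncclComm* comm, int* ringRecv, int* ringSend,
                                   int* ringPrev, int* ringNext, int* firstRanks) {
    int nChannels = comm->nChannels;
    int nNodes = comm->nNodes;
    for (int c = 0; c < nChannels; c++) {
      int* recv = ringRecv + c * comm->nRanks;
      int* send = ringSend + c * comm->nRanks;
      int* prev = ringPrev + c * comm->nRanks;
      int* next = ringNext + c * comm->nRanks;
      for (int n = 0; n < nNodes; n++) {
        int recvRank = recv[firstRanks[n]];                             // head of node n
        int prevSendRank = send[firstRanks[(n - 1 + nNodes) % nNodes]]; // tail of node n-1
        prev[recvRank] = prevSendRank;
        int sendRank = send[firstRanks[n]];                             // tail of node n
        int nextRecvRank = recv[firstRanks[(n + 1) % nNodes]];          // head of node n+1
        next[sendRank] = nextRecvRank;
      }
    }
    return ncclSuccess;
  }

With the two-machine example above, channel 0's partial rings 0→1→2→3 and 4→5→6→7 are stitched so that next[3]=4, prev[4]=3, next[7]=0, and prev[0]=7, closing the global ring.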