NCCL Source Code Walkthrough 2: Bootstrap Network Initialization
Looking back at the run function of nccl-test at the very beginning of this series, we can see that ncclCommInitAll is called when ncclProc == 1 (only one process), and ncclCommInitRank is called otherwise. In this part we discuss ncclCommInitRank and set ncclCommInitAll aside for now: ncclCommInitAll creates communicators for the multiple GPUs managed by a single process in one call, and under the hood it is also implemented via ncclCommInitRank. Moreover, PyTorch recommends the one-process-per-GPU model, which means creating the ncclComm_t communicator through ncclCommInitRank.
ncclCommInitRank initializes the newcomm structure from a ncclUniqueId, the total number of ranks, and the rank of the current process. The flow is as follows:
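For orientation, here is a minimal usage sketch (not taken from the NCCL source) of the one-process-per-GPU pattern assumed throughout: rank 0 creates the ncclUniqueId, distributes it out of band (MPI here is just one possible transport), and every rank then calls ncclCommInitRank.

#include <mpi.h>
#include <nccl.h>
#include <cuda_runtime.h>

// Sketch: each process owns one GPU and joins the same communicator.
void initComm(ncclComm_t* comm, int myRank, int nRanks, int localGpu) {
  ncclUniqueId id;
  if (myRank == 0) ncclGetUniqueId(&id);                   // root creates the bootstrap handle
  MPI_Bcast(&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD); // share it with all ranks out of band
  cudaSetDevice(localGpu);                                 // one process manages one GPU
  ncclCommInitRank(comm, nRanks, id, myRank);              // enters the flow analyzed below
}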
ncclCommInitRank
ncclResult_t ncclCommInitRank(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank) {
  // Load the CUDA driver and dlsym hooks (can fail on old drivers)
  (void)ncclCudaLibraryInit(); // initializes the CUDA driver and performs the necessary driver version compatibility checks

  int cudaDev;
  ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
  CUDACHECK(cudaGetDevice(&cudaDev));

  NvtxParamsCommInitRank payload{myrank, nranks, cudaDev};
  NVTX3_FUNC_WITH_PARAMS(CommInitRank, CommInitRankSchema, payload)

  NCCLCHECK(ncclCommInitRankDev(newcomm, nranks, commId, myrank, cudaDev, &config));
  return ncclSuccess;
}
ncclCommInitRank first initializes the CUDA driver through ncclCudaLibraryInit and performs the necessary version compatibility checks.
It then calls cudaGetDevice to obtain the GPU managed by the current process, and calls ncclCommInitRankDev to create the communicator.
A communicator is represented by ncclComm_t, which is simply struct ncclComm. It holds all the state NCCL communication needs, including the NCCL configuration, channels, topology, and so on, as shown below.
struct ncclComm {
  uint64_t startMagic;
  struct ncclMemoryStack memPermanent, memScoped;
  // List of destructors to run when comm is destructed
  struct ncclDestructor* destructorHead;

  struct ncclSharedResources* sharedRes;
  /* map to top parent ranks. */
  int* topParentRanks;
  int* topParentLocalRanks;
  struct ncclChannel channels[MAXCHANNELS];
  struct ncclPeerInfo* peerInfo;
  struct ncclTopoSystem* topo;

  int netPluginLoaded;
  ncclNet_t* ncclNet;
  ncclNetDeviceType netDeviceType;
  ncclCollNet_t* ncclCollNet;
  void* bootstrap;
  // Bitmasks for ncclTransportP2pSetup
  uint64_t* connectSend;
  uint64_t* connectRecv;
  struct ncclTopoGraph graphs[NCCL_NUM_ALGORITHMS];
  bool initAlgoChannels[NCCL_NUM_ALGORITHMS];
  bool runtimeConn; // if dynamic connection is supported
  int cuMemSupport;

  uint64_t magic; // Magic number for all network communication. Not a security key -- only goal is to detect mismatches.

  uint64_t commHash;

  int rank;    // my rank in the communicator
  int nRanks;  // number of GPUs in communicator
  int cudaDev; // my cuda device index
  int nvmlDev; // my nvml device index
  int compCap; // compute capability of the GPU
  int minCompCap, maxCompCap; // min/max compute capability in the communicator
  int64_t busId; // my PCI bus ID in int format
  cpu_set_t cpuAffinity; // CPU affinity of the GPU
  int cudaArch; // matches __CUDA_ARCH__ of device
  int cpuArch;   // architecture - As defined in src/include/graph.h, e.g. x86/arm/ppc/mixed
  int cpuVendor; // vendor - As defined in src/include/graph.h

  int node;
  int nNodes;
  int localRank;
  int localRanks;
  int maxLocalRanks;
  int* rankToNode;
  int* rankToLocalRank;
  int* localRankToRank;
  // localRanks and localRanktoRank for all nodes
  struct ncclNodeRanks* nodeRanks;
  // MNNVL: Multi-Node NVLink
  int MNNVL;                // true when MNNVL is available
  struct cliqueInfo clique; // Our MNNVL clique information
  int cliqueRank;           // Our rank within the MNNVL clique

  bool checkPointers;
  bool dmaBufSupport;

  // Counter for tracking CUDA launches (P2P and collectives included)
  uint64_t opCount;

  // Channels for collectives
  int nChannels;    // connection nChannels
  int collChannels; // enqueue nChannels
  int nvlsChannels; // enqueue nChannels
  ... ...
  // Flag to ask NCCL kernels to abort
  uint32_t* abortFlag;
  uint32_t* abortFlagDev;
  int* abortFlagRefCount;
  uint32_t* childAbortFlag;
  uint32_t* childAbortFlagDev;
  uint32_t destroyFlag;
  ... ...
  uint64_t endMagic;
};
ncclCudaLibraryInit
ncclCudaLibraryInit initializes the CUDA driver by calling cudaGetDevice, then queries the CUDA driver version and checks that it meets the minimum requirement.
Next, ncclCudaLibraryInit calls cudaPfnFuncLoader to load symbols/functions from the CUDA driver library, such as cuDeviceGet.
ncclResult_t ncclCudaLibraryInit() {
  pthread_once(&initOnceControl, initOnceFunc);
  return initResult;
}

static void initOnceFunc() {
  do {
    const char* val = ncclGetEnv("CUDA_LAUNCH_BLOCKING");
    ncclCudaLaunchBlocking = val!=nullptr && val[0]!=0 && !(val[0]=='0' && val[1]==0);
  } while (0);

  ncclResult_t ret = ncclSuccess;
  int cudaDev;
  int driverVersion;
  CUDACHECKGOTO(cudaGetDevice(&cudaDev), ret, error); // Initialize the driver

  CUDACHECKGOTO(cudaDriverGetVersion(&driverVersion), ret, error);
  INFO(NCCL_INIT, "cudaDriverVersion %d", driverVersion);

  if (driverVersion < CUDA_DRIVER_MIN_VERSION) {
    // WARN("CUDA Driver version found is %d. Minimum requirement is %d", driverVersion, CUDA_DRIVER_MIN_VERSION);
    // Silently ignore version check mismatch for backwards compatibility
    goto error;
  }

#if CUDART_VERSION >= 11030
  if (cudaPfnFuncLoader()) {
    WARN("CUDA some PFN functions not found in the library");
    goto error;
  }
#endif

  // Determine whether we support the cuMem APIs or not
  ncclCuMemSupported = ncclIsCuMemSupported();

  initResult = ret;
  return;
error:
  initResult = ncclSystemError;
  return;
}
Finally, ncclIsCuMemSupported is called to determine whether the current driver supports cuMem and VMM RDMA. The overall call flow is shown in the figure below.
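As a hedged illustration of what such a support check can look like (this mirrors the spirit of ncclIsCuMemSupported, not its literal code), the driver exposes a per-device attribute for VMM support:

#include <cuda.h>

// Sketch: ask the driver whether this device supports the cuMem/VMM APIs.
// CU_DEVICE_ATTRIBUTE_VIRTUAL_MEMORY_MANAGEMENT_SUPPORTED exists since CUDA 10.2.
bool probeCuMemSupport(CUdevice dev) {
  int vmm = 0;
  if (cuDeviceGetAttribute(&vmm, CU_DEVICE_ATTRIBUTE_VIRTUAL_MEMORY_MANAGEMENT_SUPPORTED, dev) != CUDA_SUCCESS)
    return false;
  return vmm != 0;
}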
CUMEM & VMM RDMA
The text above mentioned cuMem and VMM RDMA, so here is a brief introduction.
CUDA applications have a growing need for fast and efficient memory management. Before CUDA 10.2, the options available to developers were limited to the malloc-like abstractions CUDA provides. CUDA 10.2 introduced a new set of virtual memory management APIs that make it possible to build more efficient dynamic data structures and gain better control over GPU memory usage in applications.
In many applications it is hard to guess how large the initial allocation should be. The following code is a simple C++ class describing a growable vector.
class Vector {
private:
  void *d_p;
  size_t alloc_sz, reserve_sz;
public:
  Vector() : d_p(NULL), alloc_sz(0), reserve_sz(0) {}
  // Reserve some extra space in order to speed up grow()
  CUresult reserve(size_t new_sz);
  // Actually commit the additional memory
  CUresult grow(size_t new_sz);
  // Free all associated resources
  ~Vector();
};
Before CUDA 10.2, the only ways to implement this concept in CUDA were either cudaMalloc, cudaFree, and cudaMemcpy, or cudaMallocManaged and cudaMemPrefetchAsync to commit memory on demand.
CUresult Vector::reserve(size_t new_sz) {
  if (new_sz > reserve_sz) {
    void *new_ptr = nullptr;
#ifndef USE_MANAGED_MEMORY
    cudaMalloc(&new_ptr, new_sz);
#else
    cudaMallocManaged(&new_ptr, new_sz);
#endif
    cudaMemcpy(new_ptr, d_p, alloc_sz, cudaMemcpyDefault);
    cudaFree(d_p);
    d_p = new_ptr;
    reserve_sz = new_sz;
  }
  return CUDA_SUCCESS;
}

CUresult Vector::grow(size_t new_sz) {
  Vector::reserve(alloc_sz + new_sz);
#ifdef USE_MANAGED_MEMORY
  cudaMemPrefetchAsync((char*)d_p + alloc_sz, new_sz, dev);
#endif
  alloc_sz += new_sz;
  return CUDA_SUCCESS;
}

Vector::~Vector() {
  if (d_p) cudaFree(d_p);
}
Although the implementation is fairly straightforward, it has a number of performance implications.
1. cudaMalloc allocates more memory than the growth actually needs. To grow, you must keep the old allocation and allocate a new block large enough to hold both the old data and the extra space, which sharply limits how far you can grow. For example, on a device with 2 GB of memory, a vector that already holds 1 GB cannot be grown any further, since you would need another 1 GB plus the growth amount. In effect, you cannot grow a vector beyond half the GPU memory.
2. Each allocation must be mapped into all peer contexts, even if it is never used there.
3. The cudaMemcpy call adds latency to each grow request and consumes precious memory bandwidth copying data, bandwidth that could be better spent elsewhere.
4. The cudaFree call waits until all pending work on the current context has completed before proceeding.
Using managed memory solves some of these problems, but it introduces compatibility issues that keep it from working for every application. For example, memory from cudaMallocManaged cannot be used with the CUDA inter-process communication functions such as cudaIpc*.
The CUDA virtual memory allocation functions differ significantly from high-level functions of the cudaMalloc family. The low-level virtual memory allocation API includes cuMemCreate, cuMemAddressReserve, and others.
Returning to the Vector example above: with the CUDA virtual memory management functions, memory can be committed into a region of the virtual address space. When the reserved space runs out, there is no need to issue a cudaMemcpy or to allocate more memory than originally requested; it suffices to remap the existing allocations to the new address.
CUresult Vector::reserve(size_t new_sz) {
  // Try to reserve address space right at the end of old_ptr
  status = cuMemAddressReserve(&new_ptr, (aligned_sz - reserve_sz), 0ULL, old_ptr + reserve_sz, 0ULL);
}
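To make the remapping idea concrete, here is a condensed sketch of committing physical memory into the reserved range (adapted in spirit from NVIDIA's VMM example, error handling elided; dev is an assumed member holding the device ordinal):

CUresult Vector::grow(size_t new_sz) {
  // Round the request up to the allocation granularity.
  CUmemAllocationProp prop = {};
  prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
  prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
  prop.location.id = dev;
  size_t granularity = 0;
  cuMemGetAllocationGranularity(&granularity, &prop, CU_MEM_ALLOC_GRANULARITY_MINIMUM);
  size_t padded = ((new_sz + granularity - 1) / granularity) * granularity;

  // Allocate physical memory and map it right after the already committed part.
  CUmemGenericAllocationHandle handle;
  cuMemCreate(&handle, padded, &prop, 0);
  cuMemMap((CUdeviceptr)d_p + alloc_sz, padded, 0, handle, 0);

  // Enable read/write access for the device, then account for the growth.
  CUmemAccessDesc access = {};
  access.location = prop.location;
  access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
  cuMemSetAccess((CUdeviceptr)d_p + alloc_sz, padded, &access, 1);
  alloc_sz += padded;
  return CUDA_SUCCESS;
}

No cudaMemcpy and no over-allocation: the old mappings stay valid, and only the new tail of the address range is backed by fresh physical memory.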
ncclCommInitRankDev
Let us continue with ncclCommInitRankDev, the core of ncclCommInitRank: it is responsible for building the ring network and initializing the ncclComm_t communicator. Since the flow is long, this part only covers the first half, i.e. the establishment of the ring network; the remainder of the ncclComm_t communicator initialization is deferred to the next section.
ncclCommInitRankDev creates the communicator newcomm for the specific device cudaDev and rank. Compared with ncclCommInitRank, it takes the additional cudaDev parameter.
static ncclResult_t ncclCommInitRankDev(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev, ncclConfig_t *config) {
  ncclResult_t res = ncclSuccess;
  ncclComm_t comm = NULL;
  struct ncclCommInitRankAsyncJob *job = NULL;
  const char* env = ncclGetEnv("NCCL_COMM_ID");
  if (env && myrank == 0) {
    INFO(NCCL_ENV, "NCCL_COMM_ID set by environment to %s", env);
    NCCLCHECKGOTO(bootstrapCreateRoot((struct ncclBootstrapHandle*)&commId, true), res, fail);
  }

  NCCLCHECKGOTO(ncclInit(), res, fail);
  if (ncclDebugLevel > NCCL_LOG_WARN || (ncclDebugLevel != NCCL_LOG_NONE && myrank == 0)) {
    static pthread_once_t once = PTHREAD_ONCE_INIT;
    pthread_once(&once, showVersion);
  }
  // Make sure the CUDA runtime is initialized.
  CUDACHECKGOTO(cudaFree(NULL), res, fail);

  NCCLCHECKGOTO(PtrCheck(newcomm, "CommInitRank", "newcomm"), res, fail);
  NCCLCHECKGOTO(PtrCheck(config, "CommInitRank", "config"), res, fail);
  if (nranks < 1 || myrank < 0 || myrank >= nranks) {
    WARN("Invalid rank requested : %d/%d", myrank, nranks);
    res = ncclInvalidArgument;
    goto fail;
  }

  NCCLCHECKGOTO(ncclCalloc(&comm, 1), res, fail);
  NCCLCHECKGOTO(ncclCalloc(&comm->abortFlag, 1), res, fail);
  NCCLCHECKGOTO(ncclCudaHostCalloc(&comm->abortFlagDev, 1), res, fail);
  NCCLCHECKGOTO(ncclCalloc(&comm->abortFlagRefCount, 1), res, fail);
  comm->startMagic = comm->endMagic = NCCL_MAGIC; // Used to detect comm corruption.
  *comm->abortFlagRefCount = 1;
  NCCLCHECKGOTO(parseCommConfig(comm, config), res, fail);
  /* start with ncclInternalError and will be changed to ncclSuccess if init succeeds. */
  comm->initState = ncclInternalError;
  *newcomm = comm;

  NCCLCHECKGOTO(ncclCalloc(&job, 1), res, fail);
  job->comm = comm;
  job->nranks = nranks;
  job->commId = commId; // C++ struct assignment
  job->myrank = myrank;
  job->cudaDev = cudaDev;
  NCCLCHECKGOTO(ncclAsyncLaunch(&job->base, ncclCommInitRankFunc, NULL, free, comm), res, fail);

exit:
  return ncclGroupErrCheck(res);
fail:
  if (comm) {
    free(comm->abortFlag);
    if (comm->abortFlagDev) ncclCudaHostFree((void*)comm->abortFlagDev);
    free(comm->abortFlagRefCount);
    free(comm);
  }
  if (newcomm) *newcomm = NULL;
  goto exit;
}
First, if the NCCL_COMM_ID environment variable is set and the current process is the root process, bootstrapCreateRoot is called directly. We introduced bootstrapCreateRoot in the first part: it opens a listening socket on the root process and writes the listening address and port into the ncclUniqueId structure. It also spawns the bootstrapRoot thread, which waits for the other ranks to connect and builds the communication ring across all processes. Note that when NCCL_COMM_ID is set, the root process skips bootstrapCreateRoot inside ncclGetUniqueId, which is why it must be called here to build the communication ring.
Next, ncclCommInitRankDev calls ncclInit, shown below, to initialize environment variables and the GdrCopy feature and to select the network interface used by the bootstrap network. The function runs initOnceFunc through pthread_once, which guarantees that initOnceFunc executes only once in a multithreaded environment. The root process already executed ncclInit when calling ncclGetUniqueId, so it does not run again here; for non-root processes, ncclInit executes at this point.
static ncclResult_t ncclInit() {
  pthread_once(&initOnceControl, initOnceFunc);
  return initResult;
}
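One of the things initOnceFunc does on this path is pick the NIC that bootstrap traffic will use, honoring NCCL_SOCKET_IFNAME. The following is a simplified, illustrative sketch of that idea using plain getifaddrs; it is not NCCL's actual interface-discovery code:

#include <ifaddrs.h>
#include <net/if.h>
#include <stdlib.h>
#include <string.h>

// Illustrative: pick the first interface that is up and not loopback,
// preferring one whose name starts with the NCCL_SOCKET_IFNAME prefix.
static struct ifaddrs* pickBootstrapIf(struct ifaddrs* all) {
  const char* pref = getenv("NCCL_SOCKET_IFNAME");
  for (struct ifaddrs* ifa = all; ifa; ifa = ifa->ifa_next) {
    if (ifa->ifa_addr == NULL) continue;
    if (!(ifa->ifa_flags & IFF_UP) || (ifa->ifa_flags & IFF_LOOPBACK)) continue;
    if (pref && strncmp(ifa->ifa_name, pref, strlen(pref)) != 0) continue;
    return ifa; // caller obtained `all` via getifaddrs() and frees it later
  }
  return NULL;
}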
ncclCommInitRankDev then allocates the ncclComm_t communicator and initializes some of its fields. It also calls parseCommConfig to initialize the communicator's configuration from the config argument, mostly filling in default values.
The config data structure is shown below; users can change the communicator's behavior by changing the attribute values in config.
typedef struct ncclConfig_v21700 {
  /* attributes that users should never touch. */
  size_t size;
  unsigned int magic;
  unsigned int version;
  /* attributes that users are able to customize. */
  int blocking;
  int cgaClusterSize;
  int minCTAs;
  int maxCTAs;
  const char *netName;
  int splitShare;
} ncclConfig_t;
As shown, the configuration covers size, magic, version, blocking, cgaClusterSize, minCTAs, maxCTAs, netName, and splitShare. Here size is the byte size of the structure, magic is a magic value defaulting to 0xcafebeef, and version is the current NCCL version; these fields are filled in automatically by NCCL and must not be changed by the user. The remaining fields are as follows (a short usage sketch follows this list):
- netName names the network to use, controlled by the NCCL_NET environment variable; its value must exactly match the name of the NCCL network in use (case-insensitive). The internal network names are "IB" and "Socket" (TCP/IP sockets); external network plugins define their own names. The default value is undefined.
- blocking controls whether NCCL calls are allowed to block. This covers all calls into NCCL, including the init/finalize functions as well as communication functions that may block due to the lazy connection setup of send/receive calls. It is controlled by the NCCL_COMM_BLOCKING environment variable. By default, communicators are blocking.
- cgaClusterSize sets the CUDA Cooperative Group Array (CGA) cluster size, controlled by the NCCL_CGA_CLUSTER_SIZE environment variable. Setting it to a non-zero value makes NCCL set the cluster dimension attribute accordingly when launching communication kernels. By default, NCCL sets this value to 4.
- minCTAs sets the minimum number of CTAs NCCL should use, controlled by the NCCL_MIN_CTAS environment variable. By default, NCCL sets this value to 1.
- maxCTAs sets the maximum number of CTAs NCCL should use, controlled by the NCCL_MAX_CTAS environment variable. By default, NCCL sets this value to the maximum channel count, 32.
- splitShare controls whether split communicators share resources, controlled by the NCCL_COMM_SPLIT_SHARE_RESOURCES environment variable. By default, NCCL sets this value to 0, meaning shared resources are not split.
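As a usage sketch (the exact field values set here are illustrative), an application can override these defaults and create a non-blocking communicator through ncclCommInitRankConfig, which funnels into the same ncclCommInitRankDev path shown above:

#include <nccl.h>

// Sketch: create a non-blocking communicator with customized config values.
ncclResult_t initWithConfig(ncclComm_t* comm, int nRanks, ncclUniqueId id, int myRank) {
  ncclConfig_t config = NCCL_CONFIG_INITIALIZER; // fills in size/magic/version
  config.blocking = 0;        // return ncclInProgress instead of blocking
  config.minCTAs = 4;
  config.maxCTAs = 16;
  config.netName = "Socket";  // must match an available network, case-insensitively
  ncclResult_t res = ncclCommInitRankConfig(comm, nRanks, id, myRank, &config);
  // With blocking = 0, poll ncclCommGetAsyncError(*comm, &res) until it leaves ncclInProgress.
  return res;
}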
Afterwards, it checks whether the environment variables contain these settings and, if so, overrides the values with those from the environment. ncclCommInitRankDev also prints the NCCL version information on the root process.
Finally, it asynchronously invokes ncclCommInitRankFunc to carry out the remaining initialization. The complete flow of ncclCommInitRankDev is shown below.
ncclCommInitRankFunc
Let us now look at ncclCommInitRankFunc, which carries out the remaining communicator initialization.
static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) {
  struct ncclCommInitRankAsyncJob* job = (struct ncclCommInitRankAsyncJob*)job_;
  ncclComm_t comm = job->comm;
  ncclResult_t res = ncclSuccess;
  int archMajor, archMinor;
  size_t maxLocalSizeBytes = 0;
  int cudaDev = job->cudaDev;
  int* parentRanks = NULL;
  int cudaArch;
  uint64_t timers[TIMERS_INIT_COUNT];

  timers[TIMER_INIT_TOTAL] = clockNano();
  CUDACHECKGOTO(cudaSetDevice(cudaDev), res, fail);
  CUDACHECKGOTO(cudaDeviceGetAttribute(&archMajor, cudaDevAttrComputeCapabilityMajor, cudaDev), res, fail);
  CUDACHECKGOTO(cudaDeviceGetAttribute(&archMinor, cudaDevAttrComputeCapabilityMinor, cudaDev), res, fail);
  cudaArch = 100*archMajor + 10*archMinor;

  timers[TIMER_INIT_KERNELS] = clockNano();
  NCCLCHECK(ncclInitKernelsForDevice(cudaArch, &maxLocalSizeBytes));
  // Set the maximum kernel stack size of all kernels to avoid
  // a CUDA memory reconfig on load (c.f. NVSHMEM issue)
  if (maxLocalSizeBytes > 0 && ncclParamSetStackSize() == 1) {
    TRACE(NCCL_INIT, "Setting cudaLimitStackSize to %zu", maxLocalSizeBytes);
    CUDACHECKIGNORE(cudaDeviceSetLimit(cudaLimitStackSize, maxLocalSizeBytes));
  }
  timers[TIMER_INIT_KERNELS] = clockNano() - timers[TIMER_INIT_KERNELS];

  timers[TIMER_INIT_BOOTSTRAP] = clockNano();
  if (job->parent) { // in the configuration discussed here job->parent == NULL, so skip this branch and look at the else
    NCCLCHECKGOTO(ncclCalloc(&parentRanks, job->parent->nRanks), res, fail);
    NCCLCHECKGOTO(commGetSplitInfo(comm, job->parent, job->color, job->key, &job->nranks, &job->myrank, parentRanks), res, fail);
    // Negative color does not create a new comm object. We needed to take part in the allgather, but we're done now.
    if (job->color == NCCL_SPLIT_NOCOLOR) goto exit;
    snprintf((char*)&job->commId, sizeof(job->commId), "%016lx-%d", job->parent->commHash, job->color);
    NCCLCHECKGOTO(commAlloc(comm, job->parent, job->nranks, job->myrank), res, fail);
    NCCLCHECKGOTO(bootstrapSplit((struct ncclBootstrapHandle*)&job->commId, comm, job->parent, job->color, job->key, parentRanks), res, fail);
  } else {
    NCCLCHECKGOTO(commAlloc(comm, NULL, job->nranks, job->myrank), res, fail);
    NCCLCHECKGOTO(bootstrapInit((struct ncclBootstrapHandle*)&job->commId, comm), res, fail);
  }
  timers[TIMER_INIT_BOOTSTRAP] = clockNano() - timers[TIMER_INIT_BOOTSTRAP];

  comm->cudaArch = cudaArch;
  comm->commHash = getHash(job->commId.internal, NCCL_UNIQUE_ID_BYTES);

  if (job->parent) { // job->parent == NULL here as well, so only the else branch applies
    INFO(NCCL_INIT,"ncclCommSplit comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx parent %p color %d key %d commId 0x%llx - Init START",
        comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, job->parent, job->color, job->key, (unsigned long long)hashUniqueId(job->commId));
  } else {
    INFO(NCCL_INIT,"ncclCommInitRank comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx commId 0x%llx - Init START",
        comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, (unsigned long long)hashUniqueId(job->commId));
  }

  NCCLCHECKGOTO(initTransportsRank(comm, job->parent, timers), res, fail);

  NCCLCHECKGOTO(ncclTunerPluginLoad(comm), res, fail);
  if (comm->tuner) {
    NCCLCHECK(comm->tuner->init(comm->nRanks, comm->nNodes, ncclDebugLog, &comm->tunerContext));
  }

  // update communicator state
  comm->initState = ncclSuccess;
  timers[TIMER_INIT_TOTAL] = clockNano() - timers[TIMER_INIT_TOTAL];

  // Trace this call for replay tool
  if (job->parent) {
    /* unlink child abort flag. */
    __atomic_store_n(&job->parent->childAbortFlag, NULL, __ATOMIC_RELEASE);
    TRACE_CALL("ncclCommSplit(%p, %d, %d, %p, %d, %d)",
        job->parent, job->color, job->key, comm, comm->rank, comm->nRanks);
  } else {
    TRACE_CALL("ncclCommInitRank(%p, %d, 0x%llx, %d, %d)",
        comm, comm->nRanks, (unsigned long long)hashUniqueId(job->commId), comm->rank, comm->cudaDev);
  }

  if (job->parent) {
    INFO(NCCL_INIT,"ncclCommSplit comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx parent %p color %d key %d commId 0x%llx - Init COMPLETE",
        comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, job->parent, job->color, job->key, (unsigned long long)hashUniqueId(job->commId));
  } else {
    INFO(NCCL_INIT,"ncclCommInitRank comm %p rank %d nranks %d cudaDev %d nvmlDev %d busId %lx commId 0x%llx - Init COMPLETE",
        comm, comm->rank, comm->nRanks, comm->cudaDev, comm->nvmlDev, comm->busId, (unsigned long long)hashUniqueId(job->commId));
  }
  INFO(NCCL_INIT|NCCL_PROFILE,"Init timings: rank %d nranks %d total %.2f (kernels %.2f, bootstrap %.2f, allgathers %.2f, topo %.2f, graphs %.2f, connections %.2f, rest %.2f)", comm->rank, comm->nRanks, timers[TIMER_INIT_TOTAL]/1e9,
      timers[TIMER_INIT_KERNELS]/1e9, timers[TIMER_INIT_BOOTSTRAP]/1e9, timers[TIMER_INIT_ALLGATHER]/1e9, timers[TIMER_INIT_TOPO]/1e9, timers[TIMER_INIT_GRAPHS]/1e9, timers[TIMER_INIT_CONNECT]/1e9,
      (timers[TIMER_INIT_TOTAL]-timers[TIMER_INIT_KERNELS]-timers[TIMER_INIT_BOOTSTRAP]-timers[TIMER_INIT_ALLGATHER]-timers[TIMER_INIT_TOPO]-timers[TIMER_INIT_GRAPHS]-timers[TIMER_INIT_CONNECT])/1e9);
exit:
  if (job->newcomm) {
    /* assign it to user pointer. */
    __atomic_store_n(job->newcomm, comm, __ATOMIC_RELEASE);
  }
  free(parentRanks);
  return res;
fail:
  comm->initState = res;
  goto exit;
}
The corresponding flow is shown in the figure below.
First, cudaSetDevice selects the current device, and the device's compute capability and version information are queried.
Next, ncclInitKernelsForDevice obtains the maximum stack size across all CUDA kernels, and when the NCCL_SET_STACK_SIZE environment variable is set, cudaDeviceSetLimit is called to apply that maximum stack size, avoiding hangs caused by CUDA memory reconfiguration. Here, the CUDA kernels include all operators NCCL supports, such as Broadcast/Reduce/AllGather/ReduceScatter/AllReduce. A rough sketch of this idea follows.
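This is a rough sketch only (kernelFuncs/kernelCount are illustrative stand-ins for NCCL's kernel table, not the literal ncclInitKernelsForDevice code):

#include <cuda_runtime.h>

// Sketch: query each kernel's stack (local memory) usage and raise the
// device-wide limit once, so later kernel loads do not trigger a memory
// reconfiguration.
void setMaxStackLimit(void* const* kernelFuncs, int kernelCount) {
  size_t maxStack = 0;
  for (int i = 0; i < kernelCount; i++) {
    cudaFuncAttributes attr;
    if (cudaFuncGetAttributes(&attr, kernelFuncs[i]) == cudaSuccess && attr.localSizeBytes > maxStack)
      maxStack = attr.localSizeBytes;
  }
  if (maxStack > 0) cudaDeviceSetLimit(cudaLimitStackSize, maxStack);
}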
In the configuration discussed here job->parent == NULL, so we skip the related code for now.
Then commAlloc continues initializing the communicator structure, including the memPermanent and memScoped memory stacks, this device's rank, the total device count, the current device, and more.
commAlloc
static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, int ndev, int rank) {
  if (ndev < 1) {
    WARN("invalid device count (%d) requested", ndev);
    return ncclInvalidArgument;
  }
  if (rank >= ndev || rank < 0) {
    WARN("rank %d exceeds ndev=%d", rank, ndev);
    return ncclInvalidArgument;
  }

  ncclMemoryStackConstruct(&comm->memPermanent);
  ncclMemoryStackConstruct(&comm->memScoped);
  comm->destructorHead = nullptr;
  comm->rank = rank;
  comm->nRanks = ndev;

  NCCLCHECK(ncclNetPluginLoad(comm));
  NCCLCHECK(ncclNetInit(comm));
  INFO(NCCL_INIT, "Using network %s", comm->ncclNet->name);

  if (parent && parent->config.splitShare) {
    if (parent->ncclNet != comm->ncclNet) {
      WARN("Split shares resources, but parent comm netName %s is different from child comm netName %s", parent->ncclNet->name, comm->ncclNet->name);
      return ncclInvalidUsage;
    }
  }
  // Try to create a CUDA object right away. If there is something wrong with
  // the device we're on (failure cause #1) , better know it early.
  CUDACHECK(cudaGetDevice(&comm->cudaDev));

  NCCLCHECK(getBusId(comm->cudaDev, &comm->busId));
  nvmlDevice_t nvmlDev;
  char busId[NVML_DEVICE_PCI_BUS_ID_BUFFER_SIZE];
  NCCLCHECK(int64ToBusId(comm->busId, busId));
  NCCLCHECK(ncclNvmlDeviceGetHandleByPciBusId(busId, &nvmlDev));
  NCCLCHECK(ncclNvmlDeviceGetIndex(nvmlDev, (unsigned int*)&comm->nvmlDev));

  comm->compCap = ncclCudaCompCap();
  TRACE(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx compCap %d", comm, rank, ndev, comm->cudaDev, comm->busId, comm->compCap);

  comm->checkPointers = ncclParamCheckPointers() == 1 ? true : false;
  comm->dmaBufSupport = (dmaBufSupported(comm) == ncclSuccess) ? true : false;

  comm->collNetSupport = 0;
  memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix));

  ncclMemoryPoolConstruct(&comm->memPool_ncclKernelPlan);
  ncclMemoryPoolConstruct(&comm->memPool_ncclProxyOp);

  comm->groupNext = reinterpret_cast<struct ncclComm*>(0x1);
  comm->preconnectNext = reinterpret_cast<struct ncclComm*>(0x1);

  static_assert(MAXCHANNELS <= sizeof(*comm->connectSend)*8, "comm->connectSend must have enough bits for all channels");
  static_assert(MAXCHANNELS <= sizeof(*comm->connectRecv)*8, "comm->connectRecv must have enough bits for all channels");
  NCCLCHECK(ncclCalloc(&comm->connectSend, comm->nRanks));
  NCCLCHECK(ncclCalloc(&comm->connectRecv, comm->nRanks));

  // Mark channels as non initialized.
  for (int c=0; c < MAXCHANNELS; c++) comm->channels[c].id = -1;

  if (parent == NULL || !parent->config.splitShare) {
    struct ncclSharedResources* sharedRes = NULL;
    NCCLCHECK(ncclCalloc(&sharedRes, 1));
    /* most of attributes are assigned later in initTransportsRank(). */
    sharedRes->owner = comm;
    sharedRes->tpNRanks = comm->nRanks;
    NCCLCHECK(ncclCalloc(&sharedRes->tpRankToLocalRank, comm->nRanks));
    NCCLCHECK(ncclStrongStreamConstruct(&sharedRes->deviceStream));
    NCCLCHECK(ncclStrongStreamConstruct(&sharedRes->hostStream));
    comm->sharedRes = sharedRes;
    sharedRes->refCount = 1;
  } else {
    comm->sharedRes = parent->sharedRes;
    ncclAtomicRefCountIncrement(&parent->sharedRes->refCount);
  }

  if (comm->topParentRanks == NULL) {
    NCCLCHECK(ncclCalloc(&comm->topParentRanks, comm->nRanks));
    for (int i = 0; i < comm->nRanks; ++i)
      comm->topParentRanks[i] = i;
  }

  ncclIntruQueueMpscConstruct(&comm->callbackQueue);

  comm->regCache.pageSize = sysconf(_SC_PAGESIZE);
  return ncclSuccess;
}
commAlloc also calls ncclNetPluginLoad and ncclNetInit to load the net plugin module and initialize ncclNets and ncclCollNets.
ncclNetPluginLoad
ncclResult_t ncclNetPluginLoad(struct ncclComm* comm) {
  netPluginLib = openNetPluginLib(couldNotFindNames, MAX_PLUGIN_LOAD * PATH_MAX);
  if (netPluginLib == nullptr) {
    goto fail;
  }
  ncclNets[0] = (ncclNet_v8_t*)dlsym(netPluginLib, "ncclNetPlugin_v8");
  if (ncclNets[0] == nullptr) {
    // Try v7 plugin
    ncclNet_v7 = (ncclNet_v7_t*)dlsym(netPluginLib, "ncclNetPlugin_v7");
    ... ...
  }
  // Check for CollNet
  ncclCollNets[0] = (ncclCollNet_v8_t*)dlsym(netPluginLib, "ncclCollNetPlugin_v8");
  if (ncclCollNets[0] == nullptr) {
    ncclCollNet_v7 = (ncclCollNet_v7_t*)dlsym(netPluginLib, "ncclCollNetPlugin_v7");
    ... ...
  }
  ++netPluginRefCount;
  netPluginStatus = netPluginLoadSuccess;
  comm->netPluginLoaded = 1;
exit:
  pthread_mutex_unlock(&netPluginLock);
  return ncclSuccess;
}
ncclNetInit
ncclResult_t ncclNetInit(struct ncclComm* comm) {
  // Initialize main communication network
  const char* netName;
  bool ok = false;
  netName = comm->config.netName;
  for (int i=0; i<3; i++) {
    if (ncclNets[i] == nullptr) continue;
    enum ncclNetState state;
    NCCLCHECK(netGetState(i, &state));
    if (state != ncclNetStateEnabled) continue;
    if (netName && strcasecmp(netName, ncclNets[i]->name) != 0) continue;
    if (ncclSuccess != ncclNetCheckDeviceVersion(comm, ncclNets[i], 0)) {
      // Mismatched device plugin version
      continue;
    }
    comm->ncclNet = ncclNets[i];
    ok = true;
    if (ncclCollNets[i]) {
      NCCLCHECK(collNetGetState(i, &state));
      if (state == ncclNetStateEnabled) {
        comm->ncclCollNet = ncclCollNets[i];
      }
    }
    break;
  }

  return ncclSuccess;
}
The plugin loading process works as follows (a simplified sketch follows below):
1. If the NCCL_NET_PLUGIN environment variable is set, try to load a dynamic library with that name;
2. If NCCL_NET_PLUGIN is set but the previous step failed, try to load libnccl-net-<NCCL_NET_PLUGIN>.so;
3. If NCCL_NET_PLUGIN is not set, try to load the libnccl-net.so library;
4. If no plugin can be found, fall back to the internal network plugins.
For example, with NCCL_NET_PLUGIN=aws, NCCL first tries to load aws; if the aws library cannot be found, it then tries libnccl-net-aws.so.
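A simplified sketch of that search order (names and error handling condensed; openNetPluginLib in the listing above implements this logic):

#include <dlfcn.h>
#include <stdio.h>

// env is the value of NCCL_NET_PLUGIN, possibly NULL.
// Returning NULL means: fall back to the internal IB/Socket plugins.
static void* openNetPluginSketch(const char* env) {
  char name[256];
  void* lib = NULL;
  if (env) {
    lib = dlopen(env, RTLD_NOW | RTLD_LOCAL);             // 1. the exact name
    if (lib) return lib;
    snprintf(name, sizeof(name), "libnccl-net-%s.so", env);
    lib = dlopen(name, RTLD_NOW | RTLD_LOCAL);            // 2. libnccl-net-<env>.so
    if (lib) return lib;
  }
  return dlopen("libnccl-net.so", RTLD_NOW | RTLD_LOCAL); // 3. the default library name
}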
Note that the code above indexes into ncclNets and ncclCollNets; NCCL has the following global initialization:
ncclNet_t* ncclNets[3] = { nullptr, &ncclNetIb, &ncclNetSocket };
ncclCollNet_t* ncclCollNets[3] = { nullptr, nullptr, nullptr };
Here ncclNets holds NCCL's network backends: if an external plugin is specified, it is assigned to ncclNets[0] and used with priority; otherwise one of the internal backends, ncclNetIb or ncclNetSocket, is used.
ncclCollNets holds the CollNet (SHARP) plugins.
Then ncclCommInitRankFunc calls bootstrapInit to initialize the bootstrap network; its concrete implementation is covered in the next subsection.
Finally, ncclCommInitRankFunc calls initTransportsRank to explore the network topology, and ncclTunerPluginLoad to load and initialize libnccl-tuner.so.
Let us first look at the bootstrapInit function mentioned above.
bootstrapInit
bootstrapInit is where the ring network actually gets built, in cooperation with the bootstrapRoot thread on the root rank described earlier.
ncclResult_t bootstrapInit(struct ncclBootstrapHandle* handle, struct ncclComm* comm) {
  int rank = comm->rank;
  int nranks = comm->nRanks;
  struct bootstrapState* state;
  struct ncclSocket* proxySocket;
  ncclSocketAddress nextAddr;
  struct ncclSocket sock, listenSockRoot;
  struct extInfo info = { 0 };

  NCCLCHECK(ncclCalloc(&state, 1));
  state->rank = rank;
  state->nranks = nranks;
  state->abortFlag = comm->abortFlag;
  comm->bootstrap = state;
  comm->magic = state->magic = handle->magic;

  TRACE(NCCL_INIT, "rank %d nranks %d", rank, nranks);

  info.rank = rank;
  info.nranks = nranks;
  // Create socket for other ranks to contact me
  NCCLCHECK(ncclSocketInit(&state->listenSock, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  NCCLCHECK(ncclSocketListen(&state->listenSock));
  NCCLCHECK(ncclSocketGetAddr(&state->listenSock, &info.extAddressListen));

  // Create socket for root to contact me
  NCCLCHECK(ncclSocketInit(&listenSockRoot, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  NCCLCHECK(ncclSocketListen(&listenSockRoot));
  NCCLCHECK(ncclSocketGetAddr(&listenSockRoot, &info.extAddressListenRoot));

  // stagger connection times to avoid an overload of the root
  if (nranks > 128) {
    long msec = rank;
    struct timespec tv;
    tv.tv_sec = msec / 1000;
    tv.tv_nsec = 1000000 * (msec % 1000);
    TRACE(NCCL_INIT, "rank %d delaying connection to root by %ld msec", rank, msec);
    (void) nanosleep(&tv, NULL);
  }

  // send info on my listening socket to root
  /* the process with rank=i (the current process) opens a socket to the bootstrapNetIfAddr carried in the ncclUniqueId and sends extAddressListenRoot and extAddressListen to the root process over it. This is how the root process obtains the listening addresses of every process. */
  NCCLCHECK(ncclSocketInit(&sock, &handle->addr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  NCCLCHECK(ncclSocketConnect(&sock));
  NCCLCHECK(bootstrapNetSend(&sock, &info, sizeof(info)));
  NCCLCHECK(ncclSocketClose(&sock));

  // get info on my "next" rank in the bootstrap ring from root
  /* the rank=i process waits on its root-facing listening socket for the root to send the listening address of rank=i+1, which is what allows the communication ring to be built. It then closes the root-facing listening socket, leaving only the socket listening for other processes. */
  NCCLCHECK(ncclSocketInit(&sock));
  NCCLCHECK(ncclSocketAccept(&sock, &listenSockRoot));
  NCCLCHECK(bootstrapNetRecv(&sock, &nextAddr, sizeof(union ncclSocketAddress)));
  NCCLCHECK(ncclSocketClose(&sock));
  NCCLCHECK(ncclSocketClose(&listenSockRoot));

  /* the process connects to its successor via ringSendSocket and accepts the connection from its predecessor on ringRecvSocket. Each process can then talk to its ring neighbors: for the rank=i process, the successor is rank=(i+1)%nranks and the predecessor is rank=(i-1+nranks)%nranks. */
  NCCLCHECK(ncclSocketInit(&state->ringSendSocket, &nextAddr, comm->magic, ncclSocketTypeBootstrap, comm->abortFlag));
  NCCLCHECK(ncclSocketConnect(&state->ringSendSocket));
  // Accept the connect request from the previous rank in the AllGather ring
  NCCLCHECK(ncclSocketInit(&state->ringRecvSocket));
  NCCLCHECK(ncclSocketAccept(&state->ringRecvSocket, &state->listenSock));

  // AllGather all listen handlers
  /* AllGather the listening addresses over the ring. Afterwards every process holds the listening addresses of all processes, i.e. any process can communicate with any other. */
  NCCLCHECK(ncclCalloc(&state->peerCommAddresses, nranks));
  NCCLCHECK(ncclSocketGetAddr(&state->listenSock, state->peerCommAddresses+rank));
  NCCLCHECK(bootstrapAllGather(state, state->peerCommAddresses, sizeof(union ncclSocketAddress)));

  // Create the service proxy
  /* open the proxy listening socket, then use bootstrapAllGather to collect every process's proxy listening address and UDS value. The UDS value is a 64-bit number composed of a random value and a hash of the process id. */
  NCCLCHECK(ncclCalloc(&state->peerProxyAddresses, nranks));
  NCCLCHECK(ncclCalloc(&state->peerProxyAddressesUDS, nranks));

  // proxy is aborted through a message; don't set abortFlag
  NCCLCHECK(ncclCalloc(&proxySocket, 1));
  NCCLCHECK(ncclSocketInit(proxySocket, &bootstrapNetIfAddr, comm->magic, ncclSocketTypeProxy, comm->abortFlag));
  NCCLCHECK(ncclSocketListen(proxySocket));
  NCCLCHECK(ncclSocketGetAddr(proxySocket, state->peerProxyAddresses+rank));
  NCCLCHECK(bootstrapAllGather(state, state->peerProxyAddresses, sizeof(union ncclSocketAddress)));
  // cuMem UDS support
  // Make sure we create a unique UDS socket name
  uint64_t randId;
  NCCLCHECK(getRandomData(&randId, sizeof(randId)));
  state->peerProxyAddressesUDS[rank] = getPidHash()+randId;
  NCCLCHECK(bootstrapAllGather(state, state->peerProxyAddressesUDS, sizeof(*state->peerProxyAddressesUDS)));
  /* call ncclProxyInit to perform the proxy initialization */
  NCCLCHECK(ncclProxyInit(comm, proxySocket, state->peerProxyAddresses, state->peerProxyAddressesUDS));

  TRACE(NCCL_INIT, "rank %d nranks %d - DONE", rank, nranks);

  return ncclSuccess;
}
The corresponding flow is shown in the figure below.
bootstrapInit creates two listening sockets on the bootstrap NIC, one for the root process to contact and one for the non-root processes, storing their sockaddr structures in extAddressListenRoot and extAddressListen respectively. In addition, when the total number of processes exceeds 128, the connection to the root is delayed by rank milliseconds to avoid overloading the root.
We described ring construction when introducing bootstrapCreateRoot in the first part. In short, the root waits for every process to connect and send its listening address, then sends each process the listening address of its successor. What bootstrapInit does here is the rank-side half of that protocol. The timing and call relationships are shown in the figure below.
bootstrapRingAllGather
Now let us look at the underlying bootstrapRingAllGather. At step s of the ring AllGather algorithm, the rank=i process receives the listening address of rank (i-s-1+nranks)%nranks from its predecessor rank=i-1, and sends the address of rank (i-s+nranks)%nranks, received in step s-1, to its successor rank=i+1. The wrapper looks as follows; a sketch of the ring step itself comes after it.
ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {
  struct bootstrapState* state = (struct bootstrapState*)commState;
  int rank = state->rank;
  int nranks = state->nranks;

  TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);

  NCCLCHECK(bootstrapRingAllGather(&state->ringRecvSocket, &state->ringSendSocket, rank, nranks, (char*)allData, size));

  TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
  return ncclSuccess;
}
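The ring step itself can be sketched as follows (argument names assumed; the real bootstrapRingAllGather also handles errors and aborts). bootstrapNetSend/bootstrapNetRecv are the same helpers used in bootstrapInit above:

// At step s, send the slice learned at step s-1 (our own slice when s == 0)
// to the successor, and receive the predecessor's next slice.
ncclResult_t ringAllGatherSketch(struct ncclSocket* prev, struct ncclSocket* next,
                                 int rank, int nranks, char* data, int size) {
  for (int s = 0; s < nranks - 1; s++) {
    int sslice = (rank - s + nranks) % nranks;
    int rslice = (rank - s - 1 + nranks) % nranks;
    NCCLCHECK(bootstrapNetSend(next, data + sslice * (size_t)size, size));
    NCCLCHECK(bootstrapNetRecv(prev, data + rslice * (size_t)size, size));
  }
  return ncclSuccess;
}

After nranks-1 steps, every slice has traveled the whole ring, so data holds all nranks entries on every process.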
We illustrate the RingAllGather process with the figure below. Initially, each process holds only its own listening address. Each process first sends its own address to its successor; in every subsequent step it forwards to its successor the address it received in the previous step. After the whole procedure, every process has obtained the listening addresses of all processes.
Finally, let us look at ncclProxyInit. It initializes the proxy state and calls ncclIpcSocketInit to create a Unix domain listening socket, ipcSock. Here, UDS stands for Unix Domain Socket.
ncclProxyInit
ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union ncclSocketAddress* peerAddresses, uint64_t *peerAddressesUDS) {
  assert(comm->sharedRes->proxyState == NULL);
  NCCLCHECK(ncclCalloc(&comm->sharedRes->proxyState, 1));
  comm->proxyState = comm->sharedRes->proxyState;
  comm->proxyState->refCount = 1;
  comm->proxyState->listenSock = sock;
  comm->proxyState->peerAddresses = peerAddresses;
  comm->proxyState->peerAddressesUDS = peerAddressesUDS;

  // UDS support
  NCCLCHECK(ncclIpcSocketInit(&comm->proxyState->ipcSock, comm->rank, peerAddressesUDS[comm->rank], comm->abortFlag));
  return ncclSuccess;
}
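To make the UDS part concrete, here is an illustrative sketch (not NCCL's ncclIpcSocketInit) of binding a Linux abstract-namespace Unix domain socket whose name embeds a 64-bit value, similar in spirit to the peerProxyAddressesUDS entries gathered above:

#include <stdint.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/un.h>

// A leading '\0' in sun_path selects the abstract namespace: no filesystem
// entry is created, and the name only has to be unique per host.
int bindAbstractUds(uint64_t id) {
  int fd = socket(AF_UNIX, SOCK_DGRAM, 0);
  struct sockaddr_un addr = {};
  addr.sun_family = AF_UNIX;
  snprintf(addr.sun_path + 1, sizeof(addr.sun_path) - 1, "nccl-uds-%lx", (unsigned long)id);
  bind(fd, (struct sockaddr*)&addr, sizeof(addr));
  return fd;
}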
Summary
In this part we discussed how NCCL initializes the bootstrap network and the communicator. The more involved topology detection and communication channel setup are left for the next part. Everything covered in this section is internal logic of ncclCommInitRank; the overall flow is shown below, and the final initTransportsRank and ncclTunerPluginLoad steps will be covered in the next part.