Gloo 是一个集合通信库。它带有许多对机器学习应用有用的集合通信算法。参与机器之间的数据传输被抽象出来,因此始终可以使用 IP,或者在可用时使用 InfiniBand(或 RoCE)。在后一种情况下,如果使用 InfiniBand 传输,GPUDirect 可用于加速跨机器 GPU 到 GPU 的内存传输。
在适用的情况下,算法具有一种适用于系统内存缓冲区的实现,以及一种适用于 NVIDIA GPU 内存缓冲区的实现。 在后一种情况下,主机和设备之间不需要复制内存; 这是由算法实现处理的。
aligned_allocator
将分配的内存区域对齐到32字节。使用了using和模板。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Align buffers to 32 bytes to support vectorized code
const size_t kBufferAlignment = 32;

// Stateless allocator returning storage aligned to ALIGNMENT bytes.
//
// Memory is obtained with posix_memalign(3) and released with free(3).
// Any allocation failure terminates the process through abort(); no
// exception is thrown. All instances are interchangeable, so memory
// allocated by one instance may be released by any other.
//
// Template parameters:
//   T          element type
//   ALIGNMENT  alignment in bytes; must be a power of 2
template <typename T, int ALIGNMENT = kBufferAlignment>
class aligned_allocator {
  static_assert(
      ALIGNMENT > 0 && !(ALIGNMENT & (ALIGNMENT - 1)),
      "alignment must be a power of 2");

 public:
  using value_type = T;
  using pointer = value_type*;
  using const_pointer = const value_type*;
  using reference = value_type&;
  using const_reference = const value_type&;
  using size_type = std::size_t;
  using difference_type = std::ptrdiff_t;

  template <typename U>
  struct rebind {
    using other = aligned_allocator<U, ALIGNMENT>;
  };

  aligned_allocator() = default;
  ~aligned_allocator() = default;

  // Not explicit: the Allocator requirements ask for `A b = a;` to work.
  aligned_allocator(const aligned_allocator& a) = default;

  // Rebinding conversion, needed when a container allocates a node type
  // that differs from T. The allocator carries no state to copy.
  template <typename U>
  aligned_allocator(const aligned_allocator<U, ALIGNMENT>& /*other*/) {}

  inline pointer address(reference r) const {
    return &r;
  }

  inline const_pointer address(const_reference r) const {
    return &r;
  }

  // Allocates uninitialized storage for `sz` objects of type T.
  // Aborts the process if the byte count overflows or if
  // posix_memalign reports an error.
  inline pointer allocate(size_type sz) {
    // posix_memalign rejects alignments that are not a multiple of
    // sizeof(void*); catch that at compile time where allocate is used.
    static_assert(
        static_cast<size_type>(ALIGNMENT) % sizeof(void*) == 0,
        "alignment must be a multiple of sizeof(void*)");

    // Guard the multiplication below against wrap-around.
    if (sz > static_cast<size_type>(-1) / sizeof(T)) {
      abort();
    }

    // Pass a real void* instead of casting T** to void**.
    void* raw = nullptr;
    if (posix_memalign(&raw, ALIGNMENT, sizeof(T) * sz)) {
      abort();
    }
    return static_cast<pointer>(raw);
  }

  // Releases storage previously returned by allocate().
  void deallocate(pointer p, size_type /*sz*/) {
    free(p);
  }
};

// The allocator is stateless, so any two instances with the same
// alignment compare equal.
template <typename T, typename U, int ALIGNMENT>
inline bool operator==(
    const aligned_allocator<T, ALIGNMENT>& /*lhs*/,
    const aligned_allocator<U, ALIGNMENT>& /*rhs*/) {
  return true;
}

template <typename T, typename U, int ALIGNMENT>
inline bool operator!=(
    const aligned_allocator<T, ALIGNMENT>& /*lhs*/,
    const aligned_allocator<U, ALIGNMENT>& /*rhs*/) {
  return false;
}
调用posix_memalign(void **memptr, size_t alignment, size_t size)
成功时会分配 size 字节的动态内存,并且这块内存的地址是 alignment 的倍数。参数 alignment 必须是 2 的幂,并且同时是 sizeof(void*) 的倍数。分配到的内存块地址存放在 *memptr 中,函数返回值为 0。调用失败时不会分配任何内存,*memptr 的值未定义,函数返回如下错误码之一:
- EINVAL:alignment 参数不是 2 的幂,或者不是 sizeof(void*) 的倍数。
- ENOMEM:没有足够的内存去满足函数的请求。
transport
一个样例是这样调用mpi的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16int main(int /*argc*/, char** /*argv*/) {
// We'll use the TCP transport in this example
auto dev = gloo::transport::tcp::CreateDevice("localhost");
// Create Gloo context and delegate management of MPI_Init/MPI_Finalize
auto context = gloo::mpi::Context::createManaged();
context->connectFullMesh(dev);
// Create and run simple allreduce
int rank = context->rank;
gloo::AllreduceRing<int> allreduce(context, {&rank}, 1);
allreduce.run();
std::cout << "Result: " << rank << std::endl;
return 0;
}
transport共有三种通信方式:ibverbs、tcp、uv。
ibverbs
应该是RDMA方式。context应该是上下文,记录通信设备和rank相关信息。
1 | namespace ibverbs { |
这个pair是不能复制和赋值,避免出现内存问题,感觉应该是通信双方组成一个pair:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Forward declaration
class Buffer;
// ibverbs implementation of ::gloo::transport::Pair.
//
// A pair owns one completion queue (cq_) and one queue pair (qp_) and
// keeps the local and peer addresses. It is neither copyable nor
// assignable. Buffer is a friend and accesses the protected/private
// members directly.
class Pair : public ::gloo::transport::Pair {
static constexpr int kMaxBuffers = 8;
static constexpr auto kRecvCompletionQueueCapacity = kMaxBuffers;
static constexpr auto kSendCompletionQueueCapacity = kMaxBuffers;
static constexpr auto kCompletionQueueCapacity =
kRecvCompletionQueueCapacity + kSendCompletionQueueCapacity;
// The ibv_req_notify(3) function takes an argument called
// 'solicited_only' which makes it only trigger a notification for
// work requests that are flagged as solicited. Every completion
// should trigger a notification, so always pass 0.
static constexpr auto kNotifyOnAnyCompletion = 0;
public:
explicit Pair(
const std::shared_ptr<Device>& dev,
std::chrono::milliseconds timeout);
virtual ~Pair();
// Pairs own verbs resources and therefore cannot be copied.
Pair(const Pair& that) = delete;
Pair& operator=(const Pair& that) = delete;
// Returns the local address of this pair.
virtual const Address& address() const override;
// Connects to the peer whose serialized address is given in `bytes`.
virtual void connect(const std::vector<char>& bytes) override;
virtual void setSync(bool enable, bool busyPoll) override;
virtual std::unique_ptr<::gloo::transport::Buffer>
createSendBuffer(int slot, void* ptr, size_t size) override;
virtual std::unique_ptr<::gloo::transport::Buffer>
createRecvBuffer(int slot, void* ptr, size_t size) override;
// Send from the specified buffer to remote side of pair.
virtual void send(
transport::UnboundBuffer* tbuf,
uint64_t tag,
size_t offset,
size_t nbytes) override;
// Receive into the specified buffer from the remote side of pair.
virtual void recv(
transport::UnboundBuffer* tbuf,
uint64_t tag,
size_t offset,
size_t nbytes) override;
void handleCompletionEvent();
// Polls the completion queue and dispatches every work completion.
void pollCompletions();
// Dispatches a single work completion based on its opcode.
void handleCompletion(struct ibv_wc* wc);
// Posts an RDMA write of `length` bytes from `buf` at `offset` to the
// peer's memory region for the buffer's slot at `roffset`.
void send(Buffer* buf, size_t offset, size_t length, size_t roffset);
void close() override;
protected:
std::shared_ptr<Device> dev_;
// Whether or not this pair is running in sync mode.
std::atomic<bool> sync_;
// Whether or not this pair is busy polling in sync mode.
std::atomic<bool> busyPoll_;
const std::chrono::milliseconds timeout_;
// Number of completion events handled by this pair's completion
// queue. This many events must be acknowledged before the completion
// queue is destroyed; otherwise the destruction will hang.
int completionEventsHandled_;
Address self_;
Address peer_;
struct ibv_cq* cq_;
struct ibv_qp* qp_;
std::mutex m_;
std::condition_variable cv_;
// For us to copy the remote peer's ibv_mr into.
std::map<int, struct ibv_mr> peerMemoryRegions_;
// These fields store the memory regions that the remote side of the
// pair can send to and that the local side of the pair can send from.
// When a receive buffer is registered, the local ibv_mr is sent to the
// remote side of the pair, and the corresponding MemoryRegion instance
// is kept in the mappedSendRegions_ list until the send operation
// completes.
// To allow the remote side of the pair to send its memory regions, we
// keep a fixed number of MemoryRegion instances in mappedRecvRegions_.
// These regions are referenced round-robin for every posted receive
// work request.
std::map<int, std::unique_ptr<MemoryRegion> > mappedSendRegions_;
std::array<std::unique_ptr<MemoryRegion>, kMaxBuffers> mappedRecvRegions_;
// Keeps track of the number of posted and completed receive work
// requests. It is needed to index into the mappedRecvRegions_ array
// both when posting a WR and when completing a WR.
uint64_t recvPosted_;
// Completions on behalf of buffers need to be forwarded to those buffers.
std::map<int, Buffer*> sendCompletionHandlers_;
std::map<int, Buffer*> recvCompletionHandlers_;
void sendMemoryRegion(struct ibv_mr* mr, int slot);
const struct ibv_mr* getMemoryRegion(int slot);
void postReceive();
std::chrono::milliseconds getTimeout() const {
return timeout_;
}
const Address& peer() const {
return peer_;
}
private:
// First I/O exception seen on this pair; rethrown by checkErrorState().
std::exception_ptr ex_;
bool closed_ = false;
// Used to signal IO exceptions from one thread and propagate onto others.
void signalIoFailure(const std::string& msg);
void checkErrorState();
friend class Buffer;
};
以下是逐个函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116Pair::Pair(
const std::shared_ptr<Device>& dev,
std::chrono::milliseconds timeout)
: dev_(dev),
sync_(false),
busyPoll_(false),
timeout_(timeout),
completionEventsHandled_(0),
recvPosted_(0),
ex_(nullptr) {
int rv;
// Create completion queue
{
// 必须向设备的完成通道注册此完成队列以支持异步完成处理。
// Pairs 默认使用异步完成处理,因此我们调用 ibv_req_notify_cq(3) 来请求第一个通知。
cq_ = ibv_create_cq(
dev_->context_,
kCompletionQueueCapacity,
this,
dev_->comp_channel_,
0);
GLOO_ENFORCE(cq_);
// 在完成队列 (CQ) 上请求Completion Notification(完成通知)。
rv = ibv_req_notify_cq(cq_, kNotifyOnAnyCompletion);
GLOO_ENFORCE_EQ(rv, 0);
}
// Create queue pair
{
struct ibv_qp_init_attr attr;
memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
attr.send_cq = cq_;
attr.recv_cq = cq_;
attr.cap.max_send_wr = Pair::kSendCompletionQueueCapacity;
attr.cap.max_recv_wr = Pair::kRecvCompletionQueueCapacity;
attr.cap.max_send_sge = 1;
attr.cap.max_recv_sge = 1;
attr.qp_type = IBV_QPT_RC;
qp_ = ibv_create_qp(dev->pd_, &attr);
// 创建queue pair
GLOO_ENFORCE(qp_);
}
// Init queue pair
{
struct ibv_qp_attr attr;
memset(&attr, 0, sizeof(struct ibv_qp_attr));
attr.qp_state = IBV_QPS_INIT;
attr.pkey_index = 0;
attr.port_num = dev_->attr_.port;
attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
rv = ibv_modify_qp(
qp_,
&attr,
IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
GLOO_ENFORCE_EQ(rv, 0);
}
// Populate local address.
// The Packet Sequence Number field (PSN) is random which makes that
// the remote end of this pair needs to have the contents of the
// full address struct in order to connect, and vice versa.
{
struct ibv_port_attr attr;
memset(&attr, 0, sizeof(struct ibv_port_attr));
rv = ibv_query_port(dev_->context_, dev_->attr_.port, &attr);
GLOO_ENFORCE_EQ(rv, 0);
rv = ibv_query_gid(
dev_->context_,
dev_->attr_.port,
dev_->attr_.index,
&self_.addr_.ibv_gid);
GLOO_ENFORCE_EQ(rv, 0);
self_.addr_.lid = attr.lid;
self_.addr_.qpn = qp_->qp_num;
self_.addr_.psn = rand() & 0xffffff;
}
// 在连接之前发布接收请求。
// 每当这pair的远程端注册接收缓冲区时,就会触发它们的内存注册被发送到这一端。
// 由于这些发送是单方面的,我们总是需要一整套接收工作请求。
// 内存区域接收可以与常规缓冲区写入交错,因此我们主动在每个接收工作请求中包含一个内存区域。
for (int i = 0; i < kMaxBuffers; ++i) {
mappedRecvRegions_[i] = make_unique<MemoryRegion>(dev_->pd_);
postReceive();
}
}
// Tears down the verbs resources owned by this pair: acknowledges the
// completion events handled so far, then destroys the queue pair and
// the completion queue (in that order).
Pair::~Pair() {
  // Every handled completion event has to be acknowledged before the
  // completion queue can be destroyed (also see ibv_get_cq_event(3)).
  ibv_ack_cq_events(cq_, completionEventsHandled_);

  {
    const int rv = ibv_destroy_qp(qp_);
    GLOO_ENFORCE_EQ(rv, 0);
  }

  {
    const int rv = ibv_destroy_cq(cq_);
    GLOO_ENFORCE_EQ(rv, 0);
  }
}
// Marks the pair as closed. Calling close() again is a no-op.
void Pair::close() {
  // TODO: add proper handling of duplicate closes T21171834
  if (!closed_) {
    closed_ = true;
  }
}
// Returns a reference to the local address of this pair.
const Address& Pair::address() const {
  return this->self_;
}
连接函数先获取到对方的地址,更新attr结构体,使用ibv_modify_qp函数修改RDMA通信所需的结构体。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79void Pair::connect(const std::vector<char>& bytes) {
struct ibv_qp_attr attr;
int rv;
checkErrorState();
peer_ = Address(bytes);
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = IBV_MTU_1024;
attr.dest_qp_num = peer_.addr_.qpn;
attr.rq_psn = peer_.addr_.psn;
attr.max_dest_rd_atomic = 1;
attr.min_rnr_timer = 20;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = peer_.addr_.lid;
attr.ah_attr.port_num = dev_->attr_.port;
if (peer_.addr_.ibv_gid.global.interface_id) {
attr.ah_attr.is_global = 1;
attr.ah_attr.grh.hop_limit = 1;
attr.ah_attr.grh.dgid = peer_.addr_.ibv_gid;
attr.ah_attr.grh.sgid_index = dev_->attr_.index;
}
// ibv_modify_qp()修改队列对的属性。更改的属性描述了QP的发送和接收属性。
// ibv_create_qp仅仅分配了资源,要通过这个modify来让硬件进入工作状态。
// Move to Ready To Receive (RTR) state
rv = ibv_modify_qp(
qp_,
&attr,
IBV_QP_STATE | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
IBV_QP_AV | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER);
GLOO_ENFORCE_EQ(rv, 0);
memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.sq_psn = self_.addr_.psn;
attr.ah_attr.is_global = 1;
attr.timeout = 14;
attr.retry_cnt = 7;
attr.rnr_retry = 7; /* infinite */
attr.max_rd_atomic = 1;
// Move to Ready To Send (RTS) state
rv = ibv_modify_qp(
qp_,
&attr,
IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC);
GLOO_ENFORCE_EQ(rv, 0);
}
// Puts the pair into synchronous mode.
//
// Busy polling is mandatory here: all pairs share a single completion
// channel, so synchronous operation has to go through ibv_poll_cq(3).
// Should a use case appear where the calling thread must be suspended
// instead, a completion channel per pair could be added and this
// restriction revisited.
//
// Throws an invalid-operation exception when `sync` or `busyPoll` is
// false, and rethrows a previously recorded I/O error.
void Pair::setSync(bool sync, bool busyPoll) {
  checkErrorState();

  if (!sync) {
    GLOO_THROW_INVALID_OPERATION_EXCEPTION("Can only switch to sync mode");
  }

  if (!busyPoll) {
    GLOO_THROW_INVALID_OPERATION_EXCEPTION(
        "The ibverbs transport only supports busy polling in sync mode");
  }

  // The completion queue's notification mechanism is still armed at
  // this point, so the device thread will call into the completion
  // handling once more; that call is ignored.
  //
  // Both flags are atomics, so no mutex is needed.
  sync_.store(true);
  busyPoll_.store(true);
}
使用ibv_post_send函数发送,1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23void Pair::sendMemoryRegion(struct ibv_mr* src, int slot) {
auto mr = make_unique<MemoryRegion>(dev_->pd_, src);
struct ibv_sge list = mr->sge();
struct ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
wr.wr_id = slot;
wr.sg_list = &list;
wr.num_sge = 1;
wr.opcode = IBV_WR_SEND_WITH_IMM;
wr.send_flags = IBV_SEND_SIGNALED;
wr.imm_data = slot;
// 工作请求被序列化并发送到驱动程序,因此它不需要在 ibv_post_send 调用后有效。
// ibv_post_send和recv用于发送verb,verb承载在一个称为ibv_send_wr或者ibv_recv_wr的数据结构中,里面是verb类型和mr的相关细节。
struct ibv_send_wr* bad_wr = nullptr;
int rv = ibv_post_send(qp_, &wr, &bad_wr);
if (rv != 0) {
signalIoFailure(GLOO_ERROR_MSG("ibv_post_send: ", rv));
}
GLOO_ENFORCE_EQ(mappedSendRegions_.count(slot), 0);
mappedSendRegions_[slot] = std::move(mr);
}
先获取到锁,如果是异步的,需要检查是不是超时了;否则就等待。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43const struct ibv_mr* Pair::getMemoryRegion(int slot) {
std::unique_lock<std::mutex> lock(m_);
if (sync_) {
auto it = peerMemoryRegions_.find(slot);
auto start = std::chrono::steady_clock::now();
while (it == peerMemoryRegions_.end()) {
lock.unlock();
pollCompletions();
lock.lock();
if (timeout_ != kNoTimeout &&
(std::chrono::steady_clock::now() - start) >= timeout_) {
lock.unlock();
signalIoFailure(
GLOO_ERROR_MSG(
"Timeout waiting for memory region from ",
peer_.str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
it = peerMemoryRegions_.find(slot);
}
return &it->second;
} else {
auto pred = [&]{
return peerMemoryRegions_.find(slot) != peerMemoryRegions_.end();
};
if (timeout_ == kNoTimeout) {
// No timeout set. Wait for read to complete.
cv_.wait(lock, pred);
} else {
auto done = cv_.wait_for(lock, timeout_, pred);
if (!done) {
signalIoFailure(
GLOO_ERROR_MSG(
"Timeout waiting for memory region from ",
peer_.str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
}
auto it = peerMemoryRegions_.find(slot);
GLOO_ENFORCE(it != peerMemoryRegions_.end());
return &it->second;
}
}
1 | void Pair::postReceive() { |
轮询这pair的完成队列以获取工作完成情况。当从设备线程调用时,这对的互斥锁已经被获取。从用户线程调用时,不会获取互斥锁(因为只有一个线程使用这对)。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87void Pair::pollCompletions() {
std::array<struct ibv_wc, kCompletionQueueCapacity> wc;
// Invoke handler for every work completion.
for (;;) {
auto nwc = ibv_poll_cq(cq_, wc.size(), wc.data());
GLOO_ENFORCE_GE(nwc, 0);
// Handle work completions
for (int i = 0; i < nwc; i++) {
checkErrorState();
handleCompletion(&wc[i]);
}
// Break unless wc was filled
if (nwc == 0 || nwc < wc.size()) {
break;
}
}
}
void Pair::handleCompletion(struct ibv_wc* wc) {
if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
// Incoming RDMA write completed.
// Slot is encoded in immediate data on receive work completion.
// It is set in the Pair::send function.
auto slot = wc->imm_data;
GLOO_ENFORCE_EQ( wc->status, IBV_WC_SUCCESS, "Recv for slot ", slot, ": ", ibv_wc_status_str(wc->status));
GLOO_ENFORCE(recvCompletionHandlers_[slot] != nullptr);
recvCompletionHandlers_[slot]->handleCompletion(wc);
// Backfill receive work requests.
postReceive();
} else if (wc->opcode == IBV_WC_RDMA_WRITE) {
// Outbound RDMA write completed.
// Slot is encoded in wr_id fields on send work request. Unlike
// the receive work completions, the immediate data field on send
// work requests are not pass to the respective work completion.
auto slot = wc->wr_id;
GLOO_ENFORCE_EQ( wc->status, IBV_WC_SUCCESS, "Send for slot ", slot, ": ", ibv_wc_status_str(wc->status));
GLOO_ENFORCE(sendCompletionHandlers_[slot] != nullptr);
sendCompletionHandlers_[slot]->handleCompletion(wc);
} else if (wc->opcode == IBV_WC_RECV) {
// 内存区域 recv 完成。
// 仅由pair的远程端用于传递 ibv_mr。
// 它们以 FIFO 顺序写入,因此我们可以在映射的接收区域列表中选择并使用第一个 MemoryRegion 实例。
// 尝试写入此插槽的缓冲区可能正在等待该对的另一端发送其内存区域。
// 锁定访问权限,然后通知任何等待的人。
// 时隙在接收工作完成后立即编码为数据。 它在 Pair::sendMemoryRegion 函数中设置。
auto slot = wc->imm_data;
GLOO_ENFORCE_EQ(
wc->status,
IBV_WC_SUCCESS,
"Memory region recv for slot ",
slot,
": ",
ibv_wc_status_str(wc->status));
// Move ibv_mr from memory region 'inbox' to final slot.
const auto& mr = mappedRecvRegions_[recvPosted_ % kMaxBuffers];
peerMemoryRegions_[slot] = mr->mr();
// Notify any buffer waiting for the details of its remote peer.
cv_.notify_all();
// Backfill receive work requests.
postReceive();
} else if (wc->opcode == IBV_WC_SEND) {
// Memory region send completed.
auto slot = wc->wr_id;
GLOO_ENFORCE_EQ(
wc->status,
IBV_WC_SUCCESS,
"Memory region send for slot ",
slot,
": ",
ibv_wc_status_str(wc->status));
GLOO_ENFORCE_GT(mappedSendRegions_.size(), 0);
GLOO_ENFORCE_EQ(mappedSendRegions_.count(slot), 1);
mappedSendRegions_.erase(slot);
} else {
GLOO_ENFORCE(false, "Unexpected completion with opcode: ", wc->opcode);
}
}
使用一些信息填充结构体,获取到内存区域后发送1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54void Pair::send(Buffer* buffer, size_t offset, size_t length, size_t roffset) {
struct ibv_sge list;
list.addr = (uint64_t)buffer->ptr_ + offset;
list.length = length;
list.lkey = buffer->mr_->lkey;
struct ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
wr.wr_id = buffer->slot_;
wr.sg_list = &list;
wr.num_sge = 1;
wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
wr.send_flags = IBV_SEND_SIGNALED;
wr.imm_data = buffer->slot_;
const struct ibv_mr* peer = getMemoryRegion(buffer->slot_);
GLOO_ENFORCE_NE(peer, (const struct ibv_mr*)nullptr);
wr.wr.rdma.remote_addr = (uint64_t)peer->addr + roffset;
wr.wr.rdma.rkey = peer->rkey;
struct ibv_send_wr* bad_wr;
auto rv = ibv_post_send(qp_, &wr, &bad_wr);
if (rv != 0) {
signalIoFailure(GLOO_ERROR_MSG("ibv_post_send: ", rv));
}
}
// Records an I/O failure on this pair and throws it on the calling
// thread.
//
// The first failure is stored in ex_ and forwarded to every registered
// receive and send completion handler; later failures leave the stored
// exception untouched. In both cases a ::gloo::IoException built from
// `msg` is thrown. The pair's mutex m_ is held for the duration of the
// call, so callers must not hold it.
void Pair::signalIoFailure(const std::string& msg) {
  std::lock_guard<std::mutex> lock(m_);
  auto ex = ::gloo::IoException(msg);

  // An error was already recorded earlier: just throw on this thread.
  if (ex_ != nullptr) {
    throw ex;
  }

  // First error seen; keep it so other threads can rethrow it.
  ex_ = std::make_exception_ptr(ex);

  // Tell every completion handler that an error has occurred.
  for (auto& it : recvCompletionHandlers_) {
    GLOO_ENFORCE(it.second != nullptr);
    it.second->signalError(ex_);
  }
  for (auto& it : sendCompletionHandlers_) {
    GLOO_ENFORCE(it.second != nullptr);
    it.second->signalError(ex_);
  }

  // Finally, throw the exception on this thread.
  throw ex;
}
// Rethrows the stored exception if an error was recorded earlier;
// otherwise does nothing.
void Pair::checkErrorState() {
  if (ex_ == nullptr) {
    return;
  }
  std::rethrow_exception(ex_);
}
device 保存了设备信息,以下是构建一个设备1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28Device::Device(const struct attr& attr, ibv_context* context)
: attr_(attr),
pciBusID_(infinibandToBusID(attr.name)),
hasNvPeerMem_(kernelModules().count("nv_peer_mem") > 0),
context_(context) {
int rv;
// Query and store device attributes
rv = ibv_query_device(context_, &deviceAttr_);
GLOO_ENFORCE_EQ(rv, 0, "ibv_query_device: ", strerror(errno));
// Query and store port attributes
rv = ibv_query_port(context_, attr_.port, &portAttr_);
GLOO_ENFORCE_EQ(rv, 0, "ibv_query_port: ", strerror(errno));
// Protection domain
pd_ = ibv_alloc_pd(context_);
GLOO_ENFORCE(pd_);
// Completion channel
comp_channel_ = ibv_create_comp_channel(context_);
GLOO_ENFORCE(comp_channel_);
// Start thread to poll completion queue and dispatch
// completions for completed work requests.
done_ = false;
loop_.reset(new std::thread(&Device::loop, this));
}
buffer分配一个缓冲区1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37Buffer::Buffer(Pair* pair, int slot, void* ptr, size_t size)
: ::gloo::transport::Buffer(slot, ptr, size),
pair_(pair),
recvCompletions_(0),
sendCompletions_(0),
sendPending_(0),
ex_(nullptr) {
// 注册一个memory region
mr_ = ibv_reg_mr(
pair_->dev_->pd_,
ptr_,
size_,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
// Provide hint if the error is EFAULT and nv_peer_mem is not loaded
if (mr_ == nullptr && errno == EFAULT) {
if (!pair->dev_->hasNvPeerMem_) {
GLOO_ENFORCE(
mr_ != nullptr,
"ibv_reg_mr: ",
strerror(errno),
" (kernel module 'nv_peer_mem' not loaded;"
" did you specify a pointer to GPU memory?)");
}
}
// Provide hint if the error is ENOMEM
if (mr_ == nullptr && errno == ENOMEM) {
GLOO_ENFORCE(
mr_ != nullptr,
"ibv_reg_mr: ",
strerror(errno),
" (did you run into the locked memory limit?)");
}
GLOO_ENFORCE(mr_ != nullptr, "ibv_reg_mr: ", strerror(errno));
}
等待接收操作完成。根据是不是异步判断是否需要等待1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44void Buffer::waitRecv() {
// 如果该pair处于同步模式,则当前线程负责轮询工作完成情况。
// 由于单个pair可能为多个缓冲区提供服务,因此完成可能旨在用于另一个缓冲区。
auto timeout = pair_->getTimeout();
if (pair_->sync_) {
auto start = std::chrono::steady_clock::now();
// We can assume a single pair is never used by more than one
// thread, so there is no need to acquire the mutex here.
while (recvCompletions_ == 0) {
pair_->pollCompletions();
if (timeout != kNoTimeout &&
(std::chrono::steady_clock::now() - start) >= timeout) {
pair_->signalIoFailure(
GLOO_ERROR_MSG("Read timeout ", pair_->peer().str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
}
recvCompletions_--;
} else {
// The device thread will signal completion. If the completion
// hasn't arrived yet, wait until it does.
auto pred = [&]{
checkErrorState();
return recvCompletions_ > 0;
};
std::unique_lock<std::mutex> lock(m_);
if (timeout == kNoTimeout) {
// No timeout set. Wait for read to complete.
recvCv_.wait(lock, pred);
} else {
auto done = recvCv_.wait_for(lock, timeout, pred);
if (!done) {
// Release the mutex before calling into the pair to avoid deadlock.
// Calling signalIoFailure() will throw, so no need to
// reacquire.
lock.unlock();
pair_->signalIoFailure(
GLOO_ERROR_MSG("Read timeout ", pair_->peer().str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
}
recvCompletions_--;
}
}
等待发送操作完成。根据是不是异步判断是否需要等待1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// Wait for the previous send operation to finish.
// Blocks until a send from this buffer has completed and consumes one
// send completion.
//
// When no completion is available yet, at least one send must be
// pending, otherwise a GLOO_ENFORCE fires. In sync mode the calling
// thread polls the pair's completion queue; in async mode it waits on
// sendCv_. If the pair's timeout is set and expires, the failure is
// reported through Pair::signalIoFailure(), which throws.
void Buffer::waitSend() {
  // In synchronous mode the current thread is responsible for polling
  // for work completions.
  auto waitLimit = pair_->getTimeout();

  if (pair_->sync_) {
    // A single pair is assumed never to be used by more than one
    // thread, so the mutex is not taken on this path.
    if (sendCompletions_ == 0) {
      GLOO_ENFORCE_GT(sendPending_, 0, "No send to wait for");
      auto begin = std::chrono::steady_clock::now();
      while (sendCompletions_ == 0) {
        pair_->pollCompletions();
        const bool expired = waitLimit != kNoTimeout &&
            (std::chrono::steady_clock::now() - begin) >= waitLimit;
        if (expired) {
          pair_->signalIoFailure(
              GLOO_ERROR_MSG("Send timeout ", pair_->peer().str()));
          GLOO_ENFORCE(false, "Unexpected code path");
        }
      }
    }
    sendCompletions_--;
    return;
  }

  // Asynchronous mode: the device thread signals completion. If the
  // completion hasn't arrived yet, wait until it does.
  std::unique_lock<std::mutex> guard(m_);
  checkErrorState();
  if (sendCompletions_ == 0) {
    GLOO_ENFORCE_GT(sendPending_, 0, "No send to wait for");
    auto completionArrived = [&]{
      // Throws out of the wait if an error was signalled meanwhile.
      checkErrorState();
      return sendCompletions_ > 0;
    };
    if (waitLimit == kNoTimeout) {
      // No timeout set; wait for as long as it takes.
      sendCv_.wait(guard, completionArrived);
    } else if (!sendCv_.wait_for(guard, waitLimit, completionArrived)) {
      // Release the mutex before calling into the pair to avoid
      // deadlock. signalIoFailure() throws, so it is not reacquired.
      guard.unlock();
      pair_->signalIoFailure(
          GLOO_ERROR_MSG("Send timeout ", pair_->peer().str()));
      GLOO_ENFORCE(false, "Unexpected code path");
    }
  }
  sendCompletions_--;
}
// Sends `length` bytes starting at `offset` of this buffer to the peer,
// writing them at `roffset` in the remote buffer.
//
// The local range must lie within the buffer, otherwise a GLOO_ENFORCE
// fires. A previously signalled error is rethrown before anything is
// sent.
void Buffer::send(size_t offset, size_t length, size_t roffset) {
  // Can't assert on roffset, since we don't know the size of
  // the remote buffer. Refactor of initialization code needed
  // to support this.
  //
  // The local bound is checked in two steps so that a huge offset or
  // length cannot wrap `offset + length` around and slip past the check.
  GLOO_ENFORCE_LE(offset, size_);
  GLOO_ENFORCE_LE(length, size_ - offset);

  {
    std::unique_lock<std::mutex> lock(m_);
    checkErrorState();
  }

  if (debug_) {
    std::cout << "[" << getpid() << "] ";
    std::cout << "send " << length << " bytes";
    std::cout << std::endl;
  }

  // Increment number of sends in flight
  sendPending_++;

  pair_->send(this, offset, length, roffset);
}
tcp
TCP中包含一个tls层。TLS(Transport Layer Security,安全传输层),TLS是建立在传输层TCP协议之上的协议,服务于应用层,它的前身是SSL(Secure Socket Layer,安全套接字层),它实现了将应用层的报文进行加密后再交由TCP进行传输的功能。
tls
上下文:1
2
3
4
5
6
7
8
9Context::Context(std::shared_ptr<Device> device, int rank, int size)
: ::gloo::transport::tcp::Context(
std::dynamic_pointer_cast<::gloo::transport::tcp::Device>(device),
rank, size),
ssl_ctx_(create_ssl_ctx(c_str_or_null(device->getPKeyFile()),
c_str_or_null(device->getCertFile()),
c_str_or_null(device->getCAFile()),
c_str_or_null(device->getCAPath())),
[](::SSL_CTX *x) { ::_glootls::SSL_CTX_free(x); }) {}
真正创建ssl context的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66SSL_CTX *Context::create_ssl_ctx(const char *pkey, const char *cert,
const char *ca_file, const char *ca_path) {
GLOO_ENFORCE(pkey != nullptr && cert != nullptr,
"Private key and certificate location must be specified");
GLOO_ENFORCE(ca_file != nullptr || ca_path != nullptr,
"CAfile or CApath must be specified");
static std::once_flag ssl_ctx_init_;
std::call_once(ssl_ctx_init_, [] {
// SSL_load_error_strings();
// SSL_library_init();
_glootls::OPENSSL_init_ssl(
OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, NULL);
_glootls::OPENSSL_init_ssl(0, NULL);
});
SSL_CTX *ssl_ctx = _glootls::SSL_CTX_new(_glootls::TLS_method());
GLOO_ENFORCE(ssl_ctx != nullptr, getSSLErrorMessage());
GLOO_ENFORCE(
_glootls::SSL_CTX_set_min_proto_version(ssl_ctx, TLS_MAX_VERSION) == 1,
getSSLErrorMessage());
// As we don't need to handle legacy clients,
// let's remove support for legacy renegotiation:
_glootls::SSL_CTX_clear_options(ssl_ctx, SSL_OP_LEGACY_SERVER_CONNECT);
_glootls::SSL_CTX_set_verify_depth(ssl_ctx, 1);
// To enforcing a higher security level, set it to 3.
//
// 2级
// 安全级别设置为 112 位安全。 因此,禁止使用短于 2048 位的 RSA、DSA 和 DH 密钥以及短于 224 位的 ECC 密钥。
// 除了 1 级排除之外,还禁止使用任何使用 RC4 的密码套件。 SSL 版本 3 也是不允许的。 压缩被禁用。
//
// Level 3
// 安全级别设置为 128 位安全。
// 因此,禁止使用小于 3072 位的 RSA、DSA 和 DHkey 以及小于 256 位的 ECC 密钥。
// 除了 2 级排除之外,禁止使用不提供前向保密的密码套件。 不允许使用低于 1.1 的 TLS 版本。 会话票证被禁用。
//
// TODO: should be 3, but it doesn't work yet :(
_glootls::SSL_CTX_set_security_level(ssl_ctx, 2);
GLOO_ENFORCE(
_glootls::SSL_CTX_load_verify_locations(ssl_ctx, ca_file, ca_path) == 1,
getSSLErrorMessage());
GLOO_ENFORCE(_glootls::SSL_CTX_use_certificate_chain_file(ssl_ctx, cert) == 1,
getSSLErrorMessage());
GLOO_ENFORCE(_glootls::SSL_CTX_use_PrivateKey_file(ssl_ctx, pkey,
SSL_FILETYPE_PEM) == 1,
getSSLErrorMessage());
GLOO_ENFORCE(_glootls::SSL_CTX_check_private_key(ssl_ctx) == 1,
getSSLErrorMessage());
// SSL_VERIFY_PEER
//
// 服务器模式:服务器向客户端发送客户端证书请求。
// 检查返回的证书(如果有)。 如果验证过程失败,TLS/SSL 握手会立即终止,并发出一条包含验证失败原因的警报消息。
// 该行为可以通过附加的 SSL_VERIFY_FAIL_IF_NO_PEER_CERT 和 SSL_VERIFY_CLIENT_ONCE 标志来控制。
//
// 客户端模式:验证服务器证书。
// 如果验证过程失败,TLS/SSL 握手会立即终止,并发出一条包含验证失败原因的警报消息。
// 如果没有发送服务器证书,因为使用了匿名密码,SSL_VERIFY_PEER 将被忽略。
_glootls::SSL_CTX_set_verify(ssl_ctx,
SSL_VERIFY_PEER |
SSL_VERIFY_FAIL_IF_NO_PEER_CERT |
SSL_VERIFY_CLIENT_ONCE,
nullptr);
return ssl_ctx;
}
正经tcp
同样需要创建通信对,缓冲区,以及处理通信等。1
2
3
4
5
6
7
8
9
10
11
12std::unique_ptr<transport::Pair>& Context::createPair(int rank) {
pairs_[rank] = std::unique_ptr<transport::Pair>(
new tcp::Pair(this, device_.get(), rank, getTimeout()));
return pairs_[rank];
}
// Creates an unbound buffer over [ptr, ptr + size) that is tied to this
// context, and hands ownership of it to the caller.
std::unique_ptr<transport::UnboundBuffer> Context::createUnboundBuffer(
    void* ptr,
    size_t size) {
  return std::unique_ptr<transport::UnboundBuffer>(
      new tcp::UnboundBuffer(shared_from_this(), ptr, size));
}
以下是处理通信的过程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94void Context::recvFromAny(
UnboundBuffer* buf,
uint64_t slot,
size_t offset,
size_t nbytes,
std::vector<int> srcRanks) {
for (;;) {
// Find rank of pair we can attempt a recv from
auto rank = recvFromAnyFindRank(buf, slot, offset, nbytes, srcRanks);
if (rank == -1) {
return;
}
// Try recv from returned rank
auto ptr = pairs_[rank].get();
GLOO_ENFORCE(ptr != nullptr);
auto pair = dynamic_cast<Pair*>(ptr);
GLOO_ENFORCE(pair != nullptr);
if (pair->tryRecv(buf, slot, offset, nbytes)) {
return;
}
}
}
// Looks for a rank whose pending remote send can satisfy a recv-from-any
// on `slot`.
//
// Returns the first rank in the pending operation's send list that also
// appears in `srcRanks`. If there is none, the buffer is registered in
// pendingRecv_ for this slot and -1 is returned. The context mutex is
// held for the duration of the call.
int Context::recvFromAnyFindRank(
    UnboundBuffer* buf,
    uint64_t slot,
    size_t offset,
    size_t nbytes,
    const std::vector<int>& srcRanks) {
  std::unique_lock<std::mutex> lock(mutex_);

  // Is there a remote pending send that can fulfill this recv?
  auto pending = findPendingOperations(slot);
  if (pending != pendingOperations_.end()) {
    // Walk the remote pending sends in order and pick the first one
    // whose rank is among the eligible ranks.
    for (const auto candidate : (*pending).getSendList()) {
      for (const auto eligible : srcRanks) {
        if (candidate != eligible) {
          continue;
        }
        // Found a rank that can satisfy this recv. The caller will
        // attempt the recv, which removes the remote pending send
        // operation if it still exists.
        return candidate;
      }
    }
  }

  // No candidates; register buffer for recv
  pendingRecv_[slot].emplace_back(
      buf->getWeakNonOwningPtr(),
      offset,
      nbytes,
      std::unordered_set<int>(srcRanks.begin(), srcRanks.end()));
  return -1;
}
// Finds a pending recv-from-any on `slot` that accepts data from `rank`.
//
// On a match the buffer, offset and byte count of the pending recv are
// written to the output parameters, the entry is removed from
// pendingRecv_ (dropping the slot's list once it is empty), and true is
// returned. Returns false when nothing matches.
//
// Allowed to be called only by ContextMutator::findRecvFromAny,
// where the context lock is already held.
bool Context::findRecvFromAny(
    uint64_t slot,
    int rank,
    WeakNonOwningPtr<UnboundBuffer>* buf,
    size_t* offset,
    size_t* nbytes) {
  // Nothing to do unless a recv is pending for this slot.
  auto pit = pendingRecv_.find(slot);
  if (pit == pendingRecv_.end()) {
    return false;
  }

  // Iterate over available buffers to find a match.
  auto& recvs = pit->second;
  for (auto rit = recvs.begin(); rit != recvs.end(); ++rit) {
    // Skip entries whose set of acceptable ranks excludes this peer.
    const auto& ranks = std::get<3>(*rit);
    if (ranks.count(rank) == 0) {
      continue;
    }

    // Capture values to return.
    *buf = std::get<0>(*rit);
    *offset = std::get<1>(*rit);
    *nbytes = std::get<2>(*rit);

    // Cleanup.
    recvs.erase(rit);
    if (recvs.empty()) {
      pendingRecv_.erase(pit);
    }
    return true;
  }

  return false;
}
loop
其中包含了epoll的使用方法,单独拿出来
创建一个epoll1
2
3
4
5Loop::Loop() {
fd_ = epoll_create(1);
GLOO_ENFORCE_NE(fd_, -1, "epoll_create: ", strerror(errno));
loop_.reset(new std::thread(&Loop::run, this));
}
epoll_ctl,用于操作epoll函数所生成的实例。1
2
int epoll_ctl(int epfd,int op,int fd,struct epoll_event * event);
该系统调用对文件描述符epfd引用的epoll实例执行控制操作。它要求操作op对目标文件描述符fd执行。op参数的有效值为:
- EPOLL_CTL_ADD:在文件描述符epfd所引用的epoll实例上注册目标文件描述符fd,并把参数event描述的事件与fd关联起来。
- EPOLL_CTL_MOD:修改与目标文件描述符fd相关联的事件(由参数event给出)。
- EPOLL_CTL_DEL:从epfd引用的epoll实例中删除(注销)目标文件描述符fd。此时参数event会被忽略,可以为NULL。
1 | void Loop::registerDescriptor(int fd, int events, Handler* h) { |
等待某个epoll完成1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26void Loop::run() {
std::array<struct epoll_event, capacity_> events;
int nfds;
while (!done_) {
// Wakeup everyone waiting for a loop tick to finish.
cv_.notify_all();
// Wait for something to happen
nfds = epoll_wait(fd_, events.data(), events.size(), 10);
if (nfds == 0) {
continue;
}
if (nfds == -1 && errno == EINTR) {
continue;
}
GLOO_ENFORCE_NE(nfds, -1);
for (int i = 0; i < nfds; i++) {
Handler* h = reinterpret_cast<Handler*>(events[i].data.ptr);
h->handleEvents(events[i].events);
TSAN_ANNOTATE_HAPPENS_BEFORE(h);
}
}
}
mpi
mpi相关的通信操作,MPIScope应该是MPI的上下文什么的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20std::shared_ptr<MPIScope> getMPIScope() {
static std::once_flag once;
// Use weak pointer so that the initializer is destructed when the
// last context referring to it is destructed, not when statics
// are destructed on program termination.
static std::weak_ptr<MPIScope> wptr;
std::shared_ptr<MPIScope> sptr;
// Create MPIScope only once
std::call_once(once, [&]() {
sptr = std::make_shared<MPIScope>();
wptr = sptr;
});
// Create shared_ptr<MPIScope> from weak_ptr
sptr = wptr.lock();
GLOO_ENFORCE(sptr, "Cannot create MPI context after MPI_Finalize()");
return sptr;
}
创建并返回由 MPIScope 托管的 MPI 上下文(基于 MPI_COMM_WORLD 通信域):
16std::shared_ptr<Context> Context::createManaged() {
// Factory: builds a context over MPI_COMM_WORLD that also holds a reference
// to the shared MPIScope.
// The scope is acquired before the context is constructed.
// NOTE(review): presumably MPIScope performs MPI_Init, which would have to
// precede the MPI calls made by the constructor — confirm.
auto mpiScope = getMPIScope();
auto context = std::make_shared<Context>(MPI_COMM_WORLD);
// The context keeps the scope alive for as long as it exists.
context->mpiScope_ = std::move(mpiScope);
return context;
}
// Builds a context from an MPI communicator. The base gloo context is
// initialized with the values of MPICommRank(comm) and MPICommSize(comm).
Context::Context(const MPI_Comm& comm)
: ::gloo::Context(MPICommRank(comm), MPICommSize(comm)) {
// Store a duplicate of the communicator in comm_ instead of the caller's
// handle; anything other than MPI_SUCCESS is treated as fatal.
auto error = MPI_Comm_dup(comm, &comm_);
GLOO_ENFORCE(error == MPI_SUCCESS, "MPI_Comm_dup: ", error);
}
// Frees the communicator held in comm_. The return value of MPI_Comm_free
// is not checked.
Context::~Context() {
MPI_Comm_free(&comm_);
}
为本进程与其他每个进程创建通信对(pair)和用于交换地址的缓冲区,并在通过 MPI 交换地址后建立连接:
62void Context::connectFullMesh(std::shared_ptr<transport::Device>& dev) {
std::vector<std::vector<char>> addresses(size);
unsigned long maxLength = 0;
int rv;
// Create pair to connect to every other node in the collective
auto transportContext = dev->createContext(rank, size);
transportContext->setTimeout(getTimeout());
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
}
auto& pair = transportContext->createPair(i);
// Store address for pair for this rank
auto address = pair->address().bytes();
maxLength = std::max(maxLength, address.size());
addresses[i] = std::move(address);
}
// Agree on maximum length so we can prepare buffers
rv = MPI_Allreduce(
MPI_IN_PLACE, &maxLength, 1, MPI_UNSIGNED_LONG, MPI_MAX, comm_);
if (rv != MPI_SUCCESS) {
GLOO_THROW_IO_EXCEPTION("MPI_Allreduce: ", rv);
}
// Prepare input and output
std::vector<char> in(size * maxLength);
std::vector<char> out(size * size * maxLength);
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
}
auto& address = addresses[i];
memcpy(in.data() + (i * maxLength), address.data(), address.size());
}
// Allgather to collect all addresses of all pairs
rv = MPI_Allgather(
in.data(), in.size(), MPI_BYTE, out.data(), in.size(), MPI_BYTE, comm_);
if (rv != MPI_SUCCESS) {
GLOO_THROW_IO_EXCEPTION("MPI_Allgather: ", rv);
}
// Connect every pair
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
}
auto offset = (rank + i * size) * maxLength;
std::vector<char> address(maxLength);
memcpy(address.data(), out.data() + offset, maxLength);
transportContext->getPair(i)->connect(address);
}
device_ = dev;
transportContext_ = std::move(transportContext);
}
通信
首先得获取本进程在环中的左邻居和右邻居:
13// Helper for ring algorithms: returns the pair for the left neighbor, i.e.
// the rank one below this process's rank, wrapping around at rank 0.
std::unique_ptr<transport::Pair>& Algorithm::getLeftPair() {
// size is added before subtracting 1 so the operand of % is not negative
// for rank 0.
auto rank = (context_->size + context_->rank - 1) % context_->size;
// Fail if the context has no pair for that rank.
GLOO_ENFORCE(context_->getPair(rank), "pair missing (index ", rank, ")");
return context_->getPair(rank);
}
// Helper for ring algorithms: returns the pair for the right neighbor, i.e.
// the rank one above this process's rank, wrapping around to rank 0.
std::unique_ptr<transport::Pair>& Algorithm::getRightPair() {
auto rank = (context_->rank + 1) % context_->size;
// Fail if the context has no pair for that rank.
GLOO_ENFORCE(context_->getPair(rank), "pair missing (index ", rank, ")");
return context_->getPair(rank);
}
gloo 内置了若干 reduce 操作(SUM、PRODUCT、MAX、MIN),同时也兼容用户自定义的 reduce 函数(CUSTOM):
69// Type of reduction function.
// If the reduction type is one of the built-in ones, an algorithm
// implementation may use an accelerated version if one is available.
// For example, if a ReductionFunction with ReductionType equal to SUM is
// passed to a CUDA-aware Allreduce, it knows it can use an NCCL
// implementation instead of the specified function.
//
enum ReductionType {
SUM = 1,
PRODUCT = 2,
MAX = 3,
MIN = 4,
// Use larger number so we have plenty of room to add built-ins
CUSTOM = 1000,
};
// Couples a ReductionType tag with the function that implements the
// reduction for element type T.
template <typename T>
class ReductionFunction {
public:
// Signature of a reduction implementation; call() forwards its three
// arguments to it unchanged.
// NOTE(review): presumably the first argument is both an input and the
// destination, combined with the second over n elements — confirm.
using Function = void(T*, const T*, size_t n);
// Shared instances for the built-in reductions, defined after the class.
static const ReductionFunction<T>* sum;
static const ReductionFunction<T>* product;
static const ReductionFunction<T>* min;
static const ReductionFunction<T>* max;
// Stores the tag and the function pointer as given.
ReductionFunction(ReductionType type, Function* fn)
: type_(type), fn_(fn) {}
// Returns the tag supplied at construction.
ReductionType type() const {
return type_;
}
// Invokes the wrapped function with (x, y, n).
void call(T* x, const T* y, size_t n) const {
fn_(x, y, n);
}
protected:
ReductionType type_;
Function* fn_;
};
// Definitions of the built-in instances. Each one is allocated with new and
// bound to the matching ::gloo:: function template; the visible code never
// deletes them.
template <typename T>
const ReductionFunction<T>* ReductionFunction<T>::sum =
new ReductionFunction<T>(SUM, &::gloo::sum<T>);
template <typename T>
const ReductionFunction<T>* ReductionFunction<T>::product =
new ReductionFunction<T>(PRODUCT, &::gloo::product<T>);
template <typename T>
const ReductionFunction<T>* ReductionFunction<T>::min =
new ReductionFunction<T>(MIN, &::gloo::min<T>);
template <typename T>
const ReductionFunction<T>* ReductionFunction<T>::max =
new ReductionFunction<T>(MAX, &::gloo::max<T>);
// Local operation.
// An algorithm that works with several local pointers can express its local
// reduction, broadcast, gathering, etc. as implementations of this interface.
template <typename T>
class LocalOp {
 public:
  // noexcept(false): the destructor is allowed to propagate exceptions.
  virtual ~LocalOp() noexcept(false) {}

  // Asynchronous half of the operation, supplied by the implementation.
  virtual void runAsync() = 0;

  // Completion half of the operation, supplied by the implementation.
  virtual void wait() = 0;

  // Synchronous run: runAsync() immediately followed by wait().
  void run() {
    this->runAsync();
    this->wait();
  }
};
allgather
AllgatherRing 类似于 MPI_Allgather,所有进程都从所有其他进程接收缓冲区(inPtrs)。调用者需要传递一个预先分配的接收缓冲区(outPtr),其大小等于 [上下文大小 × 发送缓冲区(inPtrs)的总大小],其中 rank = k 的进程的发送缓冲区将从 outPtr[k * 输入缓冲区数量 * count] 开始连续写入。
32template <typename T>
class AllgatherRing : public Algorithm {
public:
// Sets up a ring allgather.
//   context: context whose left/right ring neighbors are used.
//   inPtrs:  input buffers of this process, each holding `count` elements.
//   outPtr:  output buffer; registered below with room for
//            inPtrs.size() * context size * count elements.
//   count:   number of elements of type T per input buffer.
AllgatherRing(
    const std::shared_ptr<Context>& context,
    const std::vector<const T*>& inPtrs,
    T* outPtr,
    int count)
    : Algorithm(context),
      inPtrs_(inPtrs),
      outPtr_(outPtr),
      count_(count),
      bytes_(count * sizeof(T)),
      inputStride_(count_ * inPtrs_.size()),
      leftPair_(this->getLeftPair()),
      rightPair_(this->getRightPair()) {
  // The whole output buffer is both what we send from (to the right
  // neighbor) and what we receive into (from the left neighbor).
  const auto outputBytes = inPtrs_.size() * context_->size * bytes_;
  auto dataSlot = this->context_->nextSlot();
  sendDataBuf_ = rightPair_->createSendBuffer(dataSlot, outPtr_, outputBytes);
  recvDataBuf_ = leftPair_->createRecvBuffer(dataSlot, outPtr_, outputBytes);

  // Notifications travel in the opposite direction on their own slot:
  // sent to the left neighbor, received from the right neighbor.
  auto notifySlot = this->context_->nextSlot();
  sendNotificationBuf_ =
      leftPair_->createSendBuffer(notifySlot, &dummy_, sizeof(dummy_));
  recvNotificationBuf_ =
      rightPair_->createRecvBuffer(notifySlot, &dummy_, sizeof(dummy_));
}
真正执行通信的 run() 方法:
53 void run() {
const int rank = this->contextRank_;
const int numRounds = this->contextSize_ - 1;
// Copy local buffers.
for (int i = 0; i < inPtrs_.size(); i++) {
memcpy(outPtr_ + rank * inputStride_ + i * count_, inPtrs_[i], bytes_);
}
// We send input buffers in order.
for (int i = 0; i < inPtrs_.size(); i++) {
// We start every iteration by sending local buffer.
int inRank = rank;
// 10个进程,就是9个round。1号进程,第一个round给0,第二个round给9,第三个round给8...
for (int round = 0; round < numRounds; round++) {
const int sendOffset = inRank * inputStride_ + i * count_;
sendDataBuf_->send(
sendOffset * sizeof(T), bytes_, sendOffset * sizeof(T));
recvDataBuf_->waitRecv();
// Nodes receive data from the left node in every round and forward it
// to the right node.
inRank = (numRounds - round + rank) % this->contextSize_;
// Send notification to node on the left that this node is ready for an
// inbox write.
sendNotificationBuf_->send();
// Wait for notification from node on the right.
recvNotificationBuf_->waitRecv();
}
}
}
private:
// Input buffers, copied from the constructor's inPtrs argument.
const std::vector<const T*> inPtrs_;
// Output buffer; run() copies this rank's inputs into it, and the data
// send/receive buffers are created over it.
T* outPtr_;
// Number of elements per input buffer (constructor's count argument).
const int count_;
// count * sizeof(T), narrowed to int.
const int bytes_;
// count_ * inPtrs_.size(), i.e. the elements contributed by one rank,
// narrowed to int.
const int inputStride_;
// Pairs for the ring neighbors, from getLeftPair() / getRightPair().
std::unique_ptr<transport::Pair>& leftPair_;
std::unique_ptr<transport::Pair>& rightPair_;
// Data buffers over outPtr_: the send buffer is created on rightPair_, the
// receive buffer on leftPair_.
std::unique_ptr<transport::Buffer> sendDataBuf_;
std::unique_ptr<transport::Buffer> recvDataBuf_;
// Storage behind the notification buffers.
// NOTE(review): never assigned in the visible code — confirm that its value
// is irrelevant.
int dummy_;
// Notification buffers over dummy_: the send buffer is created on
// leftPair_, the receive buffer on rightPair_.
std::unique_ptr<transport::Buffer> sendNotificationBuf_;
std::unique_ptr<transport::Buffer> recvNotificationBuf_;
};
一般的allgather
1 | void allgather(AllgatherOptions& opts) { |
allgatherv
1 | void allgatherv(AllgathervOptions& opts) { |
allreduce
1 | using BufferVector = std::vector<std::unique_ptr<transport::UnboundBuffer>>; |
allreduce的ring方法
给定的输入被分成与进程数相等的块数。 算法完成后,每个进程按顺序托管一个reduction输出块(rank 0 具有块 0,rank 1 具有块 1,等等)。由于输入可能不能被进程数整除,因此最终的块有部分输出或可能为空。
当一个块沿着环传递并且包含连续更多rank的reduction时,我们必须在为该块执行 I/O 和计算接收到的块和本地块之间的reduction之间交替。为了避免这种交替模式,我们将一个块分成多个段(> = 2),并确保我们有一个段在运行,同时计算另一个段的reduction。
段大小有一个上限,以最大限度地减少内存使用并避免不良的缓存行为。 这意味着在处理非常大的输入时,每个块可能有很多段。
1 | void ring( |
bcube 算法
bcube 算法实现了一种类似超立方体的reduce策略。约束是进程的数量可以分解。如果分解中的最小分量为 2,并且进程数等于 2 的幂,则该算法与递归减半/加倍相同。
分解中的元素数量决定了算法的步数。分解的每个元素决定了每个进程在算法的特定步骤中与之通信的进程数。如果进程数不可分解,则该算法与直接reduce-scatter 后allgather 相同。
例如,如果进程数为 8,并且我们将其分解为 4 * 2,则算法分 2 步运行。在第一步中,2 组 4 个进程之间交换数据,以使所有进程具有部分结果的 1/4。第二步,4 组 2 个进程交换它们的部分结果,使得所有进程都有 1/8 的结果。然后,反向执行相同的分解以执行 allgather。
240void bcube(
const detail::AllreduceOptionsImpl& opts,
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs) {
// Allreduce using the bcube schedule: a reduce/scatter phase that walks the
// per-step groups in forward order, followed by an allgather phase that walks
// the same groups in reverse order.
//
// opts supplies the context, tag, element size and count, the output buffers
// (opts.out[0] is used for all network I/O) and the reduce function.
// reduceInputs and broadcastOutputs are called with (offset, length) of a
// range, both in bytes; opts.reduce is called with a length in elements.
// NOTE(review): per the comments below, reduceInputs reduces the local inputs
// into out[0] and broadcastOutputs copies out[0] to out[1..N] — confirm.
const auto& context = opts.context;
const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag);
const auto elementSize = opts.elementSize;
auto& out = opts.out[0];
constexpr auto n = 2;
// Figure out the number of steps in this algorithm.
const auto groupSizePerStep = computeGroupSizePerStep(context->size, n);
struct group {
// Distance between peers in this group.
size_t peerDistance;
// Segment that this group is responsible for reducing.
size_t bufferOffset;
size_t bufferLength;
// The process ranks that are a member of this group.
std::vector<size_t> ranks;
// Upper bound of the length of the chunk that each process has the
// reduced values for by the end of the reduction for this group.
size_t chunkLength;
// Chunk within the segment that this process is responsible for reducing.
size_t myChunkOffset;
size_t myChunkLength;
};
// Compute the details of a group at every algorithm step.
// We keep this in a vector because we iterate through it in forward order in
// the reduce/scatter phase and in backward order in the allgather phase.
std::vector<struct group> groups;
{
struct group group;
group.peerDistance = 1;
group.bufferOffset = 0;
group.bufferLength = opts.elements;
for (const size_t groupSize : groupSizePerStep) {
const size_t groupRank = (context->rank / group.peerDistance) % groupSize;
const size_t baseRank = context->rank - (groupRank * group.peerDistance);
group.ranks.reserve(groupSize);
for (size_t i = 0; i < groupSize; i++) {
group.ranks.push_back(baseRank + i * group.peerDistance);
}
// The members of a group are peerDistance ranks apart, starting at baseRank.
// Compute the length of the chunk we're exchanging at this step.
group.chunkLength = ((group.bufferLength + (groupSize - 1)) / groupSize);
// This process is computing the reduction of the chunk positioned at
// <rank>/<size> within the current segment.
//
group.myChunkOffset =
group.bufferOffset + (groupRank * group.chunkLength);
group.myChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) -
int64_t(groupRank * group.chunkLength))));
// Store a const copy of this group in the vector.
groups.push_back(group);
// Initialize with updated peer distance and segment offset and length.
struct group nextGroup;
nextGroup.peerDistance = group.peerDistance * groupSize;
nextGroup.bufferOffset = group.myChunkOffset;
nextGroup.bufferLength = group.myChunkLength;
std::swap(group, nextGroup);
}
}
// The chunk length is rounded up, so the maximum scratch space we need might
// be larger than the size of the output buffer. Compute the maximum.
size_t bufferLength = opts.elements;
for (const auto& group : groups) {
bufferLength =
std::max(bufferLength, group.ranks.size() * group.chunkLength);
}
// Allocate scratch space to receive data from peers.
const size_t bufferSize = bufferLength * elementSize;
std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
std::unique_ptr<transport::UnboundBuffer> tmp =
context->createUnboundBuffer(buffer.get(), bufferSize);
// Reduce/scatter.
for (size_t step = 0; step < groups.size(); step++) {
const auto& group = groups[step];
// Issue receive operations for chunks from peers.
// The chunk from the peer at group index i lands at slot i of the scratch
// buffer.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
tmp->recv(
src,
slot,
i * group.chunkLength * elementSize,
group.myChunkLength * elementSize);
}
// Issue send operations for local chunks to peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto dst = group.ranks[i];
if (dst == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
// Compute the local reduction only in the first step of the algorithm.
// In subsequent steps, we already have a partially reduced result.
if (step == 0) {
reduceInputs(
currentChunkOffset * elementSize, currentChunkLength * elementSize);
}
out->send(
dst,
slot,
currentChunkOffset * elementSize,
currentChunkLength * elementSize);
}
// Wait for send and receive operations to complete.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
tmp->waitRecv();
out->waitSend();
}
// In the first step, prepare the chunk this process is responsible for
// with the reduced version of its inputs (if multiple are specified).
if (step == 0) {
reduceInputs(
group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
}
// Reduce chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
opts.reduce(
static_cast<uint8_t*>(out->ptr) + (group.myChunkOffset * elementSize),
static_cast<const uint8_t*>(out->ptr) +
(group.myChunkOffset * elementSize),
static_cast<const uint8_t*>(tmp->ptr) +
(i * group.chunkLength * elementSize),
group.myChunkLength);
}
}
// There is one chunk that contains the final result and this chunk can
// already be broadcast locally to out[1..N], if applicable.
// Doing so means we only have to broadcast locally to out[1..N] all chunks
// as we receive them from our peers during the allgather phase.
{
const auto& group = groups.back();
broadcastOutputs(
group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
}
// Allgather.
for (auto it = groups.rbegin(); it != groups.rend(); it++) {
const auto& group = *it;
// Issue receive operations for reduced chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
out->recv(
src,
slot,
currentChunkOffset * elementSize,
currentChunkLength * elementSize);
}
// Issue send operations for reduced chunk to peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto dst = group.ranks[i];
if (dst == context->rank) {
continue;
}
out->send(
dst,
slot,
group.myChunkOffset * elementSize,
group.myChunkLength * elementSize);
}
// Wait for operations to complete.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
out->waitRecv();
out->waitSend();
}
// Broadcast result to multiple output buffers, if applicable.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
broadcastOutputs(
currentChunkOffset * elementSize, currentChunkLength * elementSize);
}
}
}
alltoallv
同 alltoall 一样,只不过 alltoallv 的实现多了每个 rank 各自的 offset 和长度;没有什么高深的算法,只是逐对进行 send/recv:
38void alltoallv(AlltoallvOptions& opts) {
const auto& context = opts.context;
transport::UnboundBuffer* in = opts.in.get();
transport::UnboundBuffer* out = opts.out.get();
std::vector<size_t>& inOffsetPerRank = opts.inOffsetPerRank;
std::vector<size_t>& inLengthPerRank = opts.inLengthPerRank;
std::vector<size_t>& outOffsetPerRank = opts.outOffsetPerRank;
std::vector<size_t>& outLengthPerRank = opts.outLengthPerRank;
const auto slot = Slot::build(kAlltoallSlotPrefix, opts.tag);
int myRank = context->rank;
int worldSize = context->size;
// Local copy.
GLOO_ENFORCE(inLengthPerRank[myRank] == outLengthPerRank[myRank]);
size_t myInOffset = inOffsetPerRank[myRank];
size_t myOutOffset = outOffsetPerRank[myRank];
size_t myChunkSize = inLengthPerRank[myRank];
memcpy(
static_cast<char*>(out->ptr) + myOutOffset,
static_cast<char*>(in->ptr) + myInOffset,
myChunkSize);
// Remote copy.
for (int i = 1; i < worldSize; i++) {
int sendRank = (myRank + i) % worldSize;
int recvRank = (myRank + worldSize - i) % worldSize;
in->send(
sendRank, slot, inOffsetPerRank[sendRank], inLengthPerRank[sendRank]);
out->recv(
recvRank, slot, outOffsetPerRank[recvRank], outLengthPerRank[recvRank]);
}
for (int i = 1; i < worldSize; i++) {
in->waitSend(opts.timeout);
out->waitRecv(opts.timeout);
}
}
barrier
如果是共有16进程的话,
0号进程会与15 1,14 2,12 4,8 8进程recv send
1号进程会与0 2,15 3,13 5,9 9进程recv send
1 | void barrier(BarrierOptions& opts) { |
broadcast
1 | void broadcast(BroadcastOptions& opts) { |
avx优化
一些 reduce 函数使用了 AVX 指令:
- _mm256_cvtph_ps:将八个半精度(16 位)浮点值转换为单精度浮点值。
- _mm256_cvtps_ph:将八个单精度浮点值转换为半精度(16 位)浮点值。
- _mm_storeu_si128:将 SSE 寄存器中的数据(例如计算结果)保存到内存中,目标地址无需对齐。
- _mm256_mul_ps:对第一个源向量 m1 中的八个压缩单精度浮点元素(float32 元素)与第二个源向量 m2 中的八个 float32 元素执行 SIMD 乘法。
// Element-wise sum of half-precision values: c[i] = a[i] + b[i] for i < n.
//
// Original note: assumes the inputs are either both aligned to 32 bytes or
// misaligned by the same offset, as happens when reducing at an offset
// within an aligned buffer. The loads and stores below use the unaligned
// intrinsics, so the code itself does not rely on that.
//
// The inputs are only read; the casts keep them const-qualified.
template <>
void sum<float16>(void* c_, const void* a_, const void* b_, size_t n) {
  float16* c = static_cast<float16*>(c_);
  const float16* a = static_cast<const float16*>(a_);
  const float16* b = static_cast<const float16*>(b_);

  // Vectorized part: 8 halves per iteration, widened to float for the
  // arithmetic and narrowed back for the store.
  const size_t vectorEnd = (n / 8) * 8;
  size_t i = 0;
  for (; i < vectorEnd; i += 8) {
    const __m256 va32 = _mm256_cvtph_ps(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(&a[i])));
    const __m256 vb32 = _mm256_cvtph_ps(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(&b[i])));
    const __m128i vc16 = _mm256_cvtps_ph(_mm256_add_ps(va32, vb32), 0);
    _mm_storeu_si128(reinterpret_cast<__m128i*>(&c[i]), vc16);
  }

  // Leftovers
  for (; i < n; i++) {
    c[i] = a[i] + b[i];
  }
}
// Element-wise product of half-precision values: c[i] = a[i] * b[i] for
// i < n. The inputs are only read; the casts keep them const-qualified.
template <>
void product<float16>(void* c_, const void* a_, const void* b_, size_t n) {
  float16* c = static_cast<float16*>(c_);
  const float16* a = static_cast<const float16*>(a_);
  const float16* b = static_cast<const float16*>(b_);

  // Vectorized part: 8 halves per iteration, widened to float for the
  // arithmetic and narrowed back for the store.
  const size_t vectorEnd = (n / 8) * 8;
  size_t i = 0;
  for (; i < vectorEnd; i += 8) {
    const __m256 va32 = _mm256_cvtph_ps(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(&a[i])));
    const __m256 vb32 = _mm256_cvtph_ps(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(&b[i])));
    const __m128i vc16 = _mm256_cvtps_ph(_mm256_mul_ps(va32, vb32), 0);
    _mm_storeu_si128(reinterpret_cast<__m128i*>(&c[i]), vc16);
  }

  // Leftovers
  for (; i < n; i++) {
    c[i] = a[i] * b[i];
  }
}
// Element-wise maximum of half-precision values: c[i] = max(a[i], b[i]) for
// i < n. The inputs are only read; the casts keep them const-qualified.
template <>
void max<float16>(void* c_, const void* a_, const void* b_, size_t n) {
  float16* c = static_cast<float16*>(c_);
  const float16* a = static_cast<const float16*>(a_);
  const float16* b = static_cast<const float16*>(b_);

  // Vectorized part: 8 halves per iteration, widened to float for the
  // comparison and narrowed back for the store.
  const size_t vectorEnd = (n / 8) * 8;
  size_t i = 0;
  for (; i < vectorEnd; i += 8) {
    const __m256 va32 = _mm256_cvtph_ps(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(&a[i])));
    const __m256 vb32 = _mm256_cvtph_ps(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(&b[i])));
    const __m128i vc16 = _mm256_cvtps_ph(_mm256_max_ps(va32, vb32), 0);
    _mm_storeu_si128(reinterpret_cast<__m128i*>(&c[i]), vc16);
  }

  // Leftovers
  for (; i < n; i++) {
    c[i] = std::max(a[i], b[i]);
  }
}
// Element-wise minimum of half-precision values: c[i] = min(a[i], b[i]) for
// i < n. The inputs are only read; the casts keep them const-qualified.
template <>
void min<float16>(void* c_, const void* a_, const void* b_, size_t n) {
  float16* c = static_cast<float16*>(c_);
  const float16* a = static_cast<const float16*>(a_);
  const float16* b = static_cast<const float16*>(b_);

  // Vectorized part: 8 halves per iteration, widened to float for the
  // comparison and narrowed back for the store.
  const size_t vectorEnd = (n / 8) * 8;
  size_t i = 0;
  for (; i < vectorEnd; i += 8) {
    const __m256 va32 = _mm256_cvtph_ps(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(&a[i])));
    const __m256 vb32 = _mm256_cvtph_ps(
        _mm_loadu_si128(reinterpret_cast<const __m128i*>(&b[i])));
    const __m128i vc16 = _mm256_cvtps_ph(_mm256_min_ps(va32, vb32), 0);
    _mm_storeu_si128(reinterpret_cast<__m128i*>(&c[i]), vc16);
  }

  // Leftovers
  for (; i < n; i++) {
    c[i] = std::min(a[i], b[i]);
  }
}
reduce
1 | void reduce(ReduceOptions& opts) { |