Introduction to Gloo

Gloo is a collective communications library. It comes with a number of collective algorithms useful for machine learning applications. Transport of data between participating machines is abstracted, so that IP can be used at all times, or InfiniBand (or RoCE) when available. In the latter case, when the InfiniBand transport is used, GPUDirect can be used to accelerate cross-machine GPU-to-GPU memory transfers.

Where applicable, algorithms have an implementation that works with system memory buffers, and one that works with NVIDIA GPU memory buffers. In the latter case, it is not necessary to copy memory between host and device; this is taken care of by the algorithm implementations.

aligned_allocator

It aligns every allocated memory region to 32 bytes, and is built from using-aliases and a class template.

// Align buffers to 32 bytes to support vectorized code
const size_t kBufferAlignment = 32;

template <typename T, int ALIGNMENT = kBufferAlignment>
class aligned_allocator {
static_assert(
!(ALIGNMENT & (ALIGNMENT - 1)),
"alignment must be a power of 2");

public:
using value_type = T;
using pointer = value_type*;
using const_pointer = const value_type*;
using reference = value_type&;
using const_reference = const value_type&;
using size_type = std::size_t;
using difference_type = std::ptrdiff_t;

template <typename U>
struct rebind {
using other = aligned_allocator<U, ALIGNMENT>;
};

inline explicit aligned_allocator() = default;
inline ~aligned_allocator() = default;
inline explicit aligned_allocator(const aligned_allocator& a) = default;

inline pointer address(reference r) {
return &r;
}

inline const_pointer address(const_reference r) {
return &r;
}

inline pointer allocate(size_type sz) {
pointer p;
if (posix_memalign(
reinterpret_cast<void**>(&p), ALIGNMENT, sizeof(T) * sz)) {
abort();
}
return p;
}

void deallocate(pointer p, size_type /*sz*/) {
free(p);
}
};

On success, posix_memalign(void **memptr, size_t alignment, size_t size) returns size bytes of dynamically allocated memory whose address is a multiple of alignment. The alignment argument must be a power of two and a multiple of sizeof(void *). The address of the allocated block is stored in *memptr and the function returns 0. On failure, no memory is allocated, the contents of *memptr are undefined, and one of the following error codes is returned (a short usage sketch of the allocator follows the list):

  • EINVAL: the alignment argument was not a power of two, or was not a multiple of sizeof(void *).
  • ENOMEM: there was insufficient memory to fulfill the allocation request.
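
As a quick illustration, the allocator drops straight into standard containers; a minimal sketch, assuming the aligned_allocator and kBufferAlignment definitions above are in scope:

#include <cassert>
#include <cstdint>
#include <vector>

int main() {
  // Every allocation the vector makes goes through
  // aligned_allocator::allocate, so the storage is 32-byte aligned.
  std::vector<float, aligned_allocator<float>> buf(1024);
  assert(reinterpret_cast<std::uintptr_t>(buf.data()) % kBufferAlignment == 0);
  return 0;
}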

transport

A sample program drives Gloo through MPI like this:

int main(int /*argc*/, char** /*argv*/) {
// We'll use the TCP transport in this example
auto dev = gloo::transport::tcp::CreateDevice("localhost");

// Create Gloo context and delegate management of MPI_Init/MPI_Finalize
auto context = gloo::mpi::Context::createManaged();
context->connectFullMesh(dev);

// Create and run simple allreduce
int rank = context->rank;
gloo::AllreduceRing<int> allreduce(context, {&rank}, 1);
allreduce.run();
std::cout << "Result: " << rank << std::endl;

return 0;
}
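
Each process contributes a one-element buffer holding its own rank, and AllreduceRing defaults to summation, so every process prints the same value; with 4 processes, each prints Result: 0 + 1 + 2 + 3 = 6.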

The transport layer has three implementations: ibverbs, tcp, and uv.

ibverbs

This is the RDMA transport. The Context records the communication device together with rank and size information.

namespace ibverbs {

Context::Context(std::shared_ptr<Device> device, int rank, int size)
: ::gloo::transport::Context(rank, size), device_(device) {}

Context::~Context() {}

std::unique_ptr<transport::Pair>& Context::createPair(int rank) {
pairs_[rank] = std::unique_ptr<transport::Pair>(
new ibverbs::Pair(device_, getTimeout()));
return pairs_[rank];
}

std::unique_ptr<transport::UnboundBuffer> Context::createUnboundBuffer(
void* ptr,
size_t size) {
GLOO_THROW_INVALID_OPERATION_EXCEPTION(
"Unbound buffers not supported yet for ibverbs transport");
return std::unique_ptr<transport::UnboundBuffer>();
}

} // namespace ibverbs

A Pair can be neither copied nor assigned, which avoids ownership problems; the two endpoints of a connection make up one pair:

// Forward declaration
class Buffer;

class Pair : public ::gloo::transport::Pair {
static constexpr int kMaxBuffers = 8;
static constexpr auto kRecvCompletionQueueCapacity = kMaxBuffers;
static constexpr auto kSendCompletionQueueCapacity = kMaxBuffers;
static constexpr auto kCompletionQueueCapacity =
kRecvCompletionQueueCapacity + kSendCompletionQueueCapacity;

// The ibv_req_notify(3) function takes an argument called
// 'solicited_only' which makes it only trigger a notification for
// work requests that are flagged as solicited. Every completion
// should trigger a notification, so always pass 0.
static constexpr auto kNotifyOnAnyCompletion = 0;

public:
explicit Pair(
const std::shared_ptr<Device>& dev,
std::chrono::milliseconds timeout);

virtual ~Pair();

Pair(const Pair& that) = delete;

Pair& operator=(const Pair& that) = delete;

virtual const Address& address() const override;
virtual void connect(const std::vector<char>& bytes) override;
virtual void setSync(bool enable, bool busyPoll) override;

virtual std::unique_ptr<::gloo::transport::Buffer>
createSendBuffer(int slot, void* ptr, size_t size) override;

virtual std::unique_ptr<::gloo::transport::Buffer>
createRecvBuffer(int slot, void* ptr, size_t size) override;

// Send from the specified buffer to remote side of pair.
virtual void send(
transport::UnboundBuffer* tbuf,
uint64_t tag,
size_t offset,
size_t nbytes) override;

// Receive into the specified buffer from the remote side of pair.
virtual void recv(
transport::UnboundBuffer* tbuf,
uint64_t tag,
size_t offset,
size_t nbytes) override;

void handleCompletionEvent();

void pollCompletions();

void handleCompletion(struct ibv_wc* wc);

void send(Buffer* buf, size_t offset, size_t length, size_t roffset);

void close() override;

protected:
std::shared_ptr<Device> dev_;

// Whether or not this pair is running in sync mode.
std::atomic<bool> sync_;

// Whether or not this pair is busy polling in sync mode.
std::atomic<bool> busyPoll_;

const std::chrono::milliseconds timeout_;

// Number of completion events handled by this pair's completion queue.
// This many events must be acknowledged before the completion queue is
// destroyed; otherwise, destruction will hang.
int completionEventsHandled_;

Address self_;
Address peer_;

struct ibv_cq* cq_;
struct ibv_qp* qp_;

std::mutex m_;
std::condition_variable cv_;

// For us to copy the remote peer's ibv_mr into.
std::map<int, struct ibv_mr> peerMemoryRegions_;

// These fields store the memory regions that the remote side of the pair
// can send to, and the memory regions the local side of the pair can send
// from. When registering a receive buffer, the local ibv_mr is sent to the
// remote side of the pair, and the corresponding MemoryRegion instance is
// kept in the mappedSendRegions_ list until the send operation completes.
// To allow the remote side of the pair to send us its memory regions, a
// fixed number of MemoryRegion instances is kept in mappedRecvRegions_.
// These regions are cycled through for every posted receive work request.
std::map<int, std::unique_ptr<MemoryRegion> > mappedSendRegions_;
std::array<std::unique_ptr<MemoryRegion>, kMaxBuffers> mappedRecvRegions_;

// Tracks the number of receive work requests that have been posted and
// completed. Both when posting a WR and when completing one, this count
// is used to index into the mappedRecvRegions_ array.
uint64_t recvPosted_;

// Completions on behalf of buffers need to be forwarded to those buffers.
std::map<int, Buffer*> sendCompletionHandlers_;
std::map<int, Buffer*> recvCompletionHandlers_;

void sendMemoryRegion(struct ibv_mr* mr, int slot);
const struct ibv_mr* getMemoryRegion(int slot);

void postReceive();

std::chrono::milliseconds getTimeout() const {
return timeout_;
}

const Address& peer() const {
return peer_;
}

private:
std::exception_ptr ex_;
bool closed_ = false;

// Used to signal IO exceptions from one thread and propagate onto others.
void signalIoFailure(const std::string& msg);
void checkErrorState();

friend class Buffer;
};

The functions, one by one:

Pair::Pair(
const std::shared_ptr<Device>& dev,
std::chrono::milliseconds timeout)
: dev_(dev),
sync_(false),
busyPoll_(false),
timeout_(timeout),
completionEventsHandled_(0),
recvPosted_(0),
ex_(nullptr) {
int rv;

// Create completion queue
{
// This completion queue must be registered with the device's completion
// channel to support asynchronous completion handling. Pairs use
// asynchronous completion handling by default, so ibv_req_notify_cq(3) is
// called here to request the first notification.
cq_ = ibv_create_cq(
dev_->context_,
kCompletionQueueCapacity,
this,
dev_->comp_channel_,
0);
GLOO_ENFORCE(cq_);

// Request a completion notification on the completion queue (CQ).
rv = ibv_req_notify_cq(cq_, kNotifyOnAnyCompletion);
GLOO_ENFORCE_EQ(rv, 0);
}

// Create queue pair
{
struct ibv_qp_init_attr attr;
memset(&attr, 0, sizeof(struct ibv_qp_init_attr));
attr.send_cq = cq_;
attr.recv_cq = cq_;
attr.cap.max_send_wr = Pair::kSendCompletionQueueCapacity;
attr.cap.max_recv_wr = Pair::kRecvCompletionQueueCapacity;
attr.cap.max_send_sge = 1;
attr.cap.max_recv_sge = 1;
attr.qp_type = IBV_QPT_RC;
qp_ = ibv_create_qp(dev->pd_, &attr);
// Verify that the queue pair was created.
GLOO_ENFORCE(qp_);
}

// Init queue pair
{
struct ibv_qp_attr attr;
memset(&attr, 0, sizeof(struct ibv_qp_attr));
attr.qp_state = IBV_QPS_INIT;
attr.pkey_index = 0;
attr.port_num = dev_->attr_.port;
attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE;
rv = ibv_modify_qp(
qp_,
&attr,
IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
GLOO_ENFORCE_EQ(rv, 0);
}

// Populate local address.
// The Packet Sequence Number field (PSN) is random which makes that
// the remote end of this pair needs to have the contents of the
// full address struct in order to connect, and vice versa.
{
struct ibv_port_attr attr;
memset(&attr, 0, sizeof(struct ibv_port_attr));
rv = ibv_query_port(dev_->context_, dev_->attr_.port, &attr);
GLOO_ENFORCE_EQ(rv, 0);
rv = ibv_query_gid(
dev_->context_,
dev_->attr_.port,
dev_->attr_.index,
&self_.addr_.ibv_gid);
GLOO_ENFORCE_EQ(rv, 0);
self_.addr_.lid = attr.lid;
self_.addr_.qpn = qp_->qp_num;
self_.addr_.psn = rand() & 0xffffff;
}

// Post receive requests before connecting.
// Whenever the remote side of this pair registers receive buffers, their
// memory registrations are sent to this side of the pair. Since these
// sends are one-sided, we always need a full set of receive work requests.
// Memory region receives can be interleaved with regular buffer writes, so
// a memory region is proactively included in every receive work request.
for (int i = 0; i < kMaxBuffers; ++i) {
mappedRecvRegions_[i] = make_unique<MemoryRegion>(dev_->pd_);
postReceive();
}
}

Pair::~Pair() {
int rv;

// Acknowledge number of completion events handled by this
// pair's completion queue (also see ibv_get_cq_event(3)).
ibv_ack_cq_events(cq_, completionEventsHandled_);

rv = ibv_destroy_qp(qp_);
GLOO_ENFORCE_EQ(rv, 0);

rv = ibv_destroy_cq(cq_);
GLOO_ENFORCE_EQ(rv, 0);
}

void Pair::close() {
if (closed_) {
// TODO: add proper handling of duplicate closes T21171834
return;
}

closed_ = true;
}

const Address& Pair::address() const {
return self_;
}

The connect function first obtains the peer's address, fills in the attr struct, and uses ibv_modify_qp to walk the queue pair through the RDMA state machine: the constructor already moved it from RESET to INIT, and connect now moves it to RTR (Ready To Receive) and then RTS (Ready To Send).

void Pair::connect(const std::vector<char>& bytes) {
struct ibv_qp_attr attr;
int rv;
checkErrorState();

peer_ = Address(bytes);

memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTR;
attr.path_mtu = IBV_MTU_1024;
attr.dest_qp_num = peer_.addr_.qpn;
attr.rq_psn = peer_.addr_.psn;
attr.max_dest_rd_atomic = 1;
attr.min_rnr_timer = 20;
attr.ah_attr.is_global = 0;
attr.ah_attr.dlid = peer_.addr_.lid;
attr.ah_attr.port_num = dev_->attr_.port;
if (peer_.addr_.ibv_gid.global.interface_id) {
attr.ah_attr.is_global = 1;
attr.ah_attr.grh.hop_limit = 1;
attr.ah_attr.grh.dgid = peer_.addr_.ibv_gid;
attr.ah_attr.grh.sgid_index = dev_->attr_.index;
}

// ibv_modify_qp() modifies the attributes of the queue pair; the changed
// attributes describe the QP's send and receive properties.
// ibv_create_qp only allocates resources; this modify step is what brings
// the hardware into a working state.
// Move to Ready To Receive (RTR) state
rv = ibv_modify_qp(
qp_,
&attr,
IBV_QP_STATE | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
IBV_QP_AV | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER);
GLOO_ENFORCE_EQ(rv, 0);

memset(&attr, 0, sizeof(attr));
attr.qp_state = IBV_QPS_RTS;
attr.sq_psn = self_.addr_.psn;
attr.ah_attr.is_global = 1;
attr.timeout = 14;
attr.retry_cnt = 7;
attr.rnr_retry = 7; /* infinite */
attr.max_rd_atomic = 1;

// Move to Ready To Send (RTS) state
rv = ibv_modify_qp(
qp_,
&attr,
IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC);
GLOO_ENFORCE_EQ(rv, 0);
}

// Switches the pair into synchronous mode.
//
// Note: busy polling is NOT optional. Currently, since all pairs
// share a single completion channel, busy polling is mandatory
// through ibv_poll_cq(3). If a use case comes up for supporting
// synchronous mode where the calling thread should be suspended, this
// can be revisited and we can add a completion channel per pair.
//
void Pair::setSync(bool sync, bool busyPoll) {
checkErrorState();
if (!sync) {
GLOO_THROW_INVALID_OPERATION_EXCEPTION("Can only switch to sync mode");
}
if (!busyPoll) {
GLOO_THROW_INVALID_OPERATION_EXCEPTION(
"The ibverbs transport only supports busy polling in sync mode");
}

// The notification mechanism for this pair's completion queue is
// still armed. This means the device thread will still call
// handleCompletions() one more time, but this is ignored.
//
// No need to lock a mutex; these are atomics.
//
sync_ = true;
busyPoll_ = true;
}

Sends are issued with ibv_post_send:

void Pair::sendMemoryRegion(struct ibv_mr* src, int slot) {
auto mr = make_unique<MemoryRegion>(dev_->pd_, src);
struct ibv_sge list = mr->sge();
struct ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
wr.wr_id = slot;
wr.sg_list = &list;
wr.num_sge = 1;
wr.opcode = IBV_WR_SEND_WITH_IMM;
wr.send_flags = IBV_SEND_SIGNALED;
wr.imm_data = slot;

// The work request is serialized and sent to the driver, so it does not
// need to remain valid after the ibv_post_send call. ibv_post_send and
// ibv_post_recv post verbs; each verb is carried in an ibv_send_wr or
// ibv_recv_wr structure holding the verb type and the memory region
// details.
struct ibv_send_wr* bad_wr = nullptr;
int rv = ibv_post_send(qp_, &wr, &bad_wr);
if (rv != 0) {
signalIoFailure(GLOO_ERROR_MSG("ibv_post_send: ", rv));
}

GLOO_ENFORCE_EQ(mappedSendRegions_.count(slot), 0);
mappedSendRegions_[slot] = std::move(mr);
}

getMemoryRegion first takes the lock. In sync mode it busy-polls for completions, checking for a timeout on every pass; otherwise it blocks on the condition variable until the peer's memory region arrives.

const struct ibv_mr* Pair::getMemoryRegion(int slot) {
std::unique_lock<std::mutex> lock(m_);
if (sync_) {
auto it = peerMemoryRegions_.find(slot);
auto start = std::chrono::steady_clock::now();
while (it == peerMemoryRegions_.end()) {
lock.unlock();
pollCompletions();
lock.lock();
if (timeout_ != kNoTimeout &&
(std::chrono::steady_clock::now() - start) >= timeout_) {
lock.unlock();
signalIoFailure(
GLOO_ERROR_MSG(
"Timeout waiting for memory region from ",
peer_.str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
it = peerMemoryRegions_.find(slot);
}
return &it->second;
} else {
auto pred = [&]{
return peerMemoryRegions_.find(slot) != peerMemoryRegions_.end();
};
if (timeout_ == kNoTimeout) {
// No timeout set. Wait for read to complete.
cv_.wait(lock, pred);
} else {
auto done = cv_.wait_for(lock, timeout_, pred);
if (!done) {
signalIoFailure(
GLOO_ERROR_MSG(
"Timeout waiting for memory region from ",
peer_.str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
}
auto it = peerMemoryRegions_.find(slot);
GLOO_ENFORCE(it != peerMemoryRegions_.end());
return &it->second;
}
}

void Pair::postReceive() {
const auto& mr = mappedRecvRegions_[recvPosted_++ % kMaxBuffers];
struct ibv_sge list = mr->sge();
struct ibv_recv_wr wr;
memset(&wr, 0, sizeof(wr));
wr.sg_list = &list;
wr.num_sge = 1;

// The work request is serialized and sent to the driver, so it does not
// need to remain valid after the ibv_post_recv call.
struct ibv_recv_wr* bad_wr = nullptr;
auto rv = ibv_post_recv(qp_, &wr, &bad_wr);
if (rv != 0) {
signalIoFailure(GLOO_ERROR_MSG("ibv_post_recv: ", rv));
}
}

std::unique_ptr<::gloo::transport::Buffer>
Pair::createSendBuffer(int slot, void* ptr, size_t size) {
// Create a buffer and register it as the completion handler for this slot.
std::unique_lock<std::mutex> lock(m_);
GLOO_ENFORCE_EQ(sendCompletionHandlers_.count(slot), 0);
auto buffer = new Buffer(this, slot, ptr, size);
sendCompletionHandlers_[slot] = buffer;
return std::unique_ptr<::gloo::transport::Buffer>(buffer);
}

std::unique_ptr<::gloo::transport::Buffer>
Pair::createRecvBuffer(int slot, void* ptr, size_t size) {
std::unique_lock<std::mutex> lock(m_);
GLOO_ENFORCE_EQ(recvCompletionHandlers_.count(slot), 0);
auto buffer = new Buffer(this, slot, ptr, size);
recvCompletionHandlers_[slot] = buffer;
sendMemoryRegion(buffer->mr_, buffer->slot_);
return std::unique_ptr<::gloo::transport::Buffer>(buffer);
}

// Send from the specified buffer to remote side of pair.
void Pair::send(
transport::UnboundBuffer* tbuf,
uint64_t /* unused */,
size_t /* unused */,
size_t /* unused */) {
GLOO_THROW_INVALID_OPERATION_EXCEPTION(
"Unbound buffers not supported yet for ibverbs transport");
}

// Receive into the specified buffer from the remote side of pair.
void Pair::recv(
transport::UnboundBuffer* tbuf,
uint64_t /* unused */,
size_t /* unused */,
size_t /* unused */) {
GLOO_THROW_INVALID_OPERATION_EXCEPTION(
"Unbound buffers not supported yet for ibverbs transport");
}

// handleCompletionEvent is called by the device thread when it
// received an event for this pair's completion queue on its
// completion channel.
void Pair::handleCompletionEvent() {
int rv;

completionEventsHandled_++;

// If in sync mode, the pair was just switched and this is
// the last notification from the device thread because
// the notification mechanism is not re-armed below.
if (sync_) {
return;
}

try {
checkErrorState();

// Arm notification mechanism for completion queue.
rv = ibv_req_notify_cq(cq_, kNotifyOnAnyCompletion);
GLOO_ENFORCE_EQ(rv, 0);

// Now poll for work completions to drain the completion queue.
std::unique_lock<std::mutex> lock(m_);
pollCompletions();
} catch (const ::gloo::IoException&) {
// Catch IO exceptions on the event handling thread. The exception has
// already been saved and user threads signaled.
}
}

pollCompletions polls this pair's completion queue for work completions. When called from the device thread, the pair's mutex has already been acquired. When called from a user thread, the mutex is not acquired (since only a single thread is expected to use the pair).

void Pair::pollCompletions() {
std::array<struct ibv_wc, kCompletionQueueCapacity> wc;

// Invoke handler for every work completion.
for (;;) {
auto nwc = ibv_poll_cq(cq_, wc.size(), wc.data());
GLOO_ENFORCE_GE(nwc, 0);

// Handle work completions
for (int i = 0; i < nwc; i++) {
checkErrorState();
handleCompletion(&wc[i]);
}

// Break unless wc was filled
if (nwc == 0 || nwc < wc.size()) {
break;
}
}
}

void Pair::handleCompletion(struct ibv_wc* wc) {
if (wc->opcode == IBV_WC_RECV_RDMA_WITH_IMM) {
// Incoming RDMA write completed.
// Slot is encoded in immediate data on receive work completion.
// It is set in the Pair::send function.
auto slot = wc->imm_data;
GLOO_ENFORCE_EQ( wc->status, IBV_WC_SUCCESS, "Recv for slot ", slot, ": ", ibv_wc_status_str(wc->status));

GLOO_ENFORCE(recvCompletionHandlers_[slot] != nullptr);
recvCompletionHandlers_[slot]->handleCompletion(wc);

// Backfill receive work requests.
postReceive();
} else if (wc->opcode == IBV_WC_RDMA_WRITE) {
// Outbound RDMA write completed.
// Slot is encoded in wr_id fields on send work request. Unlike
// the receive work completions, the immediate data field on send
// work requests are not pass to the respective work completion.
auto slot = wc->wr_id;
GLOO_ENFORCE_EQ( wc->status, IBV_WC_SUCCESS, "Send for slot ", slot, ": ", ibv_wc_status_str(wc->status));

GLOO_ENFORCE(sendCompletionHandlers_[slot] != nullptr);
sendCompletionHandlers_[slot]->handleCompletion(wc);
} else if (wc->opcode == IBV_WC_RECV) {
// A memory region recv completed.
// This is only used by the remote side of the pair to pass ibv_mr
// instances. They are written in FIFO order, so we can pick up and use the
// first MemoryRegion instance in the list of mapped receive regions.
// A buffer that tries to write to this slot may be waiting for the other
// side of the pair to send its memory region, so lock access and notify
// anybody waiting afterwards.
// The slot is encoded in the immediate data of the receive work
// completion; it is set in the Pair::sendMemoryRegion function.
auto slot = wc->imm_data;
GLOO_ENFORCE_EQ(
wc->status,
IBV_WC_SUCCESS,
"Memory region recv for slot ",
slot,
": ",
ibv_wc_status_str(wc->status));

// Move ibv_mr from memory region 'inbox' to final slot.
const auto& mr = mappedRecvRegions_[recvPosted_ % kMaxBuffers];
peerMemoryRegions_[slot] = mr->mr();

// Notify any buffer waiting for the details of its remote peer.
cv_.notify_all();

// Backfill receive work requests.
postReceive();
} else if (wc->opcode == IBV_WC_SEND) {
// Memory region send completed.
auto slot = wc->wr_id;
GLOO_ENFORCE_EQ(
wc->status,
IBV_WC_SUCCESS,
"Memory region send for slot ",
slot,
": ",
ibv_wc_status_str(wc->status));

GLOO_ENFORCE_GT(mappedSendRegions_.size(), 0);
GLOO_ENFORCE_EQ(mappedSendRegions_.count(slot), 1);
mappedSendRegions_.erase(slot);
} else {
GLOO_ENFORCE(false, "Unexpected completion with opcode: ", wc->opcode);
}
}

Pair::send fills in the work request, looks up the peer's memory region, and posts the RDMA write:

void Pair::send(Buffer* buffer, size_t offset, size_t length, size_t roffset) {
struct ibv_sge list;
list.addr = (uint64_t)buffer->ptr_ + offset;
list.length = length;
list.lkey = buffer->mr_->lkey;

struct ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
wr.wr_id = buffer->slot_;
wr.sg_list = &list;
wr.num_sge = 1;
wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
wr.send_flags = IBV_SEND_SIGNALED;
wr.imm_data = buffer->slot_;

const struct ibv_mr* peer = getMemoryRegion(buffer->slot_);
GLOO_ENFORCE_NE(peer, (const struct ibv_mr*)nullptr);
wr.wr.rdma.remote_addr = (uint64_t)peer->addr + roffset;
wr.wr.rdma.rkey = peer->rkey;

struct ibv_send_wr* bad_wr;
auto rv = ibv_post_send(qp_, &wr, &bad_wr);
if (rv != 0) {
signalIoFailure(GLOO_ERROR_MSG("ibv_post_send: ", rv));
}
}

void Pair::signalIoFailure(const std::string& msg) {
std::lock_guard<std::mutex> lock(m_);
auto ex = ::gloo::IoException(msg);
if (ex_ == nullptr) {
// If we haven't seen an error yet, store the exception to throw on future calling threads.
ex_ = std::make_exception_ptr(ex);
// Loop through the completion handlers and signal that an error has
// occurred.
for (auto& it : recvCompletionHandlers_) {
GLOO_ENFORCE(it.second != nullptr);
it.second->signalError(ex_);
}
for (auto& it : sendCompletionHandlers_) {
GLOO_ENFORCE(it.second != nullptr);
it.second->signalError(ex_);
}
}
// Finally, throw the exception on this thread.
throw ex;
};

void Pair::checkErrorState() {
// If we previously encountered an error, rethrow here.
if (ex_ != nullptr) {
std::rethrow_exception(ex_);
}
}

Device holds the device state; constructing one looks like this:

Device::Device(const struct attr& attr, ibv_context* context)
: attr_(attr),
pciBusID_(infinibandToBusID(attr.name)),
hasNvPeerMem_(kernelModules().count("nv_peer_mem") > 0),
context_(context) {
int rv;

// Query and store device attributes
rv = ibv_query_device(context_, &deviceAttr_);
GLOO_ENFORCE_EQ(rv, 0, "ibv_query_device: ", strerror(errno));

// Query and store port attributes
rv = ibv_query_port(context_, attr_.port, &portAttr_);
GLOO_ENFORCE_EQ(rv, 0, "ibv_query_port: ", strerror(errno));

// Protection domain
pd_ = ibv_alloc_pd(context_);
GLOO_ENFORCE(pd_);

// Completion channel
comp_channel_ = ibv_create_comp_channel(context_);
GLOO_ENFORCE(comp_channel_);

// Start thread to poll completion queue and dispatch
// completions for completed work requests.
done_ = false;
loop_.reset(new std::thread(&Device::loop, this));
}

Buffer registers a memory region for a caller-provided buffer:

Buffer::Buffer(Pair* pair, int slot, void* ptr, size_t size)
: ::gloo::transport::Buffer(slot, ptr, size),
pair_(pair),
recvCompletions_(0),
sendCompletions_(0),
sendPending_(0),
ex_(nullptr) {
// Register a memory region covering this buffer.
mr_ = ibv_reg_mr(
pair_->dev_->pd_,
ptr_,
size_,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);

// Provide hint if the error is EFAULT and nv_peer_mem is not loaded
if (mr_ == nullptr && errno == EFAULT) {
if (!pair->dev_->hasNvPeerMem_) {
GLOO_ENFORCE(
mr_ != nullptr,
"ibv_reg_mr: ",
strerror(errno),
" (kernel module 'nv_peer_mem' not loaded;"
" did you specify a pointer to GPU memory?)");
}
}

// Provide hint if the error is ENOMEM
if (mr_ == nullptr && errno == ENOMEM) {
GLOO_ENFORCE(
mr_ != nullptr,
"ibv_reg_mr: ",
strerror(errno),
" (did you run into the locked memory limit?)");
}

GLOO_ENFORCE(mr_ != nullptr, "ibv_reg_mr: ", strerror(errno));
}

waitRecv waits for a receive operation to complete. Whether it busy-polls or blocks depends on whether the pair is in sync mode:

void Buffer::waitRecv() {
// If the pair is in sync mode, the current thread is responsible for
// polling for work completions. Since a single pair may serve multiple
// buffers, a completion may be intended for another buffer.
auto timeout = pair_->getTimeout();
if (pair_->sync_) {
auto start = std::chrono::steady_clock::now();
// We can assume a single pair is never used by more than one
// thread, so there is no need to acquire the mutex here.
while (recvCompletions_ == 0) {
pair_->pollCompletions();
if (timeout != kNoTimeout &&
(std::chrono::steady_clock::now() - start) >= timeout) {
pair_->signalIoFailure(
GLOO_ERROR_MSG("Read timeout ", pair_->peer().str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
}
recvCompletions_--;
} else {
// The device thread will signal completion. If the completion
// hasn't arrived yet, wait until it does.
auto pred = [&]{
checkErrorState();
return recvCompletions_ > 0;
};
std::unique_lock<std::mutex> lock(m_);
if (timeout == kNoTimeout) {
// No timeout set. Wait for read to complete.
recvCv_.wait(lock, pred);
} else {
auto done = recvCv_.wait_for(lock, timeout, pred);
if (!done) {
// Release the mutex before calling into the pair to avoid deadlock.
// Calling signalIoFailure() will throw, so no need to
// reacquire.
lock.unlock();
pair_->signalIoFailure(
GLOO_ERROR_MSG("Read timeout ", pair_->peer().str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
}
recvCompletions_--;
}
}

waitSend waits for a send operation to complete, again branching on sync mode:

// Wait for the previous send operation to finish.
void Buffer::waitSend() {
// If the pair is in sync mode, the current thread is responsible for
// polling for work completions.
auto timeout = pair_->getTimeout();
if (pair_->sync_) {
// We can assume a single pair is never used by more than one
// thread, so there is no need to acquire the mutex here.
if (sendCompletions_ == 0) {
GLOO_ENFORCE_GT(sendPending_, 0, "No send to wait for");
auto start = std::chrono::steady_clock::now();
// We can assume a single pair is never used by more than one
// thread, so there is no need to acquire the mutex here.
while (sendCompletions_ == 0) {
pair_->pollCompletions();
if (timeout != kNoTimeout &&
(std::chrono::steady_clock::now() - start) >= timeout) {
pair_->signalIoFailure(
GLOO_ERROR_MSG("Send timeout ", pair_->peer().str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
}
}
sendCompletions_--;
} else {
// The device thread will signal completion. If the completion
// hasn't arrived yet, wait until it does.
std::unique_lock<std::mutex> lock(m_);
checkErrorState();
if (sendCompletions_ == 0) {
GLOO_ENFORCE_GT(sendPending_, 0, "No send to wait for");
auto pred = [&]{
checkErrorState();
return sendCompletions_ > 0;
};
if (timeout == kNoTimeout) {
// No timeout set. Wait for read to complete.
sendCv_.wait(lock, pred);
} else {
auto done = sendCv_.wait_for(lock, timeout, pred);
if (!done) {
// Release the mutex before calling into the pair to avoid deadlock.
// Calling signalIoFailure() will throw, so no need to
// reacquire.
lock.unlock();
pair_->signalIoFailure(
GLOO_ERROR_MSG("Send timeout ", pair_->peer().str()));
GLOO_ENFORCE(false, "Unexpected code path");
}
}
}
sendCompletions_--;
}
}

void Buffer::send(size_t offset, size_t length, size_t roffset) {
int rv;

// Can't assert on roffset, since we don't know the size of
// the remote buffer. Refactor of initialization code needed
// to support this.
GLOO_ENFORCE_LE(offset + length, size_);

{
std::unique_lock<std::mutex> lock(m_);
checkErrorState();
}

if (debug_) {
std::cout << "[" << getpid() << "] ";
std::cout << "send " << length << " bytes";
std::cout << std::endl;
}

// Increment number of sends in flight
sendPending_++;

pair_->send(this, offset, length, roffset);
}

tcp

The TCP transport contains a TLS layer. TLS (Transport Layer Security) is a protocol that sits on top of TCP and serves the application layer; its predecessor is SSL (Secure Socket Layer). It encrypts application-layer messages before handing them to TCP for transmission.

tls

The context:

Context::Context(std::shared_ptr<Device> device, int rank, int size)
: ::gloo::transport::tcp::Context(
std::dynamic_pointer_cast<::gloo::transport::tcp::Device>(device),
rank, size),
ssl_ctx_(create_ssl_ctx(c_str_or_null(device->getPKeyFile()),
c_str_or_null(device->getCertFile()),
c_str_or_null(device->getCAFile()),
c_str_or_null(device->getCAPath())),
[](::SSL_CTX *x) { ::_glootls::SSL_CTX_free(x); }) {}

The function that actually creates the SSL context:

SSL_CTX *Context::create_ssl_ctx(const char *pkey, const char *cert,
const char *ca_file, const char *ca_path) {
GLOO_ENFORCE(pkey != nullptr && cert != nullptr,
"Private key and certificate location must be specified");
GLOO_ENFORCE(ca_file != nullptr || ca_path != nullptr,
"CAfile or CApath must be specified");
static std::once_flag ssl_ctx_init_;
std::call_once(ssl_ctx_init_, [] {
// SSL_load_error_strings();
// SSL_library_init();
_glootls::OPENSSL_init_ssl(
OPENSSL_INIT_LOAD_SSL_STRINGS | OPENSSL_INIT_LOAD_CRYPTO_STRINGS, NULL);
_glootls::OPENSSL_init_ssl(0, NULL);
});
SSL_CTX *ssl_ctx = _glootls::SSL_CTX_new(_glootls::TLS_method());
GLOO_ENFORCE(ssl_ctx != nullptr, getSSLErrorMessage());
GLOO_ENFORCE(
_glootls::SSL_CTX_set_min_proto_version(ssl_ctx, TLS_MAX_VERSION) == 1,
getSSLErrorMessage());

// As we don't need to handle legacy clients,
// let's remove support for legacy renegotiation:
_glootls::SSL_CTX_clear_options(ssl_ctx, SSL_OP_LEGACY_SERVER_CONNECT);

_glootls::SSL_CTX_set_verify_depth(ssl_ctx, 1);

// To enforce a higher security level, set it to 3.
//
// Level 2
// Security level set to 112 bits of security. As a result, RSA, DSA and
// DH keys shorter than 2048 bits and ECC keys shorter than 224 bits are
// prohibited. In addition to the level 1 exclusions, any cipher suite
// using RC4 is also prohibited. SSL version 3 is also not allowed.
// Compression is disabled.
//
// Level 3
// Security level set to 128 bits of security. As a result, RSA, DSA and
// DH keys shorter than 3072 bits and ECC keys shorter than 256 bits are
// prohibited. In addition to the level 2 exclusions, cipher suites not
// offering forward secrecy are prohibited. TLS versions below 1.1 are not
// permitted. Session tickets are disabled.
//
// TODO: should be 3, but it doesn't work yet :(
_glootls::SSL_CTX_set_security_level(ssl_ctx, 2);

GLOO_ENFORCE(
_glootls::SSL_CTX_load_verify_locations(ssl_ctx, ca_file, ca_path) == 1,
getSSLErrorMessage());
GLOO_ENFORCE(_glootls::SSL_CTX_use_certificate_chain_file(ssl_ctx, cert) == 1,
getSSLErrorMessage());
GLOO_ENFORCE(_glootls::SSL_CTX_use_PrivateKey_file(ssl_ctx, pkey,
SSL_FILETYPE_PEM) == 1,
getSSLErrorMessage());
GLOO_ENFORCE(_glootls::SSL_CTX_check_private_key(ssl_ctx) == 1,
getSSLErrorMessage());
// SSL_VERIFY_PEER
//
// Server mode: the server sends a client certificate request to the
// client. The certificate returned (if any) is checked. If the
// verification process fails, the TLS/SSL handshake is immediately
// terminated with an alert message containing the reason for the
// verification failure. The behaviour can be controlled by the additional
// SSL_VERIFY_FAIL_IF_NO_PEER_CERT and SSL_VERIFY_CLIENT_ONCE flags.
//
// Client mode: the server certificate is verified. If the verification
// process fails, the TLS/SSL handshake is immediately terminated with an
// alert message containing the reason for the verification failure. If no
// server certificate is sent, because an anonymous cipher is used,
// SSL_VERIFY_PEER is ignored.
_glootls::SSL_CTX_set_verify(ssl_ctx,
SSL_VERIFY_PEER |
SSL_VERIFY_FAIL_IF_NO_PEER_CERT |
SSL_VERIFY_CLIENT_ONCE,
nullptr);
return ssl_ctx;
}

Plain TCP

It likewise needs to create pairs and buffers, and to handle the actual communication.

std::unique_ptr<transport::Pair>& Context::createPair(int rank) {
pairs_[rank] = std::unique_ptr<transport::Pair>(
new tcp::Pair(this, device_.get(), rank, getTimeout()));
return pairs_[rank];
}

std::unique_ptr<transport::UnboundBuffer> Context::createUnboundBuffer(
void* ptr,
size_t size) {
auto buf = new tcp::UnboundBuffer(shared_from_this(), ptr, size);
return std::unique_ptr<transport::UnboundBuffer>(buf);
}

The following is how communication is handled:

void Context::recvFromAny(
UnboundBuffer* buf,
uint64_t slot,
size_t offset,
size_t nbytes,
std::vector<int> srcRanks) {
for (;;) {
// Find rank of pair we can attempt a recv from
auto rank = recvFromAnyFindRank(buf, slot, offset, nbytes, srcRanks);
if (rank == -1) {
return;
}
// Try recv from returned rank
auto ptr = pairs_[rank].get();
GLOO_ENFORCE(ptr != nullptr);
auto pair = dynamic_cast<Pair*>(ptr);
GLOO_ENFORCE(pair != nullptr);
if (pair->tryRecv(buf, slot, offset, nbytes)) {
return;
}
}
}

int Context::recvFromAnyFindRank(
UnboundBuffer* buf,
uint64_t slot,
size_t offset,
size_t nbytes,
const std::vector<int>& srcRanks) {
std::unique_lock<std::mutex> lock(mutex_);

// See if there is a remote pending send that can fulfill this recv.
auto it = findPendingOperations(slot);
if (it != pendingOperations_.end()) {
auto& pendingOperation = *it;

// Out of all remote pending sends, find the first one
// that exists in the set of eligible ranks.
for (const auto rank : pendingOperation.getSendList()) {
for (const auto srcRank : srcRanks) {
if (rank == srcRank) {
// We found a rank that can fulfill this recv. The caller of this
// function will attempt the recv, and remove the remote pending send
// operation if it still exists.
return rank;
}
}
}
}

// No candidates; register buffer for recv
pendingRecv_[slot].emplace_back(
buf->getWeakNonOwningPtr(),
offset,
nbytes,
std::unordered_set<int>(srcRanks.begin(), srcRanks.end()));
return -1;
}

// Allowed to be called only by ContextMutator::findRecvFromAny,
// where the context lock is already held.
bool Context::findRecvFromAny(
uint64_t slot,
int rank,
WeakNonOwningPtr<UnboundBuffer>* buf,
size_t* offset,
size_t* nbytes) {
// See if there is a pending recv for this slot.
auto pit = pendingRecv_.find(slot);
if (pit != pendingRecv_.end()) {
auto& recvs = pit->second;

// Iterate over available buffers to find a match.
for (auto rit = recvs.begin(); rit != recvs.end(); rit++) {
const auto& ranks = std::get<3>(*rit);

// If this peer's rank is in the set of acceptable ranks for this slot,
// we can proceed and return the buffer to recv into.
if (ranks.count(rank) > 0) {
// Capture values to return.
*buf = std::get<0>(*rit);
*offset = std::get<1>(*rit);
*nbytes = std::get<2>(*rit);
// Cleanup.
recvs.erase(rit);
if (recvs.empty()) {
pendingRecv_.erase(pit);
}
return true;
}
}
}

return false;
}

loop

This is where epoll is used, so it is worth pulling out on its own.

Creating an epoll instance:

Loop::Loop() {
fd_ = epoll_create(1);
GLOO_ENFORCE_NE(fd_, -1, "epoll_create: ", strerror(errno));
loop_.reset(new std::thread(&Loop::run, this));
}

epoll_ctl operates on the instance produced by epoll_create:

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

This system call performs a control operation on the epoll instance referred to by the file descriptor epfd. It requests that the operation op be performed on the target file descriptor fd. Valid values for op are listed below, followed by a self-contained example:

  • EPOLL_CTL_ADD: register the target file descriptor fd on the epoll instance referred to by epfd and associate the event event with the internal file linked to fd.
  • EPOLL_CTL_MOD: change the event event associated with the target file descriptor fd.
  • EPOLL_CTL_DEL: remove (deregister) the target file descriptor fd from the epoll instance referred to by epfd. The event is ignored and can be NULL.
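
A minimal standalone sketch of the whole lifecycle (not Gloo code), using a pipe as the watched descriptor:

#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

int main() {
  int epfd = epoll_create(1);
  int fds[2];
  if (pipe(fds) != 0) {
    return 1;
  }

  // EPOLL_CTL_ADD: watch the read end for input readiness.
  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  ev.events = EPOLLIN;
  ev.data.fd = fds[0];
  epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev);

  // Make the read end readable, then wait up to 1000 ms for the event.
  write(fds[1], "x", 1);
  struct epoll_event ready;
  int nfds = epoll_wait(epfd, &ready, 1, 1000);
  if (nfds == 1 && (ready.events & EPOLLIN)) {
    char c;
    read(ready.data.fd, &c, 1);
    printf("fd %d is readable\n", ready.data.fd);
  }

  // EPOLL_CTL_DEL: deregister before closing.
  epoll_ctl(epfd, EPOLL_CTL_DEL, fds[0], nullptr);
  close(fds[0]);
  close(fds[1]);
  close(epfd);
  return 0;
}

Gloo's Loop wraps exactly these calls: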
void Loop::registerDescriptor(int fd, int events, Handler* h) {
struct epoll_event ev;
ev.events = events;
ev.data.ptr = h;

auto rv = epoll_ctl(fd_, EPOLL_CTL_ADD, fd, &ev);
if (rv == -1 && errno == EEXIST) {
rv = epoll_ctl(fd_, EPOLL_CTL_MOD, fd, &ev);
}
GLOO_ENFORCE_NE(rv, -1, "epoll_ctl: ", strerror(errno));
}

void Loop::unregisterDescriptor(int fd, Handler* h) {
auto rv = epoll_ctl(fd_, EPOLL_CTL_DEL, fd, nullptr);
GLOO_ENFORCE_NE(rv, -1, "epoll_ctl: ", strerror(errno));

// Wait for loop to tick before returning, to make sure the handler
// for this fd is not called once this function returns.
if (std::this_thread::get_id() != loop_->get_id()) {
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock);
TSAN_ANNOTATE_HAPPENS_AFTER(h);
}
}

Waiting for epoll events. Note the 10 ms timeout passed to epoll_wait below: it lets the loop periodically re-check done_ and wake up anyone waiting for a tick.

void Loop::run() {
std::array<struct epoll_event, capacity_> events;
int nfds;

while (!done_) {
// Wakeup everyone waiting for a loop tick to finish.
cv_.notify_all();

// Wait for something to happen
nfds = epoll_wait(fd_, events.data(), events.size(), 10);
if (nfds == 0) {
continue;
}
if (nfds == -1 && errno == EINTR) {
continue;
}

GLOO_ENFORCE_NE(nfds, -1);

for (int i = 0; i < nfds; i++) {
Handler* h = reinterpret_cast<Handler*>(events[i].data.ptr);
h->handleEvents(events[i].events);
TSAN_ANNOTATE_HAPPENS_BEFORE(h);
}
}
}

mpi

The MPI-related plumbing. MPIScope wraps MPI initialization and finalization; it is what the "managed" context in the earlier example delegates MPI_Init/MPI_Finalize to:

std::shared_ptr<MPIScope> getMPIScope() {
static std::once_flag once;

// Use weak pointer so that the initializer is destructed when the
// last context referring to it is destructed, not when statics
// are destructed on program termination.
static std::weak_ptr<MPIScope> wptr;
std::shared_ptr<MPIScope> sptr;

// Create MPIScope only once
std::call_once(once, [&]() {
sptr = std::make_shared<MPIScope>();
wptr = sptr;
});

// Create shared_ptr<MPIScope> from weak_ptr
sptr = wptr.lock();
GLOO_ENFORCE(sptr, "Cannot create MPI context after MPI_Finalize()");
return sptr;
}
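
The same create-once/keep-alive idiom in isolation (a standalone sketch with illustrative names, not Gloo code):

#include <memory>
#include <mutex>
#include <stdexcept>

struct Scope {};

std::shared_ptr<Scope> getScope() {
  static std::once_flag once;
  static std::weak_ptr<Scope> wptr;
  std::shared_ptr<Scope> sptr;

  // Construct exactly once; the weak_ptr does not extend the lifetime.
  std::call_once(once, [&]() {
    sptr = std::make_shared<Scope>();
    wptr = sptr;
  });

  // Later callers share the original object; once every owner has
  // released it, the object is gone for good.
  sptr = wptr.lock();
  if (sptr == nullptr) {
    throw std::runtime_error("scope was already destroyed");
  }
  return sptr;
}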

Returning the MPI context (the communicator):

std::shared_ptr<Context> Context::createManaged() {
auto mpiScope = getMPIScope();
auto context = std::make_shared<Context>(MPI_COMM_WORLD);
context->mpiScope_ = std::move(mpiScope);
return context;
}

Context::Context(const MPI_Comm& comm)
: ::gloo::Context(MPICommRank(comm), MPICommSize(comm)) {
auto error = MPI_Comm_dup(comm, &comm_);
GLOO_ENFORCE(error == MPI_SUCCESS, "MPI_Comm_dup: ", error);
}

Context::~Context() {
MPI_Comm_free(&comm_);
}

connectFullMesh creates a pair from this process to every other process, exchanges the pair addresses over MPI, and connects them. Note the indexing into out at the end: after the allgather, rank i's block starts at i * size * maxLength, and within it the address that rank i prepared for us sits at rank * maxLength, so the total offset is (i * size + rank) * maxLength, written below as (rank + i * size) * maxLength.

void Context::connectFullMesh(std::shared_ptr<transport::Device>& dev) {
std::vector<std::vector<char>> addresses(size);
unsigned long maxLength = 0;
int rv;

// Create pair to connect to every other node in the collective
auto transportContext = dev->createContext(rank, size);
transportContext->setTimeout(getTimeout());
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
}

auto& pair = transportContext->createPair(i);

// Store address for pair for this rank
auto address = pair->address().bytes();
maxLength = std::max(maxLength, address.size());
addresses[i] = std::move(address);
}

// Agree on maximum length so we can prepare buffers
rv = MPI_Allreduce(
MPI_IN_PLACE, &maxLength, 1, MPI_UNSIGNED_LONG, MPI_MAX, comm_);
if (rv != MPI_SUCCESS) {
GLOO_THROW_IO_EXCEPTION("MPI_Allreduce: ", rv);
}

// Prepare input and output
std::vector<char> in(size * maxLength);
std::vector<char> out(size * size * maxLength);
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
}

auto& address = addresses[i];
memcpy(in.data() + (i * maxLength), address.data(), address.size());
}

// Allgather to collect all addresses of all pairs
rv = MPI_Allgather(
in.data(), in.size(), MPI_BYTE, out.data(), in.size(), MPI_BYTE, comm_);
if (rv != MPI_SUCCESS) {
GLOO_THROW_IO_EXCEPTION("MPI_Allgather: ", rv);
}

// Connect every pair
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
}

auto offset = (rank + i * size) * maxLength;
std::vector<char> address(maxLength);
memcpy(address.data(), out.data() + offset, maxLength);
transportContext->getPair(i)->connect(address);
}

device_ = dev;
transportContext_ = std::move(transportContext);
}

Communication

First, a process needs its left and right neighbors in the ring. For example, with size = 4, rank 0's left neighbor is rank 3 and its right neighbor is rank 1:

// Helper for ring algorithms
std::unique_ptr<transport::Pair>& Algorithm::getLeftPair() {
auto rank = (context_->size + context_->rank - 1) % context_->size;
GLOO_ENFORCE(context_->getPair(rank), "pair missing (index ", rank, ")");
return context_->getPair(rank);
}

// Helper for ring algorithms
std::unique_ptr<transport::Pair>& Algorithm::getRightPair() {
auto rank = (context_->rank + 1) % context_->size;
GLOO_ENFORCE(context_->getPair(rank), "pair missing (index ", rank, ")");
return context_->getPair(rank);
}

Gloo supports a set of built-in reduce operations, and is also compatible with user-defined reduction functions:

// Type of reduction function.
// If the reduction type is one of the built-ins, algorithm
// implementations may use an accelerated version if available. For
// example, if a ReductionFunction with ReductionType equal to SUM is
// passed to a CUDA-aware Allreduce, it knows that it can use a NCCL
// implementation instead of the specified function.
//
enum ReductionType {
SUM = 1,
PRODUCT = 2,
MAX = 3,
MIN = 4,

// Use larger number so we have plenty of room to add built-ins
CUSTOM = 1000,
};

template <typename T>
class ReductionFunction {
public:
using Function = void(T*, const T*, size_t n);

static const ReductionFunction<T>* sum;
static const ReductionFunction<T>* product;
static const ReductionFunction<T>* min;
static const ReductionFunction<T>* max;

ReductionFunction(ReductionType type, Function* fn)
: type_(type), fn_(fn) {}

ReductionType type() const {
return type_;
}

void call(T* x, const T* y, size_t n) const {
fn_(x, y, n);
}

protected:
ReductionType type_;
Function* fn_;
};

template <typename T>
const ReductionFunction<T>* ReductionFunction<T>::sum =
new ReductionFunction<T>(SUM, &::gloo::sum<T>);
template <typename T>
const ReductionFunction<T>* ReductionFunction<T>::product =
new ReductionFunction<T>(PRODUCT, &::gloo::product<T>);
template <typename T>
const ReductionFunction<T>* ReductionFunction<T>::min =
new ReductionFunction<T>(MIN, &::gloo::min<T>);
template <typename T>
const ReductionFunction<T>* ReductionFunction<T>::max =
new ReductionFunction<T>(MAX, &::gloo::max<T>);

// Local operation.
// If an algorithm uses multiple local pointers, local operations
// can be used for local reduction, broadcast, gathering, etc.
template <typename T>
class LocalOp {
public:
virtual ~LocalOp() noexcept(false) {}
virtual void runAsync() = 0;
virtual void wait() = 0;

// Synchronous run is equal to asynchronous run and wait.
inline void run() {
runAsync();
wait();
}
};
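
A sketch of plugging in a user-defined reduction through the classes above; absMax is a hypothetical element-wise function:

// Element-wise absolute maximum, matching ReductionFunction<T>::Function.
template <typename T>
void absMax(T* x, const T* y, size_t n) {
  for (size_t i = 0; i < n; i++) {
    T a = x[i] < 0 ? -x[i] : x[i];
    T b = y[i] < 0 ? -y[i] : y[i];
    x[i] = a > b ? a : b;
  }
}

// Tagged CUSTOM, so algorithms know that no accelerated built-in applies.
const gloo::ReductionFunction<float> absMaxReduce(
    gloo::CUSTOM, &absMax<float>);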

allgather

AllgatherRing is similar to MPI_Allgather: every process receives the buffers (inPtrs) of all other processes. The caller must pass a preallocated receive buffer (outPtr) whose size equals [context size] x [total size of the send buffers (inPtrs)]; the send buffers of the process with rank = k are written contiguously into outPtr starting at position k * [number of input buffers] * count.

template <typename T>
class AllgatherRing : public Algorithm {
public:
AllgatherRing(
const std::shared_ptr<Context>& context,
const std::vector<const T*>& inPtrs,
T* outPtr,
int count)
: Algorithm(context),
inPtrs_(inPtrs),
outPtr_(outPtr),
count_(count),
bytes_(count * sizeof(T)),
inputStride_(count_ * inPtrs_.size()),
leftPair_(this->getLeftPair()),
rightPair_(this->getRightPair()) {
auto slot = this->context_->nextSlot();

// std::unique_ptr<transport::Buffer>
sendDataBuf_ = rightPair_->createSendBuffer(
slot, outPtr_, inPtrs_.size() * context_->size * bytes_);
recvDataBuf_ = leftPair_->createRecvBuffer(
slot, outPtr_, inPtrs_.size() * context_->size * bytes_);

auto notificationSlot = this->context_->nextSlot();

// std::unique_ptr<transport::Buffer>
sendNotificationBuf_ =
leftPair_->createSendBuffer(notificationSlot, &dummy_, sizeof(dummy_));
recvNotificationBuf_ =
rightPair_->createRecvBuffer(notificationSlot, &dummy_, sizeof(dummy_));
}

The part that actually runs:

  void run() {
const int rank = this->contextRank_;
const int numRounds = this->contextSize_ - 1;

// Copy local buffers.
for (int i = 0; i < inPtrs_.size(); i++) {
memcpy(outPtr_ + rank * inputStride_ + i * count_, inPtrs_[i], bytes_);
}

// We send input buffers in order.
for (int i = 0; i < inPtrs_.size(); i++) {
// We start every iteration by sending local buffer.
int inRank = rank;

// With 10 processes there are 9 rounds. Rank 1 forwards its own segment
// first, then segment 0, then 9, then 8, and so on.
for (int round = 0; round < numRounds; round++) {
const int sendOffset = inRank * inputStride_ + i * count_;
sendDataBuf_->send(
sendOffset * sizeof(T), bytes_, sendOffset * sizeof(T));
recvDataBuf_->waitRecv();

// Nodes receive data from the left node in every round and forward it
// to the right node.
inRank = (numRounds - round + rank) % this->contextSize_;

// Send notification to node on the left that this node is ready for an
// inbox write.
sendNotificationBuf_->send();

// Wait for notification from node on the right.
recvNotificationBuf_->waitRecv();
}
}
}

private:
const std::vector<const T*> inPtrs_;
T* outPtr_;
const int count_;
const int bytes_;
const int inputStride_;

std::unique_ptr<transport::Pair>& leftPair_;
std::unique_ptr<transport::Pair>& rightPair_;

std::unique_ptr<transport::Buffer> sendDataBuf_;
std::unique_ptr<transport::Buffer> recvDataBuf_;

int dummy_;

std::unique_ptr<transport::Buffer> sendNotificationBuf_;
std::unique_ptr<transport::Buffer> recvNotificationBuf_;
};
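
Driving it could look like this (a hypothetical usage sketch, assuming context is a connected std::shared_ptr<gloo::Context> as in the MPI example earlier):

const int count = 4;
std::vector<int> input(count, context->rank);    // this rank's data
std::vector<int> output(count * context->size);  // gathered result
std::vector<const int*> inPtrs = {input.data()};

gloo::AllgatherRing<int> allgather(context, inPtrs, output.data(), count);
allgather.run();
// output[k * count .. (k + 1) * count) now holds rank k's input.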

The generic allgather:

void allgather(AllgatherOptions& opts) {
const auto& context = opts.context;
transport::UnboundBuffer* in = opts.in.get();
transport::UnboundBuffer* out = opts.out.get();
const auto slot = Slot::build(kAllgatherSlotPrefix, opts.tag);

const auto recvRank = (context->size + context->rank - 1) % context->size;
const auto sendRank = (context->size + context->rank + 1) % context->size;

const size_t inBytes = out->size / context->size;
const size_t outBytes = out->size;

// If the input buffer is specified, this is NOT an in place operation,
// and the output buffer needs to be primed with the input.
if (in != nullptr) {
memcpy(
static_cast<uint8_t*>(out->ptr) + context->rank * in->size,
static_cast<uint8_t*>(in->ptr),
in->size);
}

// Short circuit if there is only a single process.
if (context->size == 1) {
return;
}

// The chunk size may not be divisible by 2; use dynamic lookup.
std::array<size_t, 2> chunkSize;
chunkSize[0] = inBytes / 2;
chunkSize[1] = inBytes - chunkSize[0];
std::array<size_t, 2> chunkOffset;
chunkOffset[0] = 0;
chunkOffset[1] = chunkSize[0];

// With 10 processes, from rank 1's point of view:
// send to 2,recv from 0
// send seg = 11 11 10 10 9 9 8 8 ...
// recv seg = 10 10 9 9 8 8 7 7 ...
// i & 0x1 = 0 1 0 1 0 1 0 1 ...

for (auto i = 0; i < (context->size - 1) * 2; i++) {
const size_t sendSegment = context->size + context->rank - (i / 2);
const size_t recvSegment = sendSegment - 1;
size_t sendOffset =
((sendSegment * inBytes) + chunkOffset[i & 0x1]) % outBytes;
size_t recvOffset =
((recvSegment * inBytes) + chunkOffset[i & 0x1]) % outBytes;
size_t size = chunkSize[i & 0x1];
if (i < 2) {
out->send(sendRank, slot, sendOffset, size);
out->recv(recvRank, slot, recvOffset, size);
continue;
}

// Wait for pending operations to complete to synchronize with the
// previous iteration. Because we kick off two operations before
// getting here we always wait for the next-to-last operation.
out->waitSend(opts.timeout);
out->waitRecv(opts.timeout);
out->send(sendRank, slot, sendOffset, size);
out->recv(recvRank, slot, recvOffset, size);
}

// Wait for completes
for (auto i = 0; i < 2; i++) {
out->waitSend(opts.timeout);
out->waitRecv(opts.timeout);
}
}
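
Calling it goes through the options struct; a minimal sketch, assuming the setInput/setOutput helpers declared in gloo/allgather.h:

const size_t count = 1024;
std::vector<float> in(count, 1.0f);
std::vector<float> out(count * context->size);

gloo::AllgatherOptions opts(context);
opts.setInput(in.data(), in.size());
opts.setOutput(out.data(), out.size());
gloo::allgather(opts);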

allgatherv

void allgatherv(AllgathervOptions& opts) {
const auto& context = opts.context;
transport::UnboundBuffer* in = opts.in.get();
transport::UnboundBuffer* out = opts.out.get();
const auto slot = Slot::build(kAllgatherSlotPrefix, opts.tag);

const auto recvRank = (context->size + context->rank - 1) % context->size;
const auto sendRank = (context->size + context->rank + 1) % context->size;

// Compute the byte count and byte offset for every process.
std::vector<size_t> byteCounts;
std::vector<size_t> byteOffsets;
byteCounts.reserve(context->size);
byteOffsets.reserve(context->size);
size_t offset = 0;
for (const auto& elements : opts.elements) {
const auto bytes = elements * opts.elementSize;
byteCounts.push_back(bytes);
byteOffsets.push_back(offset);
offset += bytes;
}

// If an input buffer was specified, the output buffer needs to be primed with it.
if (in != nullptr) {
GLOO_ENFORCE_EQ(byteCounts[context->rank], in->size);
if (byteCounts[context->rank] > 0) {
memcpy(
static_cast<uint8_t*>(out->ptr) + byteOffsets[context->rank],
static_cast<uint8_t*>(in->ptr),
in->size);
}
}

// Short circuit if there is only a single process.
if (context->size == 1) {
return;
}

const auto baseIndex = context->size + context->rank;
for (auto i = 0; i < context->size - 1; i++) {
const size_t sendIndex = (baseIndex - i) % context->size;
const size_t recvIndex = (baseIndex - i - 1) % context->size;

if (i == 0) {
out->send(sendRank, slot, byteOffsets[sendIndex], byteCounts[sendIndex]);
out->recv(recvRank, slot, byteOffsets[recvIndex], byteCounts[recvIndex]);
continue;
}

// Wait for previous operations to complete before kicking off new ones.
out->waitSend(opts.timeout);
out->waitRecv(opts.timeout);
out->send(sendRank, slot, byteOffsets[sendIndex], byteCounts[sendIndex]);
out->recv(recvRank, slot, byteOffsets[recvIndex], byteCounts[recvIndex]);
}

// Wait for final operations to complete.
out->waitSend(opts.timeout);
out->waitRecv(opts.timeout);
}
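
A hypothetical call with per-rank element counts (four ranks here); the setInput/setOutput signatures are assumed from gloo/allgatherv.h:

std::vector<size_t> counts = {3, 1, 4, 2};  // elements contributed per rank
std::vector<float> in(counts[context->rank], float(context->rank));
std::vector<float> out(3 + 1 + 4 + 2);

gloo::AllgathervOptions opts(context);
opts.setInput(in.data(), in.size());
opts.setOutput(out.data(), counts);
gloo::allgatherv(opts);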

allreduce

using BufferVector = std::vector<std::unique_ptr<transport::UnboundBuffer>>;
using ReductionFunction = AllreduceOptions::Func;
using ReduceRangeFunction = std::function<void(size_t, size_t)>;
using BroadcastRangeFunction = std::function<void(size_t, size_t)>;

// Forward declaration of ring algorithm implementation.
void ring(
const detail::AllreduceOptionsImpl& opts,
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs);

// Forward declaration of bcube algorithm implementation.
void bcube(
const detail::AllreduceOptionsImpl& opts,
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs);

// ReductionFunction type describes the function to use for element wise reduction.
//
// Its arguments are:
// 1. non-const output pointer
// 2. const input pointer 1 (may be equal to 1)
// 3. const input pointer 2 (may be equal to 1)
// 4. number of elements to reduce.
//
// Note that this function is not strictly typed and takes void pointers.
// This is done to avoid having to template the options class and the
// algorithm implementations; we found that added very little value for
// the increase in compile time and code size.

// Returns a function that computes the local reduction of the inputs and
// stores it in the output buffer(s) for a given range. This is done
// before either sending a region to a neighbor, or reducing a region
// received from a neighbor.
ReduceRangeFunction genLocalReduceFunction(
const BufferVector& in, // vector of unique_ptr<UnboundBuffer>
const BufferVector& out,
size_t elementSize,
ReductionFunction fn) {
// Pick the reduction function based on the number of buffers passed in.
if (in.size() > 0) {
if (in.size() == 1) {
return [&in, &out](size_t offset, size_t length) {
memcpy(
static_cast<uint8_t*>(out[0]->ptr) + offset,
static_cast<const uint8_t*>(in[0]->ptr) + offset,
length);
};
} else {
return [&in, &out, elementSize, fn](size_t offset, size_t length) {
fn(static_cast<uint8_t*>(out[0]->ptr) + offset,
static_cast<const uint8_t*>(in[0]->ptr) + offset,
static_cast<const uint8_t*>(in[1]->ptr) + offset,
length / elementSize);
for (size_t i = 2; i < in.size(); i++) {
fn(static_cast<uint8_t*>(out[0]->ptr) + offset,
static_cast<const uint8_t*>(out[0]->ptr) + offset,
static_cast<const uint8_t*>(in[i]->ptr) + offset,
length / elementSize);
}
};
}
} else {
return [&out, elementSize, fn](size_t offset, size_t length) {
for (size_t i = 1; i < out.size(); i++) {
fn(static_cast<uint8_t*>(out[0]->ptr) + offset,
static_cast<const uint8_t*>(out[0]->ptr) + offset,
static_cast<const uint8_t*>(out[i]->ptr) + offset,
length / elementSize);
}
};
}
}

// Returns a function that performs a local broadcast over the output
// buffers for a given range. This is executed after receiving every
// globally reduced chunk.
BroadcastRangeFunction genLocalBroadcastFunction(const BufferVector& out) {
return [&out](size_t offset, size_t length) {
for (size_t i = 1; i < out.size(); i++) {
memcpy(
static_cast<uint8_t*>(out[i]->ptr) + offset,
static_cast<const uint8_t*>(out[0]->ptr) + offset,
length);
}
};
}

void allreduce(const detail::AllreduceOptionsImpl& opts) {
if (opts.elements == 0) {
return;
}

const auto& context = opts.context;
const std::vector<std::unique_ptr<transport::UnboundBuffer>>& in = opts.in;
const std::vector<std::unique_ptr<transport::UnboundBuffer>>& out = opts.out;
const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag);
const size_t totalBytes = opts.elements * opts.elementSize;

// Initialize the local reduction and broadcast functions.
// Note that these are no-ops if only a single output is specified, and it
// is used as both input and output.
const auto reduceInputs =
genLocalReduceFunction(in, out, opts.elementSize, opts.reduce);
const auto broadcastOutputs = genLocalBroadcastFunction(out);

// Simple circuit if there is only a single process.
if (context->size == 1) {
reduceInputs(0, totalBytes);
broadcastOutputs(0, totalBytes);
return;
}

switch (opts.algorithm) {
case detail::AllreduceOptionsImpl::UNSPECIFIED:
case detail::AllreduceOptionsImpl::RING:
ring(opts, reduceInputs, broadcastOutputs);
break;
case detail::AllreduceOptionsImpl::BCUBE:
bcube(opts, reduceInputs, broadcastOutputs);
break;
default:
GLOO_ENFORCE(false, "Algorithm not handled.");
}
}
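
At the public API level this is reached through AllreduceOptions, which wraps the detail::AllreduceOptionsImpl seen above; a minimal sketch, with the setOutput/setReduceFunction helpers assumed from gloo/allreduce.h:

std::vector<float> data(1024, 1.0f);  // used in place as input and output

gloo::AllreduceOptions opts(context);
opts.setOutput(data.data(), data.size());
opts.setReduceFunction(
    [](void* a, const void* b, const void* c, size_t n) {
      // Element-wise sum, matching the (output, input1, input2, n)
      // shape described above.
      auto x = static_cast<float*>(a);
      auto y = static_cast<const float*>(b);
      auto z = static_cast<const float*>(c);
      for (size_t i = 0; i < n; i++) {
        x[i] = y[i] + z[i];
      }
    });
gloo::allreduce(opts);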

The ring variant of allreduce

The given input is split into a number of chunks equal to the number of processes. Once the algorithm has finished, every process hosts one chunk of reduced output, in sequential order (rank 0 has chunk 0, rank 1 has chunk 1, and so on). Because the input may not be divisible by the number of processes, the final chunks may hold partial output or may be empty.

As a chunk is passed along the ring and contains the reduction of successively more ranks, we have to alternate between doing I/O for that chunk and computing the reduction between the chunk just received and the local chunk. To avoid this alternating pattern, each chunk is split into multiple segments (>= 2), so that one segment can be in flight while the reduction on another is being computed.

The segment size has an upper bound, to minimize memory usage and avoid poor cache behavior. This means that when dealing with very large inputs, each chunk may have many segments.
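
To make the sizing concrete, a worked example (numbers chosen purely for illustration): with 4 processes, totalBytes = 1,000,000, elementSize = 4 and maxSegmentSize = 65,536, we get maxSegmentBytes = 65,536. Dividing up the input gives 16 segments, which already meets the minimum of 2 x 4 = 8 and is a multiple of the context size, so numSegments = 16, numSegmentsPerRank = 4, and segmentBytes = roundUp(1,000,000 / 16, 4) = 62,500; the 16 segments of 62,500 bytes exactly cover the input.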

void ring(
const detail::AllreduceOptionsImpl& opts,
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs) {
const auto& context = opts.context;
const std::vector<std::unique_ptr<transport::UnboundBuffer>>& out = opts.out;
const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag);
const size_t totalBytes = opts.elements * opts.elementSize;

// Note: context->size > 1
const auto recvRank = (context->size + context->rank + 1) % context->size;
const auto sendRank = (context->size + context->rank - 1) % context->size;

// Ensure that the maximum segment size is a multiple of the element size.
// Otherwise, the segment size can exceed the maximum segment size after
// rounding it up to the nearest multiple of the element size. For
// example, if maxSegmentSize = 10 and elementSize = 4, then after
// rounding up: segmentSize = 12.
const size_t maxSegmentBytes = opts.elementSize *
std::max((size_t)1, opts.maxSegmentSize / opts.elementSize);

// Compute how many segments make up the input buffer.
//
// Round up to the nearest multiple of the context size, so that every
// process has the same number of segments and execution is symmetric
// across processes. The minimum is twice the context size, because the
// algorithm below overlaps sending/receiving one segment with computing
// the reduction of another.
const size_t numSegments = roundUp(
std::max(
(totalBytes + (maxSegmentBytes - 1)) / maxSegmentBytes,
(size_t)context->size * 2),
(size_t)context->size);

const size_t numSegmentsPerRank = numSegments / context->size;
const size_t segmentBytes =
roundUp((totalBytes + numSegments - 1) / numSegments, opts.elementSize);

// Allocate scratch space to hold two chunks
std::unique_ptr<uint8_t[]> tmpAllocation(new uint8_t[segmentBytes * 2]);
std::unique_ptr<transport::UnboundBuffer> tmpBuffer =
context->createUnboundBuffer(tmpAllocation.get(), segmentBytes * 2);
transport::UnboundBuffer* tmp = tmpBuffer.get();

// Use dynamic lookup for the chunk offset in the temporary buffer. With
// two operations in flight we need two offsets; they can be indexed with
// the loop counter.
std::array<size_t, 2> segmentOffset;
segmentOffset[0] = 0;
segmentOffset[1] = segmentBytes;

// Computes the offsets and lengths of the segments to be sent and
// received for a given iteration during reduce/scatter.
auto computeReduceScatterOffsets = [&](size_t i) {
struct {
size_t sendOffset;
size_t recvOffset;
ssize_t sendLength;
ssize_t recvLength;
} result;

// Compute the segment index to send to (rank - 1) and the segment index
// to receive from (rank + 1). Multiply by the number of bytes in a
// segment to get the offsets. Offsets are allowed to be out of range
// (>= totalBytes); this is taken into account when computing the
// associated lengths.
result.sendOffset =
((((context->rank + 1) * numSegmentsPerRank) + i) * segmentBytes) %
(numSegments * segmentBytes);
result.recvOffset =
((((context->rank + 2) * numSegmentsPerRank) + i) * segmentBytes) %
(numSegments * segmentBytes);

// If the segment is entirely in range, the following statement is
// equal to segmentBytes. If it isn't, it will be less, or even
// negative. This is why the ssize_t typecasts are needed.
result.sendLength = std::min(
(ssize_t)segmentBytes,
(ssize_t)totalBytes - (ssize_t)result.sendOffset);
result.recvLength = std::min(
(ssize_t)segmentBytes,
(ssize_t)totalBytes - (ssize_t)result.recvOffset);

return result;
};

// Ring reduce/scatter.
//
// The number of iterations is computed as follows:
// - take `numSegments` as the total number of segments,
// - subtract `numSegmentsPerRank`, because the final segments contain
//   partial results and must not be forwarded past this stage,
// - add 2, because the send and receive operations are pipelined (they
//   are issued on iterations 0 and 1 and waited for on iterations 2
//   and 3).
//
for (auto i = 0; i < (numSegments - numSegmentsPerRank + 2); i++) {
if (i >= 2) {
// Compute the send and receive offsets and lengths from two iterations
// ago. We need them to know when to wait for an operation and when to
// ignore it (when the offset is out of range), and to know where to
// reduce the contents of the temporary buffer.
auto prev = computeReduceScatterOffsets(i - 2);
if (prev.recvLength > 0) {
// Prepare out[0]->ptr to hold the local reduction
reduceInputs(prev.recvOffset, prev.recvLength);
// Wait for segment from neighbor.
tmp->waitRecv(opts.timeout);
// Reduce the received segment.
opts.reduce(
static_cast<uint8_t*>(out[0]->ptr) + prev.recvOffset,
static_cast<const uint8_t*>(out[0]->ptr) + prev.recvOffset,
static_cast<const uint8_t*>(tmp->ptr) + segmentOffset[i & 0x1],
prev.recvLength / opts.elementSize);
}
if (prev.sendLength > 0) {
out[0]->waitSend(opts.timeout);
}
}

// Issue new send and receive operations on all iterations except the
// last two. By then we have sent all the data we need to and only have
// to wait for the final segments to be reduced into the output.
if (i < (numSegments - numSegmentsPerRank)) {
// Compute send and receive offsets and lengths for this iteration.
auto cur = computeReduceScatterOffsets(i);
if (cur.recvLength > 0) {
tmp->recv(recvRank, slot, segmentOffset[i & 0x1], cur.recvLength);
}
if (cur.sendLength > 0) {
// Prepare out[0]->ptr to hold the local reduction for this segment
if (i < numSegmentsPerRank) {
reduceInputs(cur.sendOffset, cur.sendLength);
}
out[0]->send(sendRank, slot, cur.sendOffset, cur.sendLength);
}
}
}

// Function computes the offsets and lengths of the segments to be
// sent and received for a given iteration during allgather.
auto computeAllgatherOffsets = [&](size_t i) {
struct {
size_t sendOffset;
size_t recvOffset;
ssize_t sendLength;
ssize_t recvLength;
} result;

result.sendOffset =
((((context->rank) * numSegmentsPerRank) + i) * segmentBytes) %
(numSegments * segmentBytes);
result.recvOffset =
((((context->rank + 1) * numSegmentsPerRank) + i) * segmentBytes) %
(numSegments * segmentBytes);

// If the segment is entirely in range, the following statement is
// equal to segmentBytes. If it isn't, it will be less, or even
// negative. This is why the ssize_t typecasts are needed.
result.sendLength = std::min(
(ssize_t)segmentBytes,
(ssize_t)totalBytes - (ssize_t)result.sendOffset);
result.recvLength = std::min(
(ssize_t)segmentBytes,
(ssize_t)totalBytes - (ssize_t)result.recvOffset);

return result;
};

// Ring allgather.
//
// Note: totalBytes <= (numSegments * segmentBytes), which is
// incompatible with a generic allgather algorithm where every process
// contributes the same amount.
//
for (auto i = 0; i < (numSegments - numSegmentsPerRank + 2); i++) {
if (i >= 2) {
auto prev = computeAllgatherOffsets(i - 2);
if (prev.recvLength > 0) {
out[0]->waitRecv(opts.timeout);
// Broadcast received segments to output buffers.
broadcastOutputs(prev.recvOffset, prev.recvLength);
}
if (prev.sendLength > 0) {
out[0]->waitSend(opts.timeout);
}
}

// Issue new send and receive operations on all iterations except the
// last two. By then we have sent all the data we need to and only have
// to wait for the final segments to be sent to the output.
if (i < (numSegments - numSegmentsPerRank)) {
auto cur = computeAllgatherOffsets(i);
if (cur.recvLength > 0) {
out[0]->recv(recvRank, slot, cur.recvOffset, cur.recvLength);
}
if (cur.sendLength > 0) {
out[0]->send(sendRank, slot, cur.sendOffset, cur.sendLength);
// Broadcast first segments to output buffers.
if (i < numSegmentsPerRank) {
broadcastOutputs(cur.sendOffset, cur.sendLength);
}
}
}
}
}
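
As a concrete instance of the iteration count: with 4 ranks and numSegments = 8 (so numSegmentsPerRank = 2), each phase runs 8 iterations; new sends and receives are issued while i < 6, and the last two iterations only drain the operations issued at i = 4 and i = 5.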

// For a given context size and desired group size, compute the actual
// group size per step. Note that the group size per step is n for all
// steps only if n^(#steps) == size. Otherwise, the final group size
// will be != n.
std::vector<size_t> computeGroupSizePerStep(size_t size, const size_t n) {
std::vector<size_t> result;
GLOO_ENFORCE_GT(n, 1);
while (size % n == 0) {
result.push_back(n);
size /= n;
}
if (size > 1) {
result.push_back(size);
}
return result;
}
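
A quick standalone check of the factorization behavior, with GLOO_ENFORCE_GT swapped for assert so the function compiles outside Gloo:

#include <cassert>
#include <cstddef>
#include <vector>

// Standalone copy of computeGroupSizePerStep from above.
std::vector<size_t> computeGroupSizePerStep(size_t size, const size_t n) {
  std::vector<size_t> result;
  assert(n > 1);
  while (size % n == 0) {
    result.push_back(n);
    size /= n;
  }
  if (size > 1) {
    result.push_back(size);
  }
  return result;
}

int main() {
  assert((computeGroupSizePerStep(8, 2) == std::vector<size_t>{2, 2, 2}));
  assert((computeGroupSizePerStep(12, 2) == std::vector<size_t>{2, 2, 3})); // trailing factor != n
  assert((computeGroupSizePerStep(7, 2) == std::vector<size_t>{7}));        // prime size: one step
  return 0;
}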

bcube algorithm

The bcube algorithm implements a hypercube-like reduction strategy. The constraint is that the number of processes must be factorizable. If the smallest component of the factorization is 2 and the number of processes is a power of 2, the algorithm is identical to recursive halving/doubling.

The number of elements in the factorization determines the number of steps of the algorithm. Each element of the factorization determines the number of processes each process communicates with at that particular step. If the number of processes cannot be factorized (i.e. it is prime), the algorithm is identical to a direct reduce-scatter followed by an allgather.

For example, if #processes == 8 and we factorize it as 4 * 2, the algorithm runs in 2 steps. In the first step, data is exchanged within 2 groups of 4 processes, so that every process holds 1/4 of the partial result. In the second step, 4 groups of 2 processes exchange their partial results, so that every process holds 1/8 of the result. The same factorization is then executed in reverse to perform the allgather. The sketch below shows the resulting peer groups per step.
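
As a sketch (names local to this example), the peer groups per step for size = 8 factorized as {4, 2} can be derived with the same peerDistance/groupRank arithmetic the implementation uses:

#include <cstdio>
#include <vector>

int main() {
  const size_t size = 8;
  const std::vector<size_t> groupSizePerStep = {4, 2};
  for (size_t rank = 0; rank < size; rank++) {
    size_t peerDistance = 1;
    printf("rank %zu:", rank);
    for (const size_t groupSize : groupSizePerStep) {
      const size_t groupRank = (rank / peerDistance) % groupSize;
      const size_t baseRank = rank - groupRank * peerDistance;
      printf(" {");
      for (size_t i = 0; i < groupSize; i++) {
        printf(" %zu", baseRank + i * peerDistance);
      }
      printf(" }");
      // Peers are spaced further apart at every subsequent step.
      peerDistance *= groupSize;
    }
    printf("\n");
  }
  // e.g. rank 0 first exchanges within {0,1,2,3}, then within {0,4}.
}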

void bcube(
const detail::AllreduceOptionsImpl& opts,
ReduceRangeFunction reduceInputs,
BroadcastRangeFunction broadcastOutputs) {
const auto& context = opts.context;
const auto slot = Slot::build(kAllreduceSlotPrefix, opts.tag);
const auto elementSize = opts.elementSize;
auto& out = opts.out[0];

constexpr auto n = 2;

// Figure out the number of steps in this algorithm.
const auto groupSizePerStep = computeGroupSizePerStep(context->size, n);

struct group {
// Distance between peers in this group.
size_t peerDistance;

// Segment that this group is responsible for reducing.
size_t bufferOffset;
size_t bufferLength;

// The process ranks that are a member of this group.
std::vector<size_t> ranks;

// Upper bound of the length of the chunk that each process has the
// reduced values for by the end of the reduction for this group.
size_t chunkLength;

// Chunk within the segment that this process is responsible for reducing.
size_t myChunkOffset;
size_t myChunkLength;
};

// Compute the details of the group at every algorithm step.
// We store them in a vector because we iterate through it in forward
// order in the reduce/scatter phase and in reverse order in the
// allgather phase.
std::vector<struct group> groups;
{
struct group group;
group.peerDistance = 1;
group.bufferOffset = 0;
group.bufferLength = opts.elements;
for (const size_t groupSize : groupSizePerStep) {
const size_t groupRank = (context->rank / group.peerDistance) % groupSize;
const size_t baseRank = context->rank - (groupRank * group.peerDistance);
group.ranks.reserve(groupSize);
for (size_t i = 0; i < groupSize; i++) {
group.ranks.push_back(baseRank + i * group.peerDistance);
}
// Ranks within a group are spaced `peerDistance` apart: consecutive in
// the first step, then increasingly strided in later steps.

// Compute the length of the chunk we're exchanging at this step.
group.chunkLength = ((group.bufferLength + (groupSize - 1)) / groupSize);

// This process is responsible for computing the reduction of the chunk
// positioned at <groupRank>/<groupSize> within the current segment.
//
group.myChunkOffset =
group.bufferOffset + (groupRank * group.chunkLength);
group.myChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) -
int64_t(groupRank * group.chunkLength))));

// Store a const copy of this group in the vector.
groups.push_back(group);

// Initialize the next group with the updated peer distance and segment
// offset and length.
struct group nextGroup;
nextGroup.peerDistance = group.peerDistance * groupSize;
nextGroup.bufferOffset = group.myChunkOffset;
nextGroup.bufferLength = group.myChunkLength;
std::swap(group, nextGroup);
}
}

// The chunk length is rounded up, so the maximum scratch space we need
// may be larger than the size of the output buffer. Compute the maximum.
size_t bufferLength = opts.elements;
for (const auto& group : groups) {
bufferLength =
std::max(bufferLength, group.ranks.size() * group.chunkLength);
}

// Allocate scratch space to receive data from peers.
const size_t bufferSize = bufferLength * elementSize;
std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
std::unique_ptr<transport::UnboundBuffer> tmp =
context->createUnboundBuffer(buffer.get(), bufferSize);

// Reduce/scatter.
for (size_t step = 0; step < groups.size(); step++) {
const auto& group = groups[step];

// Issue receive operations for chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
tmp->recv(
src,
slot,
i * group.chunkLength * elementSize,
group.myChunkLength * elementSize);
}

// Issue send operations for local chunks to peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto dst = group.ranks[i];
if (dst == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
// Compute the local reduction only in the first step of the algorithm.
// In subsequent steps, we already have a partially reduced result.
if (step == 0) {
reduceInputs(
currentChunkOffset * elementSize, currentChunkLength * elementSize);
}
out->send(
dst,
slot,
currentChunkOffset * elementSize,
currentChunkLength * elementSize);
}

// Wait for send and receive operations to complete.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
tmp->waitRecv();
out->waitSend();
}

// In the first step, prepare the chunk this process is responsible for
// with the reduction of its inputs (if multiple inputs were specified).
if (step == 0) {
reduceInputs(
group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
}

// Reduce chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
opts.reduce(
static_cast<uint8_t*>(out->ptr) + (group.myChunkOffset * elementSize),
static_cast<const uint8_t*>(out->ptr) +
(group.myChunkOffset * elementSize),
static_cast<const uint8_t*>(tmp->ptr) +
(i * group.chunkLength * elementSize),
group.myChunkLength);
}
}

// There is one chunk that contains the final result, and this chunk can
// already be broadcast locally to out[1..N] (if applicable). Doing so
// means we only have to broadcast the remaining chunks to out[1..N] as
// we receive them from our peers during the allgather phase.
{
const auto& group = groups.back();
broadcastOutputs(
group.myChunkOffset * elementSize, group.myChunkLength * elementSize);
}

// Allgather.
for (auto it = groups.rbegin(); it != groups.rend(); it++) {
const auto& group = *it;

// Issue receive operations for reduced chunks from peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto src = group.ranks[i];
if (src == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
out->recv(
src,
slot,
currentChunkOffset * elementSize,
currentChunkLength * elementSize);
}

// Issue send operations for reduced chunk to peers.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto dst = group.ranks[i];
if (dst == context->rank) {
continue;
}
out->send(
dst,
slot,
group.myChunkOffset * elementSize,
group.myChunkLength * elementSize);
}

// Wait for operations to complete.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
out->waitRecv();
out->waitSend();
}

// Broadcast result to multiple output buffers, if applicable.
for (size_t i = 0; i < group.ranks.size(); i++) {
const auto peer = group.ranks[i];
if (peer == context->rank) {
continue;
}
const size_t currentChunkOffset =
group.bufferOffset + i * group.chunkLength;
const size_t currentChunkLength = std::min(
size_t(group.chunkLength),
size_t(std::max(
int64_t(0),
int64_t(group.bufferLength) - int64_t(i * group.chunkLength))));
broadcastOutputs(
currentChunkOffset * elementSize, currentChunkLength * elementSize);
}
}
}

alltoallv

Same as alltoall, except that the alltoallv implementation adds per-rank offsets and lengths. There is no sophisticated algorithm here; it is just a sequence of send/recv operations. A sketch of how the offsets can be derived follows.
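
A hedged sketch of one common way to derive the per-rank offsets that alltoallv expects, as exclusive prefix sums of the per-rank lengths in bytes; the names here are illustrative, not Gloo API:

#include <cstddef>
#include <vector>

// Offset of rank i is the sum of the lengths of ranks 0..i-1.
std::vector<size_t> exclusivePrefixSum(const std::vector<size_t>& lengths) {
  std::vector<size_t> offsets(lengths.size(), 0);
  for (size_t i = 1; i < lengths.size(); i++) {
    offsets[i] = offsets[i - 1] + lengths[i - 1];
  }
  return offsets;
}

int main() {
  // This rank sends 100, 200, 300 bytes to ranks 0, 1, 2 respectively.
  std::vector<size_t> inLengthPerRank = {100, 200, 300};
  std::vector<size_t> inOffsetPerRank = exclusivePrefixSum(inLengthPerRank);
  // inOffsetPerRank == {0, 100, 300}
  return 0;
}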

void alltoallv(AlltoallvOptions& opts) {
const auto& context = opts.context;
transport::UnboundBuffer* in = opts.in.get();
transport::UnboundBuffer* out = opts.out.get();
std::vector<size_t>& inOffsetPerRank = opts.inOffsetPerRank;
std::vector<size_t>& inLengthPerRank = opts.inLengthPerRank;
std::vector<size_t>& outOffsetPerRank = opts.outOffsetPerRank;
std::vector<size_t>& outLengthPerRank = opts.outLengthPerRank;
const auto slot = Slot::build(kAlltoallSlotPrefix, opts.tag);

int myRank = context->rank;
int worldSize = context->size;

// Local copy.
GLOO_ENFORCE(inLengthPerRank[myRank] == outLengthPerRank[myRank]);
size_t myInOffset = inOffsetPerRank[myRank];
size_t myOutOffset = outOffsetPerRank[myRank];
size_t myChunkSize = inLengthPerRank[myRank];
memcpy(
static_cast<char*>(out->ptr) + myOutOffset,
static_cast<char*>(in->ptr) + myInOffset,
myChunkSize);

// Remote copy.
for (int i = 1; i < worldSize; i++) {
int sendRank = (myRank + i) % worldSize;
int recvRank = (myRank + worldSize - i) % worldSize;
in->send(
sendRank, slot, inOffsetPerRank[sendRank], inLengthPerRank[sendRank]);
out->recv(
recvRank, slot, outOffsetPerRank[recvRank], outLengthPerRank[recvRank]);
}

for (int i = 1; i < worldSize; i++) {
in->waitSend(opts.timeout);
out->waitRecv(opts.timeout);
}
}
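
Note the rotated schedule: at step i, each rank sends to (rank + i) % size and receives from (rank - i) % size, so every rank is paired with a different peer at each step instead of all ranks targeting the same destination at once.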

barrier

With 16 processes in total:

process 0 does recv/send with peers (15, 1), (14, 2), (12, 4), (8, 8);

process 1 does recv/send with peers (0, 2), (15, 3), (13, 5), (9, 9). The sketch below reproduces this schedule.
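
A minimal sketch that reproduces the dissemination schedule for size = 16:

#include <cstdio>

int main() {
  const int size = 16;
  for (int rank = 0; rank < 2; rank++) {  // ranks 0 and 1, as above
    printf("rank %d:", rank);
    for (int d = 1; d < size; d <<= 1) {
      printf(" recv %d / send %d,", (size + rank - d) % size,
             (size + rank + d) % size);
    }
    printf("\n");
  }
  // rank 0: recv 15 / send 1, recv 14 / send 2, recv 12 / send 4, recv 8 / send 8
  // rank 1: recv 0 / send 2, recv 15 / send 3, recv 13 / send 5, recv 9 / send 9
}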

void barrier(BarrierOptions& opts) {
const auto& context = opts.context;
auto& buffer = opts.buffer;
const auto slot = Slot::build(kBarrierSlotPrefix, opts.tag);

// Below implements a dissemination barrier, described in "Two algorithms
// for barrier synchronization (1988)" by Hensgen, Finkel and Manber.
// PDF: https://www.inf.ed.ac.uk/teaching/courses/ppls/BarrierPaper.pdf
// DOI: 10.1007/BF01379320

// Instead of iterating over i up to log2(context->size), we immediately
// compute 2^i and compare with context->size.
for (size_t d = 1; d < context->size; d <<= 1) {
buffer->recv((context->size + context->rank - d) % context->size, slot);
buffer->send((context->size + context->rank + d) % context->size, slot);
buffer->waitRecv(opts.timeout);
buffer->waitSend(opts.timeout);
}
}

broadcast
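
broadcast implements a binomial tree: ranks are mapped to virtual ranks so that the root has virtual rank 0, and in iteration i each process that already holds the data sends it to the peer whose virtual rank differs in bit i. The set of processes holding the data thus doubles every round, and the broadcast completes in log2ceil(size) rounds.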

void broadcast(BroadcastOptions& opts) {
const auto& context = opts.context;
transport::UnboundBuffer* in = opts.in.get();
transport::UnboundBuffer* out = opts.out.get();
const auto slot = Slot::build(kBroadcastSlotPrefix, opts.tag);

if (context->rank == opts.root) {
in = out;
}

// Map ranks to a virtual ranking where the root process has rank 0.
const size_t vsize = context->size;
const size_t vrank = (context->rank + vsize - opts.root) % vsize;
const size_t dim = log2ceil(vsize);

// Track the number of pending send operations.
// Send operations can complete asynchronously, because nothing in a
// later iteration depends on them. This is unlike the recv operations,
// which must complete before any send operation is queued.
size_t numSends = 0;

// Create a mask with all bits set, then clear bits one by one starting
// at the LSB. When the mask applied to a virtual rank equals 0, we know
// that process must participate. This results in exponentially growing
// participation, starting with virtual ranks 0 and 1.
size_t mask = (1 << dim) - 1;

for (size_t i = 0; i < dim; i++) {
// Clear bit `i`. In the first iteration, virtual ranks 0 and 1
// participate. In the second, virtual ranks 0 through 3 participate,
// and so on.
mask ^= (1 << i);
if ((vrank & mask) != 0) {
continue;
}

// The virtual rank of the peer in this iteration has opposite bit `i`.
auto vpeer = vrank ^ (1 << i);
if (vpeer >= vsize) {
continue;
}

// Map virtual rank of peer to actual rank of peer.
auto peer = (vpeer + opts.root) % vsize;
if ((vrank & (1 << i)) == 0) {
in->send(peer, slot);
numSends++;
} else {
out->recv(peer, slot);
out->waitRecv(opts.timeout);
}
}

// Copy local input to output if applicable.
if (context->rank == opts.root && in != out) {
memcpy(out->ptr, in->ptr, out->size);
}

// Wait on pending sends.
for (auto i = 0; i < numSends; i++) {
in->waitSend(opts.timeout);
}
}

AVX optimizations

Some of the reduce functions use AVX. _mm256_cvtph_ps converts eight half-precision (16-bit) floating-point values to single-precision values. _mm256_cvtps_ph converts eight single-precision floating-point values to half-precision (16-bit) values. _mm_storeu_si128 stores the contents of an SSE register to (possibly unaligned) memory. _mm256_mul_ps performs a SIMD multiply of the eight packed single-precision (float32) elements of the first source vector m1 by the eight float32 elements of the second source vector m2.
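
As a standalone demonstration (assuming a compiler with -mavx -mf16c), the half -> float -> half round trip used by the kernels below looks like this; the uint16_t values are IEEE-754 binary16 bit patterns:

#include <immintrin.h>
#include <cstdint>
#include <cstdio>

int main() {
  // 1.0 in binary16 is 0x3C00; 2.0 is 0x4000.
  uint16_t a[8], b[8], c[8];
  for (int i = 0; i < 8; i++) {
    a[i] = 0x3C00;  // 1.0
    b[i] = 0x4000;  // 2.0
  }
  // Widen eight halves to floats, add, then narrow back to halves.
  __m256 va = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)a));
  __m256 vb = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)b));
  __m128i vc = _mm256_cvtps_ph(_mm256_add_ps(va, vb), 0);
  _mm_storeu_si128((__m128i*)c, vc);
  printf("0x%04x\n", c[0]);  // 0x4200, i.e. 3.0 in binary16
  return 0;
}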


// Assumes x and y are either both 32-byte aligned, or misaligned by the
// same offset, as would happen when reducing at an offset within an
// aligned buffer.
template <>
void sum<float16>(void* c_, const void* a_, const void* b_, size_t n) {
float16* c = static_cast<float16*>(c_);
const float16* a = static_cast<const float16*>(a_);
const float16* b = static_cast<const float16*>(b_);
size_t i;
for (i = 0; i < (n / 8) * 8; i += 8) {
__m256 va32 = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)(&a[i])));
__m256 vb32 = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)(&b[i])));
__m128i vc16 = _mm256_cvtps_ph(_mm256_add_ps(va32, vb32), 0);
_mm_storeu_si128((__m128i*)(&c[i]), vc16);
}
// Leftovers
for (; i < n; i++) {
c[i] = a[i] + b[i];
}
}

template <>
void product<float16>(void* c_, const void* a_, const void* b_, size_t n) {
float16* c = static_cast<float16*>(c_);
const float16* a = static_cast<const float16*>(a_);
const float16* b = static_cast<const float16*>(b_);
size_t i;
for (i = 0; i < (n / 8) * 8; i += 8) {
__m256 va32 = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)(&a[i])));
__m256 vb32 = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)(&b[i])));
__m128i vc16 = _mm256_cvtps_ph(_mm256_mul_ps(va32, vb32), 0);
_mm_storeu_si128((__m128i*)(&c[i]), vc16);
}
// Leftovers
for (; i < n; i++) {
c[i] = a[i] * b[i];
}
}

template <>
void max<float16>(void* c_, const void* a_, const void* b_, size_t n) {
float16* c = static_cast<float16*>(c_);
const float16* a = static_cast<const float16*>(a_);
const float16* b = static_cast<const float16*>(b_);
size_t i;
for (i = 0; i < (n / 8) * 8; i += 8) {
__m256 va32 = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)(&a[i])));
__m256 vb32 = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)(&b[i])));
__m128i vc16 = _mm256_cvtps_ph(_mm256_max_ps(va32, vb32), 0);
_mm_storeu_si128((__m128i*)(&c[i]), vc16);
}
// Leftovers
for (; i < n; i++) {
c[i] = std::max(a[i], b[i]);
}
}

template <>
void min<float16>(void* c_, const void* a_, const void* b_, size_t n) {
float16* c = static_cast<float16*>(c_);
const float16* a = static_cast<const float16*>(a_);
const float16* b = static_cast<const float16*>(b_);
size_t i;
for (i = 0; i < (n / 8) * 8; i += 8) {
__m256 va32 = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)(&a[i])));
__m256 vb32 = _mm256_cvtph_ps(_mm_loadu_si128((__m128i*)(&b[i])));
__m128i vc16 = _mm256_cvtps_ph(_mm256_min_ps(va32, vb32), 0);
_mm_storeu_si128((__m128i*)(&c[i]), vc16);
}
// Leftovers
for (; i < n; i++) {
c[i] = std::min(a[i], b[i]);
}
}

reduce
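
reduce combines the same segmented ring reduce/scatter used by allreduce with a final gather to the root: after the reduce/scatter phase, each rank holds the fully reduced chunk it is responsible for, and every rank other than the root then sends its chunk to the root's output buffer.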

void reduce(ReduceOptions& opts) {
if (opts.elements == 0) {
return;
}
const auto& context = opts.context;
transport::UnboundBuffer* in = opts.in.get();
transport::UnboundBuffer* out = opts.out.get();
const auto slot = Slot::build(kReduceSlotPrefix, opts.tag);

const auto recvRank = (context->size + context->rank + 1) % context->size;
const auto sendRank = (context->size + context->rank - 1) % context->size;

// If input buffer is not specified, the output is also the input
if (in == nullptr) {
in = out;
}

// Short circuit if there is only a single process.
if (context->size == 1) {
if (in != out) {
memcpy(out->ptr, in->ptr, opts.elements * opts.elementSize);
}
return;
}

// The ring algorithm works as follows.
//
// The given input is split into a number of chunks equal to the number
// of processes. Once the algorithm has finished, every process hosts
// one chunk of reduced output, in order (rank 0 has chunk 0, rank 1 has
// chunk 1, etc.). Because the input may not be divisible by the number
// of processes, the final chunk may hold partial output or may be empty.
//
// As a chunk is passed along the ring and contains the reduction of
// successively more ranks, we have to alternate between performing I/O
// for that chunk and computing the reduction between the received chunk
// and the local chunk. To avoid this alternating pattern, we split a
// chunk into multiple segments (>= 2) and ensure we have one segment in
// flight while computing the reduction of another.
// The segment size has an upper bound to minimize memory usage and
// avoid bad cache behavior. This means that when processing very large
// inputs, every chunk may have many segments.
//
// The nomenclature here is reflected in the variable naming below (one
// chunk per rank and many segments per chunk).
//
const size_t totalBytes = opts.elements * opts.elementSize;

// Ensure that the maximum segment size is a multiple of the element
// size. Otherwise, the segment size may exceed the maximum after
// rounding up to the nearest multiple of the element size. For example,
// with maxSegmentSize = 10 and elementSize = 4, rounding up would yield
// segmentSize = 12.
const size_t maxSegmentSize =
opts.elementSize * (opts.maxSegmentSize / opts.elementSize);

// The number of bytes per segment must be a multiple of the bytes per
// element for the reduction to work; round up if necessary.
const size_t segmentBytes = roundUp(
std::min(
// Rounded division to have >= 2 segments per chunk.
(totalBytes + (context->size * 2 - 1)) / (context->size * 2),
// Configurable segment size limit
maxSegmentSize),
opts.elementSize);

// Compute how many segments make up the input buffer.
//
// Round up to the nearest multiple of the context size, so that every
// process has the same number of segments and execution is symmetric
// across processes.
// The minimum is twice the context size, because the algorithm below
// overlaps sending/receiving a segment with computing the reduction of
// another segment.
//
const size_t numSegments = roundUp(
std::max(
(totalBytes + (segmentBytes - 1)) / segmentBytes,
(size_t)context->size * 2),
(size_t)context->size);
const size_t numSegmentsPerRank = numSegments / context->size;
const size_t chunkBytes = numSegmentsPerRank * segmentBytes;

// Allocate scratch space to hold two chunks.
std::unique_ptr<uint8_t[]> tmpAllocation(new uint8_t[segmentBytes * 2]);
std::unique_ptr<transport::UnboundBuffer> tmpBuffer =
context->createUnboundBuffer(tmpAllocation.get(), segmentBytes * 2);
transport::UnboundBuffer* tmp = tmpBuffer.get();

// Look up the segment offsets in the temporary buffer dynamically.
// With two operations in flight we need two offsets; they can be
// indexed with the loop counter.
std::array<size_t, 2> segmentOffset;
segmentOffset[0] = 0;
segmentOffset[1] = segmentBytes;

// Function computes the offsets and lengths of the segments to be sent
// and received for a given iteration.
auto computeReduceScatterOffsets = [&](size_t i) {
struct {
size_t sendOffset;
size_t recvOffset;
ssize_t sendLength;
ssize_t recvLength;
} result;

// Compute the index of the segment to send (to rank - 1) and of the
// segment to receive (from rank + 1). Multiply by the number of bytes
// per segment to get the offsets. Offsets are allowed to be out of
// range (>= totalBytes); this is taken into account when computing the
// associated lengths.
result.sendOffset =
((((context->rank + 1) * numSegmentsPerRank) + i) * segmentBytes) %
(numSegments * segmentBytes);
result.recvOffset =
((((context->rank + 2) * numSegmentsPerRank) + i) * segmentBytes) %
(numSegments * segmentBytes);

// If the segment is entirely in range, the following statement is equal
// to segmentBytes. If it isn't, it will be less, or even negative. This
// is why the ssize_t typecasts are needed.
result.sendLength = std::min(
(ssize_t)segmentBytes,
(ssize_t)totalBytes - (ssize_t)result.sendOffset);
result.recvLength = std::min(
(ssize_t)segmentBytes,
(ssize_t)totalBytes - (ssize_t)result.recvOffset);

return result;
};

for (auto i = 0; i < numSegments; i++) {
if (i >= 2) {
// Compute the send and receive offsets and lengths from two iterations
// ago. We need them to know when to wait for an operation and when to
// ignore it (when the offset is out of range), and to know where to
// reduce the contents of the temporary buffer.
auto prev = computeReduceScatterOffsets(i - 2);
if (prev.recvLength > 0) {
tmp->waitRecv(opts.timeout);
opts.reduce(
static_cast<uint8_t*>(out->ptr) + prev.recvOffset,
static_cast<const uint8_t*>(in->ptr) + prev.recvOffset,
static_cast<const uint8_t*>(tmp->ptr) + segmentOffset[i & 0x1],
prev.recvLength / opts.elementSize);
}
if (prev.sendLength > 0) {
if ((i - 2) < numSegmentsPerRank) {
in->waitSend(opts.timeout);
} else {
out->waitSend(opts.timeout);
}
}
}

// Issue new send and receive operations on all iterations except the
// last two. By then we have sent all the data we need to and only have
// to wait for the final segments to be reduced into the output.
if (i < (numSegments - 2)) {
// Compute send and receive offsets and lengths for this iteration.
auto cur = computeReduceScatterOffsets(i);
if (cur.recvLength > 0) {
tmp->recv(recvRank, slot, segmentOffset[i & 0x1], cur.recvLength);
}
if (cur.sendLength > 0) {
if (i < numSegmentsPerRank) {
in->send(sendRank, slot, cur.sendOffset, cur.sendLength);
} else {
out->send(sendRank, slot, cur.sendOffset, cur.sendLength);
}
}
}
}

// Gather to root rank.
// Note: totalBytes <= (numSegments * segmentBytes), which is
// incompatible with a generic gather algorithm where every process
// contributes the same amount.
if (context->rank == opts.root) {
size_t numRecv = 0;
for (size_t rank = 0; rank < context->size; rank++) {
if (rank == context->rank) {
continue;
}
size_t recvOffset = rank * numSegmentsPerRank * segmentBytes;
ssize_t recvLength = std::min(
(ssize_t)chunkBytes, (ssize_t)totalBytes - (ssize_t)recvOffset);
if (recvLength > 0) {
out->recv(rank, slot, recvOffset, recvLength);
numRecv++;
}
}
for (size_t i = 0; i < numRecv; i++) {
out->waitRecv(opts.timeout);
}
} else {
size_t sendOffset = context->rank * numSegmentsPerRank * segmentBytes;
ssize_t sendLength = std::min(
(ssize_t)chunkBytes, (ssize_t)totalBytes - (ssize_t)sendOffset);
if (sendLength > 0) {
out->send(opts.root, slot, sendOffset, sendLength);
out->waitSend(opts.timeout);
}
}
}