Bug #39238 » 0002-msg-async-rdma-support-qp-that-isn-t-associated-with.patch
src/common/legacy_config_opts.h | ||
---|---|---|
OPTION(ms_async_rdma_receive_buffers, OPT_U32)
|
||
// max number of wr in srq
|
||
OPTION(ms_async_rdma_receive_queue_len, OPT_U32)
|
||
// support srq
|
||
OPTION(ms_async_rdma_support_srq, OPT_BOOL)
|
||
OPTION(ms_async_rdma_port_num, OPT_U32)
|
||
OPTION(ms_async_rdma_polling_us, OPT_U32)
|
||
OPTION(ms_async_rdma_local_gid, OPT_STR) // GID format: "fe80:0000:0000:0000:7efe:90ff:fe72:6efe", no zero folding
|
src/common/options.cc | ||
---|---|---|
.set_default(4096)
|
||
.set_description(""),
|
||
Option("ms_async_rdma_support_srq", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
|
||
.set_default(true)
|
||
.set_description(""),
|
||
Option("ms_async_rdma_port_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
|
||
.set_default(1)
|
||
.set_description(""),
|
src/msg/async/rdma/Infiniband.cc | ||
---|---|---|
memset(&qpia, 0, sizeof(qpia));
|
||
qpia.send_cq = txcq->get_cq();
|
||
qpia.recv_cq = rxcq->get_cq();
|
||
qpia.srq = srq; // use the same shared receive queue
|
||
if (srq) {
|
||
qpia.srq = srq; // use the same shared receive queue
|
||
} else {
|
||
qpia.cap.max_recv_wr = max_recv_wr;
|
||
qpia.cap.max_recv_sge = 1;
|
||
}
|
||
qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
|
||
qpia.cap.max_send_sge = 1; // max send scatter-gather elements
|
||
qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
|
||
... | ... | |
pd = new ProtectionDomain(cct, device);
|
||
assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
|
||
rx_queue_len = device->device_attr->max_srq_wr;
|
||
support_srq = cct->_conf->ms_async_rdma_support_srq;
|
||
if (support_srq)
|
||
rx_queue_len = device->device_attr->max_srq_wr;
|
||
else
|
||
rx_queue_len = device->device_attr->max_qp_wr;
|
||
if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
|
||
rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
|
||
ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
|
||
... | ... | |
memory_manager = new MemoryManager(cct, device, pd);
|
||
memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
|
||
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
|
||
post_chunks_to_srq(rx_queue_len); //add to srq
|
||
if (support_srq) {
|
||
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
|
||
post_chunks_to_rq(rx_queue_len, NULL); //add to srq
|
||
}
|
||
}
|
||
Infiniband::~Infiniband()
|
||
{
|
||
if (!initialized)
|
||
return;
|
||
ibv_destroy_srq(srq);
|
||
if (support_srq)
|
||
ibv_destroy_srq(srq);
|
||
delete memory_manager;
|
||
delete pd;
|
||
}
|
||
... | ... | |
return qp;
|
||
}
|
||
int Infiniband::post_chunks_to_srq(int num)
|
||
int Infiniband::post_chunks_to_rq(int num, ibv_qp *qp)
|
||
{
|
||
int ret, i = 0;
|
||
ibv_sge isge[num];
|
||
... | ... | |
i++;
|
||
}
|
||
ibv_recv_wr *badworkrequest;
|
||
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
|
||
assert(ret == 0);
|
||
if (support_srq) {
|
||
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
|
||
assert(ret == 0);
|
||
} else {
|
||
assert(qp);
|
||
ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest);
|
||
assert(ret == 0);
|
||
}
|
||
return i;
|
||
}
|
||
src/msg/async/rdma/Infiniband.h | ||
---|---|---|
MemPoolContext rxbuf_pool_ctx;
|
||
mem_pool rxbuf_pool;
|
||
void* huge_pages_malloc(size_t size);
|
||
void huge_pages_free(void *ptr);
|
||
};
|
||
... | ... | |
bool initialized = false;
|
||
const std::string &device_name;
|
||
uint8_t port_num;
|
||
bool support_srq = false;
|
||
public:
|
||
explicit Infiniband(CephContext *c);
|
||
... | ... | |
ibv_qp_type type, struct rdma_cm_id *cm_id);
|
||
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
|
||
// post rx buffers to srq, return number of buffers actually posted
|
||
int post_chunks_to_srq(int num);
|
||
int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
|
||
void post_chunk_to_pool(Chunk* chunk) {
|
||
get_memory_manager()->release_rx_buffer(chunk);
|
||
}
|
||
... | ... | |
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
|
||
static const char* wc_status_to_string(int status);
|
||
static const char* qp_state_string(int status);
|
||
uint32_t get_rx_queue_len() const { return rx_queue_len; }
|
||
};
|
||
#endif
|
src/msg/async/rdma/RDMAConnectedSocketImpl.cc | ||
---|---|---|
} else {
|
||
read += chunk->read(buf+read, response->byte_len);
|
||
dispatcher->post_chunk_to_pool(chunk);
|
||
update_post_backlog();
|
||
}
|
||
}
|
||
}
|
||
... | ... | |
ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
|
||
if ((*c)->over()) {
|
||
dispatcher->post_chunk_to_pool(*c);
|
||
update_post_backlog();
|
||
ldout(cct, 25) << __func__ << " one chunk over." << dendl;
|
||
}
|
||
if (read == len) {
|
||
... | ... | |
worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
|
||
}, true);
|
||
}
|
||
void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
|
||
{
|
||
post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp());
|
||
}
|
||
void RDMAConnectedSocketImpl::update_post_backlog()
|
||
{
|
||
if (post_backlog)
|
||
post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp());
|
||
}
|
src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc | ||
---|---|---|
cm_channel = info->cm_channel;
|
||
status = RDMA_ID_CREATED;
|
||
remote_qpn = info->qp_num;
|
||
worker->center.submit_to(worker->center.get_id(), [this]() {
|
||
worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
|
||
status = CHANNEL_FD_CREATED;
|
||
}, false);
|
||
if (alloc_resource()) {
|
||
close_notify();
|
||
return;
|
||
}
|
||
worker->center.submit_to(worker->center.get_id(), [this]() {
|
||
worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
|
||
status = CHANNEL_FD_CREATED;
|
||
}, false);
|
||
status = RESOURCE_ALLOCATED;
|
||
local_qpn = qp->get_local_qp_number();
|
||
my_msg.qpn = local_qpn;
|
||
... | ... | |
void RDMAIWARPConnectedSocketImpl::close() {
|
||
error = ECONNRESET;
|
||
active = false;
|
||
if (status >= CONNECTED)
|
||
if (status >= CONNECTED) {
|
||
rdma_disconnect(cm_id);
|
||
}
|
||
close_notify();
|
||
}
|
||
... | ... | |
break;
|
||
case RDMA_CM_EVENT_ESTABLISHED:
|
||
ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl;
|
||
status = CONNECTED;
|
||
if (!is_server) {
|
||
remote_qpn = event->param.conn.qp_num;
|
||
... | ... | |
if (!qp) {
|
||
return -1;
|
||
}
|
||
if (!cct->_conf->ms_async_rdma_support_srq)
|
||
dispatcher->post_chunks_to_rq(infiniband->get_rx_queue_len(), qp->get_qp());
|
||
dispatcher->register_qp(qp, this);
|
||
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
|
||
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
|
src/msg/async/rdma/RDMAStack.cc | ||
---|---|---|
}
|
||
}
|
||
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
|
||
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
|
||
{
|
||
Mutex::Locker l(lock);
|
||
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
|
||
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
|
||
// handle a case when we have a limited number of
|
||
// rx buffers and we could not post a required amount when polling
|
||
if (post_backlog > 0) {
|
||
ldout(cct, 20) << __func__ << " post_backlog is " << post_backlog << dendl;
|
||
post_backlog -= get_stack()->get_infiniband().post_chunks_to_srq(post_backlog);
|
||
}
|
||
}
|
||
int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
|
||
{
|
||
Mutex::Locker l(lock);
|
||
return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
|
||
}
|
||
void RDMADispatcher::polling()
|
||
... | ... | |
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
|
||
post_backlog += rx_ret - get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
|
||
for (int i = 0; i < rx_ret; ++i) {
|
||
ibv_wc* response = &wc[i];
|
||
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
|
||
ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
|
||
assert(wc[i].opcode == IBV_WC_RECV);
|
||
if (response->status == IBV_WC_SUCCESS) {
|
||
assert(wc[i].opcode == IBV_WC_RECV);
|
||
conn = get_conn_lockless(response->qp_num);
|
||
if (!conn) {
|
||
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
|
||
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
|
||
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
|
||
} else {
|
||
polled[conn].push_back(*response);
|
||
conn->post_chunks_to_rq(1);
|
||
polled[conn].push_back(*response);
|
||
}
|
||
} else {
|
||
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
|
||
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
|
||
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
|
||
<< ") status(" << response->status << ":"
|
||
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
|
||
conn = get_conn_lockless(response->qp_num);
|
||
if (conn && conn->is_connected())
|
||
conn->fault();
|
||
if (response->status != IBV_WC_WR_FLUSH_ERR) {
|
||
conn = get_conn_lockless(response->qp_num);
|
||
if (conn && conn->is_connected())
|
||
conn->fault();
|
||
}
|
||
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
|
||
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
|
||
}
|
||
}
|
||
for (auto &&i : polled)
|
||
... | ... | |
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
|
||
<< response->wr_id << ") status(" << response->status << "): "
|
||
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
|
||
}
|
||
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
|
||
RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
|
||
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
|
||
RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
|
||
if (conn && conn->is_connected()) {
|
||
ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
|
||
conn->fault();
|
||
} else {
|
||
ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
|
||
if (conn && conn->is_connected()) {
|
||
ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;
|
||
conn->fault();
|
||
} else {
|
||
ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
|
||
}
|
||
}
|
||
}
|
||
src/msg/async/rdma/RDMAStack.h | ||
---|---|---|
bool done = false;
|
||
std::atomic<uint64_t> num_dead_queue_pair = {0};
|
||
std::atomic<uint64_t> num_qp_conn = {0};
|
||
int post_backlog = 0;
|
||
Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
|
||
// qp_num -> InfRcConnection
|
||
// The main usage of `qp_conns` is looking up connection by qp_num,
|
||
... | ... | |
std::atomic<uint64_t> inflight = {0};
|
||
void post_chunk_to_pool(Chunk* chunk);
|
||
void post_chunk_to_pool(Chunk* chunk);
|
||
int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
|
||
};
|
||
class RDMAWorker : public Worker {
|
||
... | ... | |
int tcp_fd = -1;
|
||
bool active;// qp is active ?
|
||
bool pending;
|
||
int post_backlog = 0;
|
||
void notify();
|
||
ssize_t read_buffers(char* buf, size_t len);
|
||
... | ... | |
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
|
||
bool is_pending() {return pending;}
|
||
void set_pending(bool val) {pending = val;}
|
||
void post_chunks_to_rq(int num);
|
||
void update_post_backlog();
|
||
class C_handle_connection : public EventCallback {
|
||
RDMAConnectedSocketImpl *csi;
|