Project

General

Profile

Bug #39238 » 0002-msg-async-rdma-support-qp-that-isn-t-associated-with.patch

Changcheng Liu, 04/11/2019 09:29 AM

View differences:

src/common/legacy_config_opts.h
OPTION(ms_async_rdma_receive_buffers, OPT_U32)
// max number of wr in srq
OPTION(ms_async_rdma_receive_queue_len, OPT_U32)
// support srq
OPTION(ms_async_rdma_support_srq, OPT_BOOL)
OPTION(ms_async_rdma_port_num, OPT_U32)
OPTION(ms_async_rdma_polling_us, OPT_U32)
OPTION(ms_async_rdma_local_gid, OPT_STR) // GID format: "fe80:0000:0000:0000:7efe:90ff:fe72:6efe", no zero folding
src/common/options.cc
.set_default(4096)
.set_description(""),
Option("ms_async_rdma_support_srq", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
.set_default(true)
.set_description(""),
Option("ms_async_rdma_port_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(1)
.set_description(""),
src/msg/async/rdma/Infiniband.cc
memset(&qpia, 0, sizeof(qpia));
qpia.send_cq = txcq->get_cq();
qpia.recv_cq = rxcq->get_cq();
qpia.srq = srq; // use the same shared receive queue
if (srq) {
qpia.srq = srq; // use the same shared receive queue
} else {
qpia.cap.max_recv_wr = max_recv_wr;
qpia.cap.max_recv_sge = 1;
}
qpia.cap.max_send_wr = max_send_wr; // max outstanding send requests
qpia.cap.max_send_sge = 1; // max send scatter-gather elements
qpia.cap.max_inline_data = MAX_INLINE_DATA; // max bytes of immediate data on send q
......
pd = new ProtectionDomain(cct, device);
assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
rx_queue_len = device->device_attr->max_srq_wr;
support_srq = cct->_conf->ms_async_rdma_support_srq;
if (support_srq)
rx_queue_len = device->device_attr->max_srq_wr;
else
rx_queue_len = device->device_attr->max_qp_wr;
if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << " receive buffers" << dendl;
......
memory_manager = new MemoryManager(cct, device, pd);
memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
post_chunks_to_srq(rx_queue_len); //add to srq
if (support_srq) {
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);
post_chunks_to_rq(rx_queue_len, NULL); //add to srq
}
}
Infiniband::~Infiniband()
{
if (!initialized)
return;
ibv_destroy_srq(srq);
if (support_srq)
ibv_destroy_srq(srq);
delete memory_manager;
delete pd;
}
......
return qp;
}
int Infiniband::post_chunks_to_srq(int num)
int Infiniband::post_chunks_to_rq(int num, ibv_qp *qp)
{
int ret, i = 0;
ibv_sge isge[num];
......
i++;
}
ibv_recv_wr *badworkrequest;
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
assert(ret == 0);
if (support_srq) {
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
assert(ret == 0);
} else {
assert(qp);
ret = ibv_post_recv(qp, &rx_work_request[0], &badworkrequest);
assert(ret == 0);
}
return i;
}
src/msg/async/rdma/Infiniband.h
MemPoolContext rxbuf_pool_ctx;
mem_pool rxbuf_pool;
void* huge_pages_malloc(size_t size);
void huge_pages_free(void *ptr);
};
......
bool initialized = false;
const std::string &device_name;
uint8_t port_num;
bool support_srq = false;
public:
explicit Infiniband(CephContext *c);
......
ibv_qp_type type, struct rdma_cm_id *cm_id);
ibv_srq* create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge);
// post rx buffers to srq, return number of buffers actually posted
int post_chunks_to_srq(int num);
int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
void post_chunk_to_pool(Chunk* chunk) {
get_memory_manager()->release_rx_buffer(chunk);
}
......
Chunk *get_tx_chunk_by_buffer(const char *c) { return memory_manager->get_tx_chunk_by_buffer(c); }
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);
uint32_t get_rx_queue_len() const { return rx_queue_len; }
};
#endif
src/msg/async/rdma/RDMAConnectedSocketImpl.cc
} else {
read += chunk->read(buf+read, response->byte_len);
dispatcher->post_chunk_to_pool(chunk);
update_post_backlog();
}
}
}
......
ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
if ((*c)->over()) {
dispatcher->post_chunk_to_pool(*c);
update_post_backlog();
ldout(cct, 25) << __func__ << " one chunk over." << dendl;
}
if (read == len) {
......
worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
}, true);
}
void RDMAConnectedSocketImpl::post_chunks_to_rq(int num)
{
post_backlog += num - infiniband->post_chunks_to_rq(num, qp->get_qp());
}
void RDMAConnectedSocketImpl::update_post_backlog()
{
if (post_backlog)
post_backlog -= post_backlog - dispatcher->post_chunks_to_rq(post_backlog, qp->get_qp());
}
src/msg/async/rdma/RDMAIWARPConnectedSocketImpl.cc
cm_channel = info->cm_channel;
status = RDMA_ID_CREATED;
remote_qpn = info->qp_num;
worker->center.submit_to(worker->center.get_id(), [this]() {
worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
status = CHANNEL_FD_CREATED;
}, false);
if (alloc_resource()) {
close_notify();
return;
}
worker->center.submit_to(worker->center.get_id(), [this]() {
worker->center.create_file_event(cm_channel->fd, EVENT_READABLE, cm_con_handler);
status = CHANNEL_FD_CREATED;
}, false);
status = RESOURCE_ALLOCATED;
local_qpn = qp->get_local_qp_number();
my_msg.qpn = local_qpn;
......
void RDMAIWARPConnectedSocketImpl::close() {
error = ECONNRESET;
active = false;
if (status >= CONNECTED)
if (status >= CONNECTED) {
rdma_disconnect(cm_id);
}
close_notify();
}
......
break;
case RDMA_CM_EVENT_ESTABLISHED:
ldout(cct, 20) << __func__ << " qp_num=" << cm_id->qp->qp_num << dendl;
status = CONNECTED;
if (!is_server) {
remote_qpn = event->param.conn.qp_num;
......
if (!qp) {
return -1;
}
if (!cct->_conf->ms_async_rdma_support_srq)
dispatcher->post_chunks_to_rq(infiniband->get_rx_queue_len(), qp->get_qp());
dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
src/msg/async/rdma/RDMAStack.cc
}
}
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk)
{
Mutex::Locker l(lock);
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
// handle a case when we have a limited number of
// rx buffers and we could not post a required amount when polling
if (post_backlog > 0) {
ldout(cct, 20) << __func__ << " post_backlog is " << post_backlog << dendl;
post_backlog -= get_stack()->get_infiniband().post_chunks_to_srq(post_backlog);
}
}
int RDMADispatcher::post_chunks_to_rq(int num, ibv_qp *qp)
{
Mutex::Locker l(lock);
return get_stack()->get_infiniband().post_chunks_to_rq(num, qp);
}
void RDMADispatcher::polling()
......
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
post_backlog += rx_ret - get_stack()->get_infiniband().post_chunks_to_srq(rx_ret);
for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
assert(wc[i].opcode == IBV_WC_RECV);
if (response->status == IBV_WC_SUCCESS) {
assert(wc[i].opcode == IBV_WC_RECV);
conn = get_conn_lockless(response->qp_num);
if (!conn) {
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
} else {
polled[conn].push_back(*response);
conn->post_chunks_to_rq(1);
polled[conn].push_back(*response);
}
} else {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << ")" << dendl;
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
if (response->status != IBV_WC_WR_FLUSH_ERR) {
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();
}
get_stack()->get_infiniband().post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}
}
for (auto &&i : polled)
......
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
<< get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;
}
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected()) {
ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
conn->fault();
} else {
ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
if (conn && conn->is_connected()) {
ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;
conn->fault();
} else {
ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
}
}
}
src/msg/async/rdma/RDMAStack.h
bool done = false;
std::atomic<uint64_t> num_dead_queue_pair = {0};
std::atomic<uint64_t> num_qp_conn = {0};
int post_backlog = 0;
Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
// qp_num -> InfRcConnection
// The main usage of `qp_conns` is looking up connection by qp_num,
......
std::atomic<uint64_t> inflight = {0};
void post_chunk_to_pool(Chunk* chunk);
void post_chunk_to_pool(Chunk* chunk);
int post_chunks_to_rq(int num, ibv_qp *qp=NULL);
};
class RDMAWorker : public Worker {
......
int tcp_fd = -1;
bool active;// qp is active ?
bool pending;
int post_backlog = 0;
void notify();
ssize_t read_buffers(char* buf, size_t len);
......
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt);
bool is_pending() {return pending;}
void set_pending(bool val) {pending = val;}
void post_chunks_to_rq(int num);
void update_post_backlog();
class C_handle_connection : public EventCallback {
RDMAConnectedSocketImpl *csi;
(3-3/9)