---c.开始下发object_map.aio_update的请求,这是一个状态更新的函数,不是很重要的环节,这里不再多说,当更新的请求完成时会自动回调到b申请的回调函数。
6. 进入到AioRequest::complete() 函数中。
void AioRequest::complete(int r) { if (should_complete(r)) //---a }---a.should_complete函数是一个纯虚函数,需要在继承类AbstractWrite中实现,来7. 看看AbstractWrite:: should_complete()
bool AbstractWrite::should_complete(int r) { switch (m_state) { case LIBRBD_AIO_WRITE_PRE: //----a { send_write(); //----b----a.在send_pre中已经设置m_state的状态为LIBRBD_AIO_WRITE_PRE,所以会走这个分支。
----b. send_write()函数中,会继续进行处理,
7.1.下面来看这个send_write函数
void AbstractWrite::send_write() { m_state = LIBRBD_AIO_WRITE_FLAT; //----a add_write_ops(&m_write); // ----b int r = m_ictx->data_ctx.aio_operate(m_oid, rados_completion, &m_write); }---a.重新设置m_state的状态为 LIBRBD_AIO_WRITE_FLAT。
---b.填充m_write,将请求转化为m_write。
---c.下发m_write ,使用data_ctx.aio_operate 函数处理。继续调用io_ctx_impl->aio_operate()函数,继续调用objecter->mutate().
8. objecter->mutate()
ceph_tid_t mutate(……..) { Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags, onack, oncommit, objver); //----d return op_submit(o); }---d.将请求转化为Op请求,继续使用op_submit下发这个请求。在op_submit中继续调用_op_submit_with_budget处理请求。继续调用_op_submit处理。
8.1 _op_submit 的处理过程。这里值得细看
ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) { check_for_latest_map = _calc_target(&op->target, &op->last_force_resend); //---a int r = _get_session(op->target.osd, &s, lc); //---b _session_op_assign(s, op); //----c _send_op(op, m); //----d}
----a. _calc_target,通过计算当前object的保存的osd,然后将主osd保存在target中,rbd写数据都是先发送到主osd,主osd再将数据发送到其他的副本osd上。这里对于怎么来选取osd集合与主osd的关系就不再多说,在《ceph的数据存储之路(3)》中已经讲述这个过程的原理了,代码部分不难理解。
----b. _get_session,该函数是用来与主osd建立通信的,建立通信后,可以通过该通道发送给主osd。再来看看这个函数是怎么处理的
9. _get_session
int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) { map<int,OSDSession*>::iterator p = osd_sessions.find(osd); //----a OSDSession *s = new OSDSession(cct, osd); //----b osd_sessions[osd] = s;//--c s->con = messenger->get_connection(osdmap->get_inst(osd));//-d}
----a.首先在osd_sessions中查找是否已经存在一个连接可以直接使用,第一次通信是没有的。
----b.重新申请一个OSDSession,并且使用osd等信息进行初始化。
---c. 将新申请的OSDSession添加到osd_sessions中保存,以备下次使用。
----d.调用messager的get_connection方法。在该方法中继续想办法与目标osd建立连接。
10. messager 是由子类simpleMessager实现的,下面来看下SimpleMessager中get_connection的实现方法
ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest) { Pipe *pipe = _lookup_pipe(dest.addr); //-----a if (pipe) { } else { pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL); //----b }}
----a.首先要查找这个pipe,第一次通信,自然这个pipe是不存在的。
----b. connect_rank 会根据这个目标osd的addr进行创建。看下connect_rank做了什么。
11. SimpleMessenger::connect_rank
Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, int type, PipeConnection *con, Message *first) { Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING, static_cast<PipeConnection*>(con)); //----a pipe->set_peer_type(type); //----b pipe->set_peer_addr(addr); //----c pipe->policy = get_policy(type); //----d pipe->start_writer(); //----e return pipe; //----f }----a.首先需要创建这个pipe,并且pipe同pipecon进行关联。
----b,----c,-----d。都是进行一些参数的设置。
----e.开始启动pipe的写线程,这里pipe的写线程的处理函数pipe->writer(),该函数中会尝试连接osd。并且建立socket连接通道。