Ceph源码解析:读写流程(4)

---c.开始下发object_map.aio_update的请求,这是一个状态更新的函数,不是很重要的环节,这里不再多说,当更新的请求完成时会自动回调到b申请的回调函数。

6. 进入到AioRequest::complete() 函数中。

void AioRequest::complete(int r) {    if (should_complete(r))  //---a }

---a.should_complete函数是一个纯虚函数,需要在继承类AbstractWrite中实现,来7. 看看AbstractWrite:: should_complete()

bool AbstractWrite::should_complete(int r) {    switch (m_state)    {        case LIBRBD_AIO_WRITE_PRE:  //----a        {          send_write(); //----b

----a.在send_pre中已经设置m_state的状态为LIBRBD_AIO_WRITE_PRE,所以会走这个分支。

----b. send_write()函数中,会继续进行处理,

7.1.下面来看这个send_write函数

void AbstractWrite::send_write() {      m_state = LIBRBD_AIO_WRITE_FLAT;  //----a      add_write_ops(&m_write);    // ----b      int r = m_ictx->data_ctx.aio_operate(m_oid, rados_completion, &m_write); }

---a.重新设置m_state的状态为 LIBRBD_AIO_WRITE_FLAT。

---b.填充m_write,将请求转化为m_write。

---c.下发m_write  ,使用data_ctx.aio_operate 函数处理。继续调用io_ctx_impl->aio_operate()函数,继续调用objecter->mutate().

8. objecter->mutate()

ceph_tid_t mutate(……..)  {    Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags, onack, oncommit, objver);  //----d    return op_submit(o); }

---d.将请求转化为Op请求,继续使用op_submit下发这个请求。在op_submit中继续调用_op_submit_with_budget处理请求。继续调用_op_submit处理。

8.1 _op_submit 的处理过程。这里值得细看

ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) {     check_for_latest_map = _calc_target(&op->target, &op->last_force_resend); //---a    int r = _get_session(op->target.osd, &s, lc);  //---b    _session_op_assign(s, op); //----c    _send_op(op, m); //----d

}

----a. _calc_target,通过计算当前object的保存的osd,然后将主osd保存在target中,rbd写数据都是先发送到主osd,主osd再将数据发送到其他的副本osd上。这里对于怎么来选取osd集合与主osd的关系就不再多说,在《ceph的数据存储之路(3)》中已经讲述这个过程的原理了,代码部分不难理解。

----b. _get_session,该函数是用来与主osd建立通信的,建立通信后,可以通过该通道发送给主osd。再来看看这个函数是怎么处理的

9. _get_session

int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) {    map<int,OSDSession*>::iterator p = osd_sessions.find(osd);  //----a    OSDSession *s = new OSDSession(cct, osd); //----b    osd_sessions[osd] = s;//--c    s->con = messenger->get_connection(osdmap->get_inst(osd));//-d

}

----a.首先在osd_sessions中查找是否已经存在一个连接可以直接使用,第一次通信是没有的。

----b.重新申请一个OSDSession,并且使用osd等信息进行初始化。

---c. 将新申请的OSDSession添加到osd_sessions中保存,以备下次使用。

----d.调用messager的get_connection方法。在该方法中继续想办法与目标osd建立连接。

10. messager 是由子类simpleMessager实现的,下面来看下SimpleMessager中get_connection的实现方法

ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest) {    Pipe *pipe = _lookup_pipe(dest.addr);    //-----a    if (pipe)      {    }      else      {      pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL); //----b    }

}

----a.首先要查找这个pipe,第一次通信,自然这个pipe是不存在的。

----b. connect_rank 会根据这个目标osd的addr进行创建。看下connect_rank做了什么。

11. SimpleMessenger::connect_rank

Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,  int type, PipeConnection *con,    Message *first) {      Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING, static_cast<PipeConnection*>(con));      //----a    pipe->set_peer_type(type); //----b    pipe->set_peer_addr(addr); //----c    pipe->policy = get_policy(type); //----d    pipe->start_writer();  //----e    return pipe; //----f }

----a.首先需要创建这个pipe,并且pipe同pipecon进行关联。

----b,----c,-----d。都是进行一些参数的设置。

----e.开始启动pipe的写线程,这里pipe的写线程的处理函数pipe->writer(),该函数中会尝试连接osd。并且建立socket连接通道。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/bccb0f79f96239680a8dd2c94f779512.html