版本
Ceph版本为17(Quincy)。
Ceph读写接口是如何实现的
Ceph的客户端通过librados的接口进行集群的访问,这里的访问包括:
1)对集群的整体访问
2)对象的访问
以上两类接口(API)均提供C、C++和Python的实现,并通过网络实现对Ceph集群的访问。在客户端层面,可以在自己的程序中调用这些接口,从而集成Ceph集群的存储功能,或者在监控程序中实现对Ceph集群状态的监控。
写入的基本流程是:先创建一个ObjectWriteOperation对象,向其中添加各项操作并设置参数,最后提交执行写操作。
例子:在RGW中,客户端会初始化一个ObjectWriteOperation的对象,然后调用librados中的接口进行操作。
ObjectWriteOperation op;op.create(false);op.setxattr(RGW_ATTR_ID_TAG, bl);op.mtime2(&mtime_ts);op.write_full(*meta.data);op.rmxattr(name.c_str());rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);ioctx.operate(oid, op, flags);
下面针对每个函数调用进行详细的描述
-
create:向op中添加一个创建对象的操作(CEPH_OSD_OP_CREATE)
void librados::ObjectWriteOperation::create(bool exclusive),实际调用的是ObjectOperation::create。 //librados_cxx.cc void librados::ObjectWriteOperation::create(bool exclusive){ceph_assert(impl);::ObjectOperation *o = &impl->o;o->create(exclusive); } //osdc/Objecter.h void create(bool excl) {OSDOp& o = add_op(CEPH_OSD_OP_CREATE);o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0); } //ops是ObjectOperation中的一个属性,类型为small_vector<OSDOp, osdc_opvec_len>; OSDOp& add_op(int op) {ops.emplace_back();ops.back().op.op = op;out_bl.push_back(nullptr);ceph_assert(ops.size() == out_bl.size());out_handler.emplace_back();ceph_assert(ops.size() == out_handler.size());out_rval.push_back(nullptr);ceph_assert(ops.size() == out_rval.size());out_ec.push_back(nullptr);ceph_assert(ops.size() == out_ec.size());return ops.back(); }
-
setxattr和rmxattr
二者类似,都是向op中追加一个扩展属性(xattr)操作:setxattr通过add_xattr添加CEPH_OSD_OP_SETXATTR操作,并把属性名和属性值依次追加到indata中;rmxattr的处理与之类似(此处未列出其代码)。 //librados_cxx.cc void librados::ObjectWriteOperation::setxattr(const char *name, const bufferlist& v) { ceph_assert(impl); ::ObjectOperation *o = &impl->o; o->setxattr(name, v); } //osdc/Objecter.h void setxattr(const char *name, const ceph::buffer::list& bl) {add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); } //ops是ObjectOperation中的一个属性,类型为small_vector<OSDOp, osdc_opvec_len>; void add_xattr(int op, const char *name, const ceph::buffer::list& data) {OSDOp& osd_op = add_op(op);osd_op.op.xattr.name_len = (name ? strlen(name) : 0);osd_op.op.xattr.value_len = data.length();if (name)osd_op.indata.append(name, osd_op.op.xattr.name_len);osd_op.indata.append(data); }
-
write_full
关键函数,向op中添加整对象覆盖写操作(CEPH_OSD_OP_WRITEFULL):偏移为0,长度为bl.length(),数据通过claim_append转移到indata中。 //librados_cxx.cc void librados::ObjectWriteOperation::write_full(const bufferlist& bl){ceph_assert(impl);::ObjectOperation *o = &impl->o;bufferlist c = bl;o->write_full(c); } //osdc/Objecter.h void write_full(ceph::buffer::list& bl) {add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl); } void add_data(int op, uint64_t off, uint64_t len, ceph::buffer::list& bl) {OSDOp& osd_op = add_op(op);osd_op.op.extent.offset = off;osd_op.op.extent.length = len;osd_op.indata.claim_append(bl); }
-
rgw_rados_operate
当op初步处理完成后,即可进行operate操作,交由存储池的ioctx进行处理。 int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,librados::ObjectWriteOperation *op, optional_yield y, int flags){if (y) {auto& context = y.get_io_context();auto& yield = y.get_yield_context();boost::system::error_code ec;librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);return -ec.value();}if (is_asio_thread) {ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;}return ioctx.operate(oid, op, flags); } int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o, int flags) {object_t obj(oid);if (unlikely(!o->impl))return -EINVAL;return io_ctx_impl->operate(obj, &o->impl->o, (ceph::real_time *)o->impl->prt, translate_flags(flags)); } //librados/IoCtxImpl.cc //其中核心部分是通过prepare_mutate_op构造objecter_op,再调用objecter的op_submit函数提交。objecter是osdc中Objecter类的对象,后面的操作就涉及到osdc中的操作了。 int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,ceph::real_time *pmtime, int flags) { ceph::real_time ut = (pmtime ? *pmtime :ceph::real_clock::now());/* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP)return -EROFS;if (!o->size())return 0;ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::operate::mylock"); ceph::condition_variable cond; bool done; int r; version_t ver;Context *oncommit = new C_SafeCond(mylock, cond, &done, &r);int op = o->ops[0].op.op; ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid<< " nspace=" << oloc.nspace << dendl; Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,*o, snapc, ut,flags | extra_op_flags,oncommit, &ver); objecter->op_submit(objecter_op);{std::unique_lock l{mylock};cond.wait(l, [&done] { return done;}); } ldout(client->cct, 10) << "Objecter returned from "<< ceph_osd_op_name(op) << " r=" << r << dendl;set_sync_op_version(ver);return r; }
总结
一个op先通过create、setxattr、write_full等调用依次添加各项子操作并设置参数,然后交给ioctx执行operate,最终由objecter通过op_submit提交给osdc。
下一篇,介绍osdc部分的处理。