Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion proto
149 changes: 105 additions & 44 deletions src/client/fuse/fuse_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

#include "client/fuse/fuse_op.h"

#include <sys/types.h>

#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>
Expand All @@ -38,6 +42,7 @@ static dingofs::client::vfs::VFSWrapper* g_vfs = nullptr;

USING_FLAG(client_fuse_file_info_direct_io)
USING_FLAG(client_fuse_file_info_keep_cache)
USING_FLAG(client_fuse_enable_readdir_cache)

using dingofs::Status;
using dingofs::client::vfs::Attr;
Expand Down Expand Up @@ -114,51 +119,80 @@ Attr Stat2Attr(struct stat* stat) {
} // namespace

static void ReplyError(fuse_req_t req, const Status& s) {
fuse_reply_err(req, s.ToSysErrNo());
int ret = fuse_reply_err(req, s.ToSysErrNo());
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_err fail, ret({}).", errno);
}
}

static void ReplyEntry(fuse_req_t req, const Attr& attr) {
fuse_entry_param e;
memset(&e, 0, sizeof(e));
Attr2FuseEntry(attr, &e);
fuse_reply_entry(req, &e);

int ret = fuse_reply_entry(req, &e);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_entry fail, ret({}).", errno);
}
}

static void ReplyAttr(fuse_req_t req, const Attr& attr) {
struct stat stat;
memset(&stat, 0, sizeof(stat));
Attr2Stat(attr, &stat);
fuse_reply_attr(req, &stat, g_vfs->GetAttrTimeout(attr.type));

int ret = fuse_reply_attr(req, &stat, g_vfs->GetAttrTimeout(attr.type));
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_attr fail, ret({}).", errno);
}
}

static void ReplyReadlink(fuse_req_t req, const std::string& link) {
fuse_reply_readlink(req, link.c_str());
int ret = fuse_reply_readlink(req, link.c_str());
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_readlink fail, ret({}).",
errno);
}
}

static void ReplyOpen(fuse_req_t req, struct fuse_file_info* fi) {
fuse_reply_open(req, fi);
int ret = fuse_reply_open(req, fi);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_open fail, ret({}).", errno);
}
}

static void ReplyCreate(fuse_req_t req, struct fuse_file_info* fi,
const Attr& attr) {
fuse_entry_param e;
memset(&e, 0, sizeof(fuse_entry_param));
Attr2FuseEntry(attr, &e);
fuse_reply_create(req, &e, fi);

int ret = fuse_reply_create(req, &e, fi);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_create fail, ret({}).", errno);
}
}

static void ReplyData(fuse_req_t req, char* buffer, size_t size) {
struct fuse_bufvec bufvec = FUSE_BUFVEC_INIT(size);
bufvec.buf[0].mem = buffer;
fuse_reply_data(req, &bufvec, FUSE_BUF_SPLICE_MOVE);

int ret = fuse_reply_data(req, &bufvec, FUSE_BUF_SPLICE_MOVE);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_create fail, ret({}).", errno);
}
}

static void ReplyData(fuse_req_t req,
dingofs::client::vfs::DataBuffer& data_buffer) {
std::vector<dingofs::client::vfs::IOVec> iovecs = data_buffer.GatherIOVecs();
auto iovecs = data_buffer.GatherIOVecs();

if (iovecs.empty()) {
fuse_reply_buf(req, nullptr, 0);
int ret = fuse_reply_buf(req, nullptr, 0);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_data fail, ret({}).", errno);
}
return;
}

Expand All @@ -185,24 +219,39 @@ static void ReplyData(fuse_req_t req,
tmp_fuse_bufvec->buf[i].size = iovecs[i].iov_len;
}

fuse_reply_data(req, tmp_fuse_bufvec.get(), FUSE_BUF_SPLICE_MOVE);
int ret = fuse_reply_data(req, tmp_fuse_bufvec.get(), FUSE_BUF_SPLICE_MOVE);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_data fail, ret({}).", errno);
}
}

static void ReplyWrite(fuse_req_t req, size_t size) {
fuse_reply_write(req, size);
int ret = fuse_reply_write(req, size);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_write fail, ret({}).", errno);
}
}

static void ReplyBuf(fuse_req_t req, char* buffer, size_t size) {
fuse_reply_buf(req, buffer, size);
int ret = fuse_reply_buf(req, buffer, size);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_buf fail, ret({}).", errno);
}
}

// Reply with needed buffer size
static void ReplyXattr(fuse_req_t req, size_t size) {
fuse_reply_xattr(req, size);
int ret = fuse_reply_xattr(req, size);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_xattr fail, ret({}).", errno);
}
}

static void ReplyIoctl(fuse_req_t req, const char* out_buf, size_t out_bufsz) {
fuse_reply_ioctl(req, 0, out_buf, out_bufsz);
int ret = fuse_reply_ioctl(req, 0, out_buf, out_bufsz);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_ioctl fail, ret({}).", errno);
}
}

static void ReplyStatfs(fuse_req_t req, const FsStat& stat) {
Expand Down Expand Up @@ -253,7 +302,10 @@ static void ReplyStatfs(fuse_req_t req, const FsStat& stat) {
stbuf.f_flag = 0;
stbuf.f_namemax = g_vfs->GetMaxNameLength();

fuse_reply_statfs(req, &stbuf);
int ret = fuse_reply_statfs(req, &stbuf);
if (ret != 0) {
LOG(ERROR) << fmt::format("[fuse] fuse_reply_statfs fail, ret({}).", errno);
}
}

int InitFuseClient(const char* argv0, const struct MountOption* mount_option) {
Expand Down Expand Up @@ -531,28 +583,28 @@ void FuseOpOpenDir(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) {
} else {
fi->fh = fh;

fi->cache_readdir = FLAGS_client_fuse_file_info_keep_cache ? 1 : 0;
fi->keep_cache = FLAGS_client_fuse_file_info_keep_cache ? 1 : 0;
fi->cache_readdir = FLAGS_client_fuse_enable_readdir_cache ? 1 : 0;
fi->keep_cache = FLAGS_client_fuse_enable_readdir_cache ? 1 : 0;

ReplyOpen(req, fi);
}
}

void FuseOpReadDir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
struct fuse_file_info* fi) {
VLOG(1) << fmt::format("read dir, ino({}) fh({}) off({}) size({})", ino,
VLOG(1) << fmt::format("read dir, ino({}) fh({}) off({}) size({}).", ino,
fi->fh, off, size);

CHECK_GE(off, 0) << "offset is illegal, offset: " << off;

off_t next_off = off;
size_t writed_size = 0;
std::string buffer(size, '\0');
Status s = g_vfs->ReadDir(
ino, fi->fh, off, false,
[&](const dingofs::client::vfs::DirEntry& dir_entry,
uint64_t off) -> bool {
(void)off;
VLOG(1) << fmt::format("read dir entry({}/{})", dir_entry.name,
[&](const dingofs::client::vfs::DirEntry& dir_entry, uint64_t) -> bool {
VLOG(1) << fmt::format("read dir({}) off[{},{}) fh({}) entry({}/{}).",
ino, off, next_off, fi->fh, dir_entry.name,
dir_entry.ino);

struct stat stat;
Expand All @@ -566,9 +618,9 @@ void FuseOpReadDir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
dir_entry.name.c_str(), &stat, ++off);
if (entsize > rest_size) {
VLOG(1) << fmt::format(
"read dir entry is full, ino({}) fh({}) off({}) size({}/{}) "
"entry_size({})",
ino, fi->fh, off, buffer.size(), size, entsize);
"read dir entry is full, ino({}) fh({}) off[{},{}) size({}) "
"entry_size({}) rest_size({}).",
ino, fi->fh, off, next_off, buffer.size(), entsize, rest_size);
return false;
}

Expand All @@ -579,64 +631,73 @@ void FuseOpReadDir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,

if (!s.ok()) {
LOG(ERROR) << fmt::format(
"read dir fail, ino({}) fh({}) off({}) size({}) error({})", ino, fi->fh,
off, size, s.ToString());
"read dir fail, ino({}) fh({}) off[{},{}) size({}) error({}).", ino,
fi->fh, off, next_off, size, s.ToString());
ReplyError(req, s);

} else {
buffer.resize(writed_size);

VLOG(1) << fmt::format("read dir success, ino({}) fh({}) off({}) size({}) ",
ino, fi->fh, off, buffer.size());
VLOG(1) << fmt::format(
"read dir success, ino({}) fh({}) off[{},{}) size({}).", ino, fi->fh,
off, next_off, writed_size);

ReplyBuf(req, buffer.data(), buffer.size());
}
}

void FuseOpReadDirPlus(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
struct fuse_file_info* fi) {
VLOG(1) << fmt::format("read dir, ino({}) fh({}) off({}) size({})", ino,
VLOG(1) << fmt::format("read dir, ino({}) fh({}) off({}) size({}).", ino,
fi->fh, off, size);

CHECK_GE(off, 0) << "offset is illegal, offset: " << off;

off_t next_off = off;
size_t writed_size = 0;
std::string buffer(size, '\0');
Status s = g_vfs->ReadDir(
ino, fi->fh, off, true,
[&](const dingofs::client::vfs::DirEntry& dir_entry, int32_t) -> bool {
(void)off;
VLOG(1) << fmt::format("read dir entry({}/{}) attr({})", dir_entry.name,
dir_entry.ino, Attr2Str(dir_entry.attr));
[ino, off, &next_off, &req, &fi, &buffer, &writed_size](
const dingofs::client::vfs::DirEntry& dir_entry, uint64_t) -> bool {
VLOG(1) << fmt::format(
"read dir({}) off[{},{}) fh({}) entry({}/{}) attr({}).", ino, off,
next_off, fi->fh, dir_entry.name, dir_entry.ino,
Attr2Str(dir_entry.attr));

fuse_entry_param fuse_entry;
memset(&fuse_entry, 0, sizeof(fuse_entry_param));
Attr2FuseEntry(dir_entry.attr, &fuse_entry);

size_t rest_size = buffer.size() - writed_size;

size_t entsize =
fuse_add_direntry_plus(req, buffer.data() + writed_size, rest_size,
dir_entry.name.c_str(), &fuse_entry, ++off);
size_t entsize = fuse_add_direntry_plus(
req, buffer.data() + writed_size, rest_size, dir_entry.name.c_str(),
&fuse_entry, next_off + 1);
if (entsize > rest_size) {
VLOG(1) << fmt::format(
"read dir entry is full, ino({}) fh({}) off({}) size({}/{}) "
"entry_size({})",
ino, fi->fh, off, buffer.size(), size, entsize);
"read dir entry is full, ino({}) fh({}) off[{},{}) size({}) "
"entry_size({}) rest_size({}).",
ino, fi->fh, off, next_off, buffer.size(), entsize, rest_size);
return false;
}
++next_off;
writed_size += entsize;

return true;
});

if (!s.ok()) {
LOG(ERROR) << fmt::format(
"read dir fail, ino({}) fh({}) off({}) size({}) error({})", ino, fi->fh,
off, size, s.ToString());
"read dir fail, ino({}) fh({}) off[{},{}) size({}) error({}).", ino,
fi->fh, off, next_off, size, s.ToString());
ReplyError(req, s);

} else {
buffer.resize(writed_size);
VLOG(1) << fmt::format("read dir success, ino({}) fh({}) off({}) size({}) ",
ino, fi->fh, off, buffer.size());
VLOG(1) << fmt::format(
"read dir success, ino({}) fh({}) off[{},{}) size({}).", ino, fi->fh,
off, next_off, writed_size);

ReplyBuf(req, buffer.data(), buffer.size());
}
Expand Down
6 changes: 1 addition & 5 deletions src/client/vfs/data/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,7 @@ Status File::PreCheck() {
Status File::Write(ContextSPtr ctx, const char* buf, uint64_t size,
uint64_t offset, uint64_t* out_wsize) {
DINGOFS_RETURN_NOT_OK(PreCheck());
Status s = file_writer_->Write(ctx, buf, size, offset, out_wsize);
if (s.ok()) {
file_reader_->Invalidate();
}
return s;
return file_writer_->Write(ctx, buf, size, offset, out_wsize);
}

Status File::Read(ContextSPtr ctx, DataBuffer* data_buffer, uint64_t size,
Expand Down
Loading