Skip to content

Commit

Permalink
Add initial support for calculating Adler32 on-the-fly
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Johnson committed Feb 4, 2025
1 parent 16892b5 commit 737888c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 4 deletions.
12 changes: 10 additions & 2 deletions src/XrdCeph/XrdCephOss.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ XrdCephOss::~XrdCephOss() {
// declared and used in XrdCephPosix.cc
extern unsigned int g_maxCephPoolIdx;
extern unsigned int g_cephAioWaitThresh;
extern bool g_useAdler32;
extern bool g_useCRC32;

int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
int NoGo = 0;
Expand Down Expand Up @@ -359,9 +361,15 @@ int XrdCephOss::Configure(const char *configfn, XrdSysError &Eroute) {
m_configPoolnames = var;
} else {
Eroute.Emsg("Config", "Missing value for ceph.reportingpools in config file", configfn);
return 1;
return 1;
}
}
}
if (!strcmp(var, "ceph.useadler32")) {
g_useAdler32 = true;
} // useadler32
if (!strcmp(var, "ceph.usecrc32")) {
g_useCRC32 = true;
} // usecrc32
}
// Now check if any errors occurred during file i/o
int retc = Config.LastError();
Expand Down
66 changes: 64 additions & 2 deletions src/XrdCeph/XrdCephPosix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,38 @@
#include "XrdCeph/XrdCephPosix.hh"
#include "XrdCeph/XrdCephBulkAioRead.hh"
#include "XrdSfs/XrdSfsFlags.hh" // for the OFFLINE flag status
#include "XrdCks/XrdCksData.hh"

#include <XrdCks/XrdCksAssist.hh>

using namespace std;

/// Record a checksum on an open file as an XrdCks extended attribute.
///
/// The attribute value is produced by XrdCksAttrData() from the checksum type,
/// the checksum text and the current time; the attribute name comes from
/// XrdCksAttrName(). The pair is then written with ceph_posix_fsetxattr().
///
/// @param fd        file descriptor handed to ceph_posix_fsetxattr()
/// @param cstype    checksum type name (ceph_posix_close passes "adler32")
/// @param ckSumbuf  checksum value as text (ceph_posix_close passes 8 hex digits)
/// @return the return value of ceph_posix_fsetxattr(), or a negative errno
///         value when no attribute data could be generated from the arguments
int setXrdCksAttr(const int fd, const char* cstype, const char* ckSumbuf) {

  // Clear errno so that a stale value is never reported as our failure reason.
  errno = 0;
  const std::vector<char> attrData = XrdCksAttrData(cstype, ckSumbuf, time(nullptr));

  // An empty vector means the type/value pair was rejected (errno says why).
  // Do not write an empty, meaningless attribute in that case.
  if (attrData.empty()) {
    return (errno != 0) ? -errno : -EINVAL;
  }

  const auto attrName = XrdCksAttrName(cstype);
  return ceph_posix_fsetxattr(fd, attrName.c_str(),
                              attrData.data(), attrData.size(), 0);
}


std::vector<char> checksumData(const char* algName, const int algLen, const char* ckBuf) {

XrdCksData xd;
xd.Set(algName);
xd.Set(ckBuf, algLen);
xd.fmTime = time(0);
xd.csTime = xd.fmTime;

auto attrData = std::vector<char>( (char *)&xd, ((char *)&xd)+sizeof(xd));

return attrData;

}

/// small struct for directory listing
struct DirIterator {
Expand Down Expand Up @@ -111,6 +142,10 @@ XrdSysMutex g_init_mutex;
//JW Counter for number of times a given cluster is resolved.
std::map<unsigned int, unsigned long long> g_idxCntr;

//IJJ: Flags selecting whether to calculate the Adler32 / CRC32 checksum
bool g_useAdler32;
bool g_useCRC32;

/// Accessor to next ceph pool index
/// Note that this is not thread safe, but we do not care
/// as we only want a rough load balancing
Expand Down Expand Up @@ -750,6 +785,9 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode
}
}
// At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it
if (g_useAdler32) {
fr.adler32 = adler32(0L, Z_NULL, 0);
}
int fd = insertFileRef(fr);
logwrapper((char*)"File descriptor %d associated to file %s opened in write mode", fd, pathname);
return fd;
Expand Down Expand Up @@ -778,6 +816,20 @@ int ceph_posix_close(int fd) {
fr->asyncWrCompletionCount, fr->asyncWrStartCount, fr->bytesAsyncWritePending,
fr->asyncRdCompletionCount, fr->asyncRdStartCount, fr->bytesWritten, fr->maxOffsetWritten,
fr->longestAsyncWriteTime, fr->longestCallbackInvocation, (lastAsyncAge));
if (g_useAdler32) {
char ckBuf[8+1];

snprintf(ckBuf, 8+1, "%08lx", fr->adler32);
ckBuf[8] = '\0';
logwrapper((char*)"ceph_close: Adler32 checksum = %s", ckBuf);

int rc = setXrdCksAttr(fd, "adler32", (const char*)ckBuf);

if (rc != 0) {
logwrapper((char*)"ceph_close: Can't set attribute XrdCks.adler32 for checksum");
}

}
deleteFileRef(fd, *fr);
return 0;
} else {
Expand Down Expand Up @@ -839,6 +891,9 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) {
fr->wrcount++;
fr->bytesWritten+=count;
if (fr->offset) fr->maxOffsetWritten = std::max(fr->offset - 1, fr->maxOffsetWritten);
if (g_useAdler32) {
fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count);
}
return count;
} else {
return -EBADF;
Expand All @@ -849,7 +904,8 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
CephFileRef* fr = getFileRef(fd);
if (fr) {
// TODO implement proper logging level for this plugin - this should be only debug
//logwrapper((char*)"ceph_write: for fd %d, count=%d", fd, count);
//logwrapper((char*)"ceph_posix_pwrite: for fd %d, count=%d", fd, count);

if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
return -EBADF;
}
Expand All @@ -865,6 +921,9 @@ ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset)
fr->wrcount++;
fr->bytesWritten+=count;
if (offset + count) fr->maxOffsetWritten = std::max(offset + count - 1, fr->maxOffsetWritten);
if (g_useAdler32) {
fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count);
}
return count;
} else {
return -EBADF;
Expand Down Expand Up @@ -909,7 +968,7 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
const char *buf = (const char*)aiop->sfsAio.aio_buf;
size_t offset = aiop->sfsAio.aio_offset;
// TODO implement proper logging level for this plugin - this should be only debug
//logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count);
logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count);
if ((fr->flags & (O_WRONLY|O_RDWR)) == 0) {
return -EBADF;
}
Expand Down Expand Up @@ -939,6 +998,9 @@ ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
fr->asyncWrStartCount++;
::gettimeofday(&fr->lastAsyncSubmission, nullptr);
fr->bytesAsyncWritePending+=count;
if (g_useAdler32) {
fr->adler32 = adler32(fr->adler32, (const Bytef*)buf, count);
}
return rc;
} else {
return -EBADF;
Expand Down
4 changes: 4 additions & 0 deletions src/XrdCeph/XrdCephPosix.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "XrdSys/XrdSysPthread.hh"
#include "XrdOuc/XrdOucIOVec.hh"
#include <zlib.h>

// simple logging for XrdCeph buffering code
#define XRDCEPHLOGLEVEL 1
Expand Down Expand Up @@ -107,6 +108,7 @@ struct CephFile {
unsigned int nbStripes;
unsigned long long stripeUnit;
unsigned long long objectSize;

};

struct CephFileRef : CephFile {
Expand All @@ -127,6 +129,8 @@ struct CephFileRef : CephFile {
::timeval lastAsyncSubmission;
double longestAsyncWriteTime;
double longestCallbackInvocation;
uLong adler32;
uLong crc32;
};

#endif // __XRD_CEPH_POSIX__

0 comments on commit 737888c

Please sign in to comment.