refactor tindex write

This commit is contained in:
yihaoDeng 2021-12-22 15:47:42 +08:00
parent 0cd932e8ef
commit 7d8e5c3a97
1 changed files with 120 additions and 105 deletions

View File

@ -31,112 +31,21 @@ typedef struct TFileValue {
int32_t offset;
} TFileValue;
static int tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
static int tfileWriteHeader(TFileWriter* writer);
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
static int tfileReadLoadHeader(TFileReader* reader);
static int tfileGetFileList(const char* path, SArray* result);
static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
// static tfileGetCompareFunc(uint8_t byte) {}
static int tfileValueCompare(const void* a, const void* b, const void* param) {
__compar_fn_t fn = *(__compar_fn_t*)param;
TFileValue* av = (TFileValue*)a;
TFileValue* bv = (TFileValue*)b;
return fn(av->colVal, bv->colVal);
}
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) {
int tbSz = taosArrayGetSize(tableIds);
SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t);
for (size_t i = 0; i < tbSz; i++) {
uint64_t* v = taosArrayGet(tableIds, i);
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
}
}
static FORCE_INLINE int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
tw->header.fstOffset = fstOffset;
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
return 0;
}
static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) {
char buf[TFILE_HEADER_NO_FST] = {0};
char* p = buf;
TFileHeader* header = &writer->header;
memcpy(buf, (char*)header, sizeof(buf));
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
if (sizeof(buf) != nwrite) { return -1; }
writer->offset = nwrite;
return 0;
}
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
TFileHeader* header = &write->header;
uint8_t colType = header->colType;
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
if (fstBuilderInsert(write->fb, key, tval->offset)) {
fstSliceDestroy(&key);
return 0;
}
fstSliceDestroy(&key);
return -1;
} else {
// handle other type later
}
}
static FORCE_INLINE int tfileReadLoadHeader(TFileReader* reader) {
// TODO simple tfile header later
char buf[TFILE_HEADER_SIZE];
char* p = buf;
int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HEADER_SIZE);
assert(nread == TFILE_HEADER_SIZE);
memcpy(&reader->header, buf, sizeof(buf));
return 0;
}
static int tfileGetFileList(const char* path, SArray* result) {
DIR* dir = opendir(path);
if (NULL == dir) { return -1; }
struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
size_t len = strlen(entry->d_name);
char* buf = calloc(1, len + 1);
memcpy(buf, entry->d_name, len);
taosArrayPush(result, &buf);
}
closedir(dir);
return 0;
}
static void tfileDestroyFileName(void* elem) {
char* p = *(char**)elem;
free(p);
}
static int tfileCompare(const void* a, const void* b) {
const char* aName = *(char**)a;
const char* bName = *(char**)b;
size_t aLen = strlen(aName);
size_t bLen = strlen(bName);
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
}
// tfile name suid-colId-version.tindex
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
// read suid & colid & version success
return 0;
}
return -1;
}
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
SERIALIZE_MEM_TO_BUF(buf, key, suid);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, colType);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, version);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
}
TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache));
@ -369,3 +278,109 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
return 0;
}
static int tfileValueCompare(const void* a, const void* b, const void* param) {
__compar_fn_t fn = *(__compar_fn_t*)param;
TFileValue* av = (TFileValue*)a;
TFileValue* bv = (TFileValue*)b;
return fn(av->colVal, bv->colVal);
}
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) {
int tbSz = taosArrayGetSize(tableIds);
SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t);
for (size_t i = 0; i < tbSz; i++) {
uint64_t* v = taosArrayGet(tableIds, i);
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
}
}
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
tw->header.fstOffset = fstOffset;
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
return 0;
}
static int tfileWriteHeader(TFileWriter* writer) {
char buf[TFILE_HEADER_NO_FST] = {0};
char* p = buf;
TFileHeader* header = &writer->header;
memcpy(buf, (char*)header, sizeof(buf));
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
if (sizeof(buf) != nwrite) { return -1; }
writer->offset = nwrite;
return 0;
}
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
TFileHeader* header = &write->header;
uint8_t colType = header->colType;
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
if (fstBuilderInsert(write->fb, key, tval->offset)) {
fstSliceDestroy(&key);
return 0;
}
fstSliceDestroy(&key);
return -1;
} else {
// handle other type later
}
}
static int tfileReadLoadHeader(TFileReader* reader) {
// TODO simple tfile header later
char buf[TFILE_HEADER_SIZE] = {0};
char* p = buf;
int64_t nread = reader->ctx->read(reader->ctx, buf, sizeof(buf));
assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf));
return 0;
}
static int tfileGetFileList(const char* path, SArray* result) {
DIR* dir = opendir(path);
if (NULL == dir) { return -1; }
struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
size_t len = strlen(entry->d_name);
char* buf = calloc(1, len + 1);
memcpy(buf, entry->d_name, len);
taosArrayPush(result, &buf);
}
closedir(dir);
return 0;
}
static void tfileDestroyFileName(void* elem) {
char* p = *(char**)elem;
free(p);
}
static int tfileCompare(const void* a, const void* b) {
const char* aName = *(char**)a;
const char* bName = *(char**)b;
size_t aLen = strlen(aName);
size_t bLen = strlen(bName);
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
}
// tfile name suid-colId-version.tindex
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
// read suid & colid & version success
return 0;
}
return -1;
}
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
SERIALIZE_MEM_TO_BUF(buf, key, suid);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, colType);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, version);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
}