From 0cd932e8ef1ab2e182be1cde7bb53db360af9db0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 22 Dec 2021 15:23:31 +0800 Subject: [PATCH 1/2] refactor tindex write --- source/libs/index/inc/index_tfile.h | 14 ++++--- source/libs/index/src/index_tfile.c | 63 +++++++++++++++-------------- 2 files changed, 42 insertions(+), 35 deletions(-) diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 4da80a6dbe..d711da1807 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -27,18 +27,22 @@ extern "C" { #endif // tfile header content -// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->| -// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->| +// |<---suid--->|<---version--->|<-------colName------>|<---type-->|<--fstOffset->| +// |<-uint64_t->|<---int32_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->| +#pragma pack(push, 1) typedef struct TFileHeader { uint64_t suid; int32_t version; - char colName[128]; // + char colName[TSDB_COL_NAME_LEN]; // uint8_t colType; + int32_t fstOffset; } TFileHeader; +#pragma pack(pop) -#define TFILE_HEADER_SIZE (sizeof(TFileHeader) + sizeof(uint32_t)) -#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t)) +#define TFILE_HEADER_SIZE (sizeof(TFileHeader)) +#define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t)) +//#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t)) typedef struct TFileCacheKey { uint64_t suid; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index fb1974a04c..7fd949738a 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -48,21 +48,23 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) { SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t); } } + +static FORCE_INLINE int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { + int32_t fstOffset = offset + sizeof(tw->header.fstOffset); + tw->header.fstOffset = fstOffset; + if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } + return 0; +} static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) { - char buf[TFILE_HEADER_SIZE] = {0}; + char buf[TFILE_HEADER_NO_FST] = {0}; char* p = buf; TFileHeader* header = &writer->header; - SERIALIZE_MEM_TO_BUF(p, header, suid); - SERIALIZE_MEM_TO_BUF(p, header, version); - SERIALIZE_VAR_TO_BUF(p, strlen(header->colName), int32_t); + memcpy(buf, (char*)header, sizeof(buf)); - SERIALIZE_STR_MEM_TO_BUF(p, header, colName, strlen(header->colName)); - SERIALIZE_MEM_TO_BUF(p, header, colType); - int offset = p - buf; - int nwrite = writer->ctx->write(writer->ctx, buf, offset); - if (offset != nwrite) { return -1; } - writer->offset = offset; + int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); + if (sizeof(buf) != nwrite) { return -1; } + writer->offset = nwrite; return 0; } static int tfileWriteData(TFileWriter* write, TFileValue* tval) { @@ -82,26 +84,12 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { } static FORCE_INLINE int tfileReadLoadHeader(TFileReader* reader) { // TODO simple tfile header later - char buf[TFILE_HADER_PRE_SIZE]; + char buf[TFILE_HEADER_SIZE]; char* p = buf; - int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HADER_PRE_SIZE); - assert(nread == TFILE_HADER_PRE_SIZE); - - TFileHeader* header = &reader->header; - memcpy(&header->suid, p, sizeof(header->suid)); - p += sizeof(header->suid); - - memcpy(&header->version, p, sizeof(header->version)); - p += sizeof(header->version); - - int32_t colLen = 0; - memcpy(&colLen, p, sizeof(colLen)); - assert(colLen < sizeof(header->colName)); - nread = reader->ctx->read(reader->ctx, header->colName, colLen); - assert(nread == colLen); - - nread = reader->ctx->read(reader->ctx, &header->colType, sizeof(header->colType)); + int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HEADER_SIZE); + assert(nread == TFILE_HEADER_SIZE); + memcpy(&reader->header, buf, sizeof(buf)); return 0; } @@ -285,7 +273,7 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { return tw; } -int TFileWriterPut(TFileWriter* tw, void* data) { +int tfileWriterPut(TFileWriter* tw, void* data) { // sort by coltype and write to tindex __compar_fn_t fn = getComparFunc(tw->header.colType, 0); taosArraySortPWithExt((SArray*)(data), tfileValueCompare, &fn); @@ -294,6 +282,21 @@ int TFileWriterPut(TFileWriter* tw, void* data) { char* buf = calloc(1, sizeof(bufLimit)); char* p = buf; int32_t sz = taosArrayGetSize((SArray*)data); + int32_t fstOffset = tw->offset; + + // ugly code, refactor later + for (size_t i = 0; i < sz; i++) { + TFileValue* v = taosArrayGetP((SArray*)data, i); + + int32_t tbsz = taosArrayGetSize(v->tableId); + int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz); + fstOffset += ttsz; + } + // check result or not + tfileWriteFstOffset(tw, fstOffset); + // tw->ctx->header.fstOffset = fstOffset; + // tw->ctx->write(tw->ctx, &fstOffset, sizeof(fstOffset)); + for (size_t i = 0; i < sz; i++) { TFileValue* v = taosArrayGetP((SArray*)data, i); @@ -311,7 +314,7 @@ int TFileWriterPut(TFileWriter* tw, void* data) { tfileSerialTableIdsToBuf(p, v->tableId); offset += ttsz; p = buf + offset; - // set up value offset and + // set up value offset v->offset = tw->offset; tw->offset += ttsz; } From 7d8e5c3a978068d7ac4e8e5aaa7454bfa97dd05f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 22 Dec 2021 15:47:42 +0800 Subject: [PATCH 2/2] refactor tindex write --- source/libs/index/src/index_tfile.c | 225 +++++++++++++++------------- 1 file changed, 120 insertions(+), 105 deletions(-) diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 7fd949738a..39100edd8b 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -31,112 +31,21 @@ typedef struct TFileValue { int32_t offset; } TFileValue; +static int tfileValueCompare(const void* a, const void* b, const void* param); +static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds); + +static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); +static int tfileWriteHeader(TFileWriter* writer); +static int tfileWriteData(TFileWriter* write, TFileValue* tval); + +static int tfileReadLoadHeader(TFileReader* reader); + +static int tfileGetFileList(const char* path, SArray* result); +static void tfileDestroyFileName(void* elem); +static int tfileCompare(const void* a, const void* b); +static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); +static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); // static tfileGetCompareFunc(uint8_t byte) {} -static int tfileValueCompare(const void* a, const void* b, const void* param) { - __compar_fn_t fn = *(__compar_fn_t*)param; - - TFileValue* av = (TFileValue*)a; - TFileValue* bv = (TFileValue*)b; - - return fn(av->colVal, bv->colVal); -} -static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) { - int tbSz = taosArrayGetSize(tableIds); - SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t); - for (size_t i = 0; i < tbSz; i++) { - uint64_t* v = taosArrayGet(tableIds, i); - SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t); - } -} - -static FORCE_INLINE int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { - int32_t fstOffset = offset + sizeof(tw->header.fstOffset); - tw->header.fstOffset = fstOffset; - if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } - return 0; -} -static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) { - char buf[TFILE_HEADER_NO_FST] = {0}; - char* p = buf; - - TFileHeader* header = &writer->header; - memcpy(buf, (char*)header, sizeof(buf)); - - int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); - if (sizeof(buf) != nwrite) { return -1; } - writer->offset = nwrite; - return 0; -} -static int tfileWriteData(TFileWriter* write, TFileValue* tval) { - TFileHeader* header = &write->header; - uint8_t colType = header->colType; - if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { - FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal)); - if (fstBuilderInsert(write->fb, key, tval->offset)) { - fstSliceDestroy(&key); - return 0; - } - fstSliceDestroy(&key); - return -1; - } else { - // handle other type later - } -} -static FORCE_INLINE int tfileReadLoadHeader(TFileReader* reader) { - // TODO simple tfile header later - char buf[TFILE_HEADER_SIZE]; - char* p = buf; - - int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HEADER_SIZE); - assert(nread == TFILE_HEADER_SIZE); - memcpy(&reader->header, buf, sizeof(buf)); - return 0; -} - -static int tfileGetFileList(const char* path, SArray* result) { - DIR* dir = opendir(path); - if (NULL == dir) { return -1; } - - struct dirent* entry; - while ((entry = readdir(dir)) != NULL) { - size_t len = strlen(entry->d_name); - char* buf = calloc(1, len + 1); - memcpy(buf, entry->d_name, len); - taosArrayPush(result, &buf); - } - closedir(dir); - return 0; -} -static void tfileDestroyFileName(void* elem) { - char* p = *(char**)elem; - free(p); -} -static int tfileCompare(const void* a, const void* b) { - const char* aName = *(char**)a; - const char* bName = *(char**)b; - - size_t aLen = strlen(aName); - size_t bLen = strlen(bName); - - return strncmp(aName, bName, aLen > bLen ? aLen : bLen); -} -// tfile name suid-colId-version.tindex -static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) { - if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) { - // read suid & colid & version success - return 0; - } - return -1; -} -static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { - SERIALIZE_MEM_TO_BUF(buf, key, suid); - SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_MEM_TO_BUF(buf, key, colType); - SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_MEM_TO_BUF(buf, key, version); - SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); -} TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); @@ -369,3 +278,109 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { return 0; } + +static int tfileValueCompare(const void* a, const void* b, const void* param) { + __compar_fn_t fn = *(__compar_fn_t*)param; + + TFileValue* av = (TFileValue*)a; + TFileValue* bv = (TFileValue*)b; + + return fn(av->colVal, bv->colVal); +} +static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) { + int tbSz = taosArrayGetSize(tableIds); + SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t); + for (size_t i = 0; i < tbSz; i++) { + uint64_t* v = taosArrayGet(tableIds, i); + SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t); + } +} + +static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { + int32_t fstOffset = offset + sizeof(tw->header.fstOffset); + tw->header.fstOffset = fstOffset; + if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } + return 0; +} +static int tfileWriteHeader(TFileWriter* writer) { + char buf[TFILE_HEADER_NO_FST] = {0}; + char* p = buf; + + TFileHeader* header = &writer->header; + memcpy(buf, (char*)header, sizeof(buf)); + + int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); + if (sizeof(buf) != nwrite) { return -1; } + writer->offset = nwrite; + return 0; +} +static int tfileWriteData(TFileWriter* write, TFileValue* tval) { + TFileHeader* header = &write->header; + uint8_t colType = header->colType; + if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { + FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal)); + if (fstBuilderInsert(write->fb, key, tval->offset)) { + fstSliceDestroy(&key); + return 0; + } + fstSliceDestroy(&key); + return -1; + } else { + // handle other type later + } +} +static int tfileReadLoadHeader(TFileReader* reader) { + // TODO simple tfile header later + char buf[TFILE_HEADER_SIZE] = {0}; + char* p = buf; + + int64_t nread = reader->ctx->read(reader->ctx, buf, sizeof(buf)); + assert(nread == sizeof(buf)); + memcpy(&reader->header, buf, sizeof(buf)); + return 0; +} + +static int tfileGetFileList(const char* path, SArray* result) { + DIR* dir = opendir(path); + if (NULL == dir) { return -1; } + + struct dirent* entry; + while ((entry = readdir(dir)) != NULL) { + size_t len = strlen(entry->d_name); + char* buf = calloc(1, len + 1); + memcpy(buf, entry->d_name, len); + taosArrayPush(result, &buf); + } + closedir(dir); + return 0; +} +static void tfileDestroyFileName(void* elem) { + char* p = *(char**)elem; + free(p); +} +static int tfileCompare(const void* a, const void* b) { + const char* aName = *(char**)a; + const char* bName = *(char**)b; + + size_t aLen = strlen(aName); + size_t bLen = strlen(bName); + + return strncmp(aName, bName, aLen > bLen ? aLen : bLen); +} +// tfile name suid-colId-version.tindex +static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) { + if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) { + // read suid & colid & version success + return 0; + } + return -1; +} +static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { + SERIALIZE_MEM_TO_BUF(buf, key, suid); + SERIALIZE_VAR_TO_BUF(buf, '_', char); + SERIALIZE_MEM_TO_BUF(buf, key, colType); + SERIALIZE_VAR_TO_BUF(buf, '_', char); + SERIALIZE_MEM_TO_BUF(buf, key, version); + SERIALIZE_VAR_TO_BUF(buf, '_', char); + SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); +}