From 6ca5e3ae6ea4b641ef076ba26a02a0ebad734553 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 24 May 2022 13:50:33 +0800 Subject: [PATCH 1/7] enh: opt index mutex --- source/libs/index/inc/indexTfile.h | 7 +++--- source/libs/index/src/index.c | 33 ++++++++++++++-------------- source/libs/index/src/indexTfile.c | 18 ++++++++++----- source/libs/index/test/indexTests.cc | 2 +- 4 files changed, 33 insertions(+), 27 deletions(-) diff --git a/source/libs/index/inc/indexTfile.h b/source/libs/index/inc/indexTfile.h index 85ed397b0a..6fca3b1bf5 100644 --- a/source/libs/index/inc/indexTfile.h +++ b/source/libs/index/inc/indexTfile.h @@ -74,9 +74,10 @@ typedef struct TFileReader { } TFileReader; typedef struct IndexTFile { - char* path; - TFileCache* cache; - TFileWriter* tw; + char* path; + TFileCache* cache; + TFileWriter* tw; + TdThreadMutex mtx; } IndexTFile; typedef struct TFileWriterOpt { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 6add788a89..02f1682655 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -557,20 +557,18 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) { ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)}; int64_t ver = CACHE_VERSION(cache); - taosThreadMutexLock(&sIdx->mtx); - TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key); - if (trd != NULL) { - if (ver < trd->header.version) { - ver = trd->header.version + 1; - } else { - ver += 1; - } - indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver); - tfileReaderUnRef(trd); - } else { - indexInfo("not found reader base %p", trd); + + TFileReader* rd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key); + IndexTFile* tf = (IndexTFile*)(sIdx->tindex); + + taosThreadMutexLock(&tf->mtx); + tfileCacheGet(tf->cache, &key); + taosThreadMutexUnlock(&tf->mtx); + + if (rd != NULL) { + ver += MAX(ver, rd->header.version) + 1; + indexInfo("header: %d, ver: %" PRId64 "", rd->header.version, ver); } - taosThreadMutexUnlock(&sIdx->mtx); return ver; } static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { @@ -597,13 +595,14 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { } indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf); + IndexTFile* tf = (IndexTFile*)sIdx->tindex; + TFileHeader* header = &reader->header; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; - taosThreadMutexLock(&sIdx->mtx); - IndexTFile* ifile = (IndexTFile*)sIdx->tindex; - tfileCachePut(ifile->cache, &key, reader); - taosThreadMutexUnlock(&sIdx->mtx); + taosThreadMutexLock(&tf->mtx); + tfileCachePut(tf->cache, &key, reader); + taosThreadMutexUnlock(&tf->mtx); return ret; END: if (tw != NULL) { diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 3d85646bd2..43754193ae 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -151,13 +151,10 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { char buf[128] = {0}; int32_t sz = indexSerialCacheKey(key, buf); assert(sz < sizeof(buf)); - indexInfo("Try to get key: %s", buf); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); if (reader == NULL || *reader == NULL) { - indexInfo("failed to get key: %s", buf); return NULL; } - indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf); tfileReaderRef(*reader); return *reader; @@ -657,7 +654,7 @@ IndexTFile* indexTFileCreate(const char* path) { tfileCacheDestroy(cache); return NULL; } - + taosThreadMutexInit(&tfile->mtx, NULL); tfile->cache = cache; return tfile; } @@ -665,6 +662,7 @@ void indexTFileDestroy(IndexTFile* tfile) { if (tfile == NULL) { return; } + taosThreadMutexDestroy(&tfile->mtx); tfileCacheDestroy(tfile->cache); taosMemoryFree(tfile); } @@ -680,7 +678,10 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result SIndexTerm* term = query->term; ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; + + taosThreadMutexLock(&pTfile->mtx); TFileReader* reader = tfileCacheGet(pTfile->cache, &key); + taosThreadMutexUnlock(&pTfile->mtx); if (reader == NULL) { return 0; } @@ -780,8 +781,13 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) { if (tf == NULL) { return NULL; } - ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; - return tfileCacheGet(tf->cache, &key); + TFileReader* rd = NULL; + ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; + + taosThreadMutexLock(&tf->mtx); + rd = tfileCacheGet(tf->cache, &key); + taosThreadMutexUnlock(&tf->mtx); + return rd; } static int tfileUidCompare(const void* a, const void* b) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index f848cee86b..5c6fe8bf91 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -794,10 +794,10 @@ class IndexObj { } int sz = taosArrayGetSize(result); indexMultiTermQueryDestroy(mq); - taosArrayDestroy(result); assert(sz == 1); uint64_t* ret = (uint64_t*)taosArrayGet(result, 0); assert(val = *ret); + taosArrayDestroy(result); return sz; } From 8ef6eb4c5ff6e2f6a87adccfe6239b65d86eec1f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 24 May 2022 15:17:14 +0800 Subject: [PATCH 2/7] enh: refactor index code --- include/util/tlog.h | 1 + source/common/src/tglobal.c | 10 ++++++--- source/libs/index/inc/indexCache.h | 6 ++--- source/libs/index/inc/indexInt.h | 18 +++++++-------- source/libs/index/inc/indexTfile.h | 8 +++---- source/libs/index/src/index.c | 2 +- source/libs/index/src/indexCache.c | 20 ++++++++++------- source/libs/index/src/indexTfile.c | 22 +++++++++---------- source/libs/index/test/indexTests.cc | 4 ++-- .../libs/index/test/index_executor_tests.cpp | 6 +---- source/libs/index/test/jsonUT.cc | 2 +- source/libs/transport/src/trans.c | 14 +++++++----- source/libs/transport/src/transSrv.c | 2 +- source/util/src/tlog.c | 9 ++++---- 14 files changed, 67 insertions(+), 57 deletions(-) diff --git a/include/util/tlog.h b/include/util/tlog.h index be31aa8115..d853d77b71 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -62,6 +62,7 @@ extern int32_t fsDebugFlag; extern int32_t metaDebugFlag; extern int32_t fnDebugFlag; extern int32_t smaDebugFlag; +extern int32_t idxDebugFlag; int32_t taosInitLog(const char *logName, int32_t maxFiles); void taosCloseLog(); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1b61a0bc60..08238ff44d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -79,9 +79,10 @@ uint16_t tsTelemPort = 80; // schemaless char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null"; -char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value. - //If set to empty system will generate table name using MD5 hash. -bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol) +char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value. + // If set to empty system will generate table name using MD5 hash. +bool tsSmlDataFormat = + true; // true means that the name and order of cols in each line are the same(only for influx protocol) // query int32_t tsQueryPolicy = 1; @@ -302,6 +303,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "wDebugFlag", wDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1; @@ -479,6 +481,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) { rpcDebugFlag = cfgGetItem(pCfg, "rpcDebugFlag")->i32; tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32; jniDebugFlag = cfgGetItem(pCfg, "jniDebugFlag")->i32; + idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32; } static void taosSetServerLogCfg(SConfig *pCfg) { @@ -493,6 +496,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) { fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32; fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32; smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32; + idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32; } static int32_t taosSetClientCfg(SConfig *pCfg) { diff --git a/source/libs/index/inc/indexCache.h b/source/libs/index/inc/indexCache.h index aff2e0e836..6cbe2532cc 100644 --- a/source/libs/index/inc/indexCache.h +++ b/source/libs/index/inc/indexCache.h @@ -38,7 +38,7 @@ typedef struct IndexCache { MemTable *mem, *imm; SIndex* index; char* colName; - int32_t version; + int64_t version; int64_t occupiedMem; int8_t type; uint64_t suid; @@ -47,12 +47,12 @@ typedef struct IndexCache { TdThreadCond finished; } IndexCache; -#define CACHE_VERSION(cache) atomic_load_32(&cache->version) +#define CACHE_VERSION(cache) atomic_load_64(&cache->version) typedef struct CacheTerm { // key char* colVal; - int32_t version; + int64_t version; // value uint64_t uid; int8_t colType; diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 0bdcb131b6..81d43daf13 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -34,6 +34,15 @@ extern "C" { #endif +// clang-format off +#define indexFatal(...) do { if (idxDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0) +#define indexError(...) do { if (idxDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0) +#define indexWarn(...) do { if (idxDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0) +#define indexInfo(...) do { if (idxDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0) +#define indexDebug(...) do { if (idxDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0) +#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0) +// clang-format on + typedef enum { LT, LE, GT, GE } RangeType; typedef enum { kTypeValue, kTypeDeletion } STermValueType; @@ -134,15 +143,6 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); -// clang-format off -#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0) -#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0) -#define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0) -#define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0) -#define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0) -#define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0) -// clang-format on - #define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) #define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F) diff --git a/source/libs/index/inc/indexTfile.h b/source/libs/index/inc/indexTfile.h index 6fca3b1bf5..af32caa821 100644 --- a/source/libs/index/inc/indexTfile.h +++ b/source/libs/index/inc/indexTfile.h @@ -28,12 +28,12 @@ extern "C" { // tfile header content // |<---suid--->|<---version--->|<-------colName------>|<---type-->|<--fstOffset->| -// |<-uint64_t->|<---int32_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->| +// |<-uint64_t->|<---int64_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->| #pragma pack(push, 1) typedef struct TFileHeader { uint64_t suid; - int32_t version; + int64_t version; char colName[TSDB_COL_NAME_LEN]; // uint8_t colType; int32_t fstOffset; @@ -102,14 +102,14 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* read TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); -TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName); +TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName); TFileReader* tfileReaderCreate(WriterCtx* ctx); void tfileReaderDestroy(TFileReader* reader); int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr); void tfileReaderRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader); -TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type); +TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type); void tfileWriterClose(TFileWriter* tw); TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); void tfileWriterDestroy(TFileWriter* tw); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 02f1682655..5d44c063a5 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -567,7 +567,7 @@ static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) { if (rd != NULL) { ver += MAX(ver, rd->header.version) + 1; - indexInfo("header: %d, ver: %" PRId64 "", rd->header.version, ver); + indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver); } return ver; } diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index d704e3876e..6e52c4b1ba 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -80,7 +80,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); pCt->colVal = term->colVal; - pCt->version = atomic_load_32(&pCache->version); + pCt->version = atomic_load_64(&pCache->version); char* key = indexCacheTermGet(pCt); @@ -133,7 +133,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); pCt->colVal = term->colVal; - pCt->version = atomic_load_32(&pCache->version); + pCt->version = atomic_load_64(&pCache->version); char* key = indexCacheTermGet(pCt); @@ -185,7 +185,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); pCt->colVal = term->colVal; - pCt->version = atomic_load_32(&pCache->version); + pCt->version = atomic_load_64(&pCache->version); char* exBuf = NULL; if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { @@ -259,7 +259,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); pCt->colVal = term->colVal; - pCt->version = atomic_load_32(&pCache->version); + pCt->version = atomic_load_64(&pCache->version); int8_t dType = INDEX_TYPE_GET_TYPE(term->colType); int skip = 0; @@ -356,7 +356,7 @@ void indexCacheDebug(IndexCache* cache) { CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); if (ct != NULL) { // TODO, add more debug info - indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version); + indexInfo("{colVal: %s, version: %" PRId64 "} \t", ct->colVal, ct->version); } } tSkipListDestroyIter(iter); @@ -377,7 +377,7 @@ void indexCacheDebug(IndexCache* cache) { CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); if (ct != NULL) { // TODO, add more debug info - indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version); + indexInfo("{colVal: %s, version: %" PRId64 "} \t", ct->colVal, ct->version); } } tSkipListDestroyIter(iter); @@ -529,7 +529,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1)); memcpy(ct->colVal, term->colVal, term->nColVal); } - ct->version = atomic_add_fetch_32(&pCache->version, 1); + ct->version = atomic_add_fetch_64(&pCache->version, 1); // set value ct->uid = uid; ct->operaType = term->operType; @@ -663,7 +663,11 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) { // compare colVal int32_t cmp = strcmp(lt->colVal, rt->colVal); if (cmp == 0) { - return rt->version - lt->version; + if (rt->version == lt->version) { + cmp = 0; + } else { + cmp = rt->version < lt->version ? -1 : 1; + } } return cmp; } diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 43754193ae..73ef22faed 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -54,9 +54,9 @@ static SArray* tfileGetFileList(const char* path); static int tfileRmExpireFile(SArray* result); static void tfileDestroyFileName(void* elem); static int tfileCompare(const void* a, const void* b); -static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version); -static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version); -static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version); +static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version); +static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version); +static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version); /* * search from tfile */ @@ -509,7 +509,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul return ret; } -TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) { +TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) { char fullname[256] = {0}; tfileGenFileFullName(fullname, path, suid, colName, version); // indexInfo("open write file name %s", fullname); @@ -526,7 +526,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c return tfileWriterCreate(wcx, &tfh); } -TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) { +TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName) { char fullname[256] = {0}; tfileGenFileFullName(fullname, path, suid, colName, version); @@ -1019,7 +1019,7 @@ void tfileReaderUnRef(TFileReader* reader) { static SArray* tfileGetFileList(const char* path) { char buf[128] = {0}; uint64_t suid; - uint32_t version; + int64_t version; SArray* files = taosArrayInit(4, sizeof(void*)); TdDirPtr pDir = taosOpenDir(path); @@ -1059,19 +1059,19 @@ static int tfileCompare(const void* a, const void* b) { return strcmp(as, bs); } -static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) { - if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%d.tindex", suid, col, version)) { +static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version) { + if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%" PRId64 ".tindex", suid, col, version)) { // read suid & colid & version success return 0; } return -1; } // tfile name suid-colId-version.tindex -static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version) { - sprintf(filename, "%" PRIu64 "-%s-%d.tindex", suid, col, version); +static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version) { + sprintf(filename, "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version); return; } -static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version) { +static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version) { char filename[128] = {0}; tfileGenFileName(filename, suid, col, version); sprintf(fullname, "%s/%s", path, filename); diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 5c6fe8bf91..a8e555c5b9 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -279,7 +279,7 @@ static void initLog() { const int32_t maxLogFileNum = 10; tsAsyncLog = 0; - sDebugFlag = 143; + idxDebugFlag = 143; strcpy(tsLogDir, logDir.c_str()); taosRemoveDir(tsLogDir); taosMkDir(tsLogDir); @@ -387,7 +387,7 @@ class TFileObj { std::string path(path_); int colId = 2; char buf[64] = {0}; - sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId_, header.version); + sprintf(buf, "%" PRIu64 "-%d-%" PRId64 ".tindex", header.suid, colId_, header.version); path.append("/").append(buf); fileName_ = path; diff --git a/source/libs/index/test/index_executor_tests.cpp b/source/libs/index/test/index_executor_tests.cpp index b0c2a983d1..b88ffe5b8b 100644 --- a/source/libs/index/test/index_executor_tests.cpp +++ b/source/libs/index/test/index_executor_tests.cpp @@ -24,11 +24,7 @@ #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" -#include "executor.h" -#include "executorimpl.h" -#include "indexoperator.h" -#include "os.h" - +#include "index.h" #include "stub.h" #include "taos.h" #include "tcompare.h" diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index 8a837c5700..cd5a5d9b0f 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -24,7 +24,7 @@ static void initLog() { const int32_t maxLogFileNum = 10; tsAsyncLog = 0; - sDebugFlag = 143; + idxDebugFlag = 143; strcpy(tsLogDir, logDir.c_str()); taosRemoveDir(tsLogDir); taosMkDir(tsLogDir); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 9e71c87fa5..99ceffc904 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -27,6 +27,13 @@ void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHan void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; +static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { + *ip = taosGetIpv4FromFqdn(localFqdn); + if (*ip == 0xFFFFFFF) { + terrno = TSDB_CODE_RPC_FQDN_ERROR; + } + return terrno; +} void* rpcOpen(const SRpcInit* pInit) { SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { @@ -35,7 +42,6 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->label) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1); } - // register callback handle pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; @@ -48,10 +54,8 @@ void* rpcOpen(const SRpcInit* pInit) { uint32_t ip = 0; if (pInit->connType == TAOS_CONN_SERVER) { - ip = taosGetIpv4FromFqdn(pInit->localFqdn); - if (ip == 0xFFFFFFFF) { - tError("invalid fqdn: %s", pInit->localFqdn); - terrno = TSDB_CODE_RPC_FQDN_ERROR; + if (transValidLocalFqdn(pInit->localFqdn, &ip) != 0) { + tError("invalid fqdn: %s, errmsg: %s", pInit->localFqdn, terrstr()); taosMemoryFree(pRpc); return NULL; } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 36f5cf9815..09d47960d4 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -923,7 +923,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); - tError("invalid ip/port, reason: %s", terrstr()); + tError("invalid ip/port, %d:%d, reason: %s", srv->ip, srv->port, terrstr()); goto End; } if (false == addHandleToAcceptloop(srv)) { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index c1fc2c48c0..5da5d496d1 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -39,7 +39,7 @@ #define LOG_BUF_MUTEX(x) ((x)->buffMutex) typedef struct { - char *buffer; + char * buffer; int32_t buffStart; int32_t buffEnd; int32_t buffSize; @@ -58,7 +58,7 @@ typedef struct { int32_t openInProgress; pid_t pid; char logName[LOG_FILE_NAME_LEN]; - SLogBuff *logHandle; + SLogBuff * logHandle; TdThreadMutex logMutex; } SLogObj; @@ -96,6 +96,7 @@ int32_t fsDebugFlag = 135; int32_t metaDebugFlag = 135; int32_t fnDebugFlag = 135; int32_t smaDebugFlag = 135; +int32_t idxDebugFlag = 135; int64_t dbgEmptyW = 0; int64_t dbgWN = 0; @@ -103,7 +104,7 @@ int64_t dbgSmallWN = 0; int64_t dbgBigWN = 0; int64_t dbgWSize = 0; -static void *taosAsyncOutputLog(void *param); +static void * taosAsyncOutputLog(void *param); static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen); static SLogBuff *taosLogBuffNew(int32_t bufSize); static void taosCloseLogByFd(TdFilePtr pFile); @@ -701,7 +702,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { int32_t compressSize = 163840; int32_t ret = 0; int32_t len = 0; - char *data = taosMemoryMalloc(compressSize); + char * data = taosMemoryMalloc(compressSize); // gzFile dstFp = NULL; // srcFp = fopen(srcFileName, "r"); From 28318721fccfb1b5ceb183fd622715b717f6b87f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 24 May 2022 16:19:22 +0800 Subject: [PATCH 3/7] enh: opt index mutex --- source/common/src/tglobal.c | 3 ++- source/libs/transport/src/trans.c | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 08238ff44d..d0a2ddd9bb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -293,6 +293,7 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "idxDebugFlag", 0, 0, 255, 1) != 0) return -1; return 0; } @@ -303,12 +304,12 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "wDebugFlag", wDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "fnDebugFlag", fnDebugFlag, 0, 255, 0) != 0) return -1; if (cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 0) != 0) return -1; return 0; } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 99ceffc904..9fc3d4a56d 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -29,7 +29,7 @@ void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleas static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { *ip = taosGetIpv4FromFqdn(localFqdn); - if (*ip == 0xFFFFFFF) { + if (*ip == 0xFFFFFFFF) { terrno = TSDB_CODE_RPC_FQDN_ERROR; } return terrno; From c8ba20ac54a60730b69f020dde4b5f65eb4161fd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 19:14:00 +0800 Subject: [PATCH 4/7] enh: valid fqdn --- source/libs/transport/src/trans.c | 3 ++- source/util/src/tlog.c | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 9fc3d4a56d..018d5f98c0 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -31,8 +31,9 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { *ip = taosGetIpv4FromFqdn(localFqdn); if (*ip == 0xFFFFFFFF) { terrno = TSDB_CODE_RPC_FQDN_ERROR; + return -1; } - return terrno; + return 0; } void* rpcOpen(const SRpcInit* pInit) { SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 5da5d496d1..7339498cd0 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -760,6 +760,7 @@ void taosSetAllDebugFlag(int32_t flag) { fsDebugFlag = flag; fnDebugFlag = flag; smaDebugFlag = flag; + idxDebugFlag = flag; uInfo("all debug flag are set to %d", flag); } From d067104ec1aade4fd2540f778b87f4fb26ef8aab Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 20:38:25 +0800 Subject: [PATCH 5/7] fix: index memory error --- source/libs/index/src/index.c | 6 +++--- source/libs/index/src/indexTfile.c | 14 ++++++++++---- source/libs/index/test/indexTests.cc | 4 ++-- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 5d44c063a5..1faf1decb4 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -558,17 +558,17 @@ static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) { ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)}; int64_t ver = CACHE_VERSION(cache); - TFileReader* rd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key); - IndexTFile* tf = (IndexTFile*)(sIdx->tindex); + IndexTFile* tf = (IndexTFile*)(sIdx->tindex); taosThreadMutexLock(&tf->mtx); - tfileCacheGet(tf->cache, &key); + TFileReader* rd = tfileCacheGet(tf->cache, &key); taosThreadMutexUnlock(&tf->mtx); if (rd != NULL) { ver += MAX(ver, rd->header.version) + 1; indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver); } + tfileReaderUnRef(rd); return ver; } static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 73ef22faed..3de556e8b5 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -165,11 +165,11 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { // remove last version index reader TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); if (p != NULL && *p != NULL) { - TFileReader* oldReader = *p; + TFileReader* oldRdr = *p; taosHashRemove(tcache->tableCache, buf, sz); - indexInfo("found %s, remove file %s", buf, oldReader->ctx->file.buf); - oldReader->remove = true; - tfileReaderUnRef(oldReader); + indexInfo("found %s, should remove file %s", buf, oldRdr->ctx->file.buf); + oldRdr->remove = true; + tfileReaderUnRef(oldRdr); } taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); tfileReaderRef(reader); @@ -212,6 +212,12 @@ void tfileReaderDestroy(TFileReader* reader) { // T_REF_INC(reader); fstDestroy(reader->fst); writerCtxDestroy(reader->ctx, reader->remove); + if (reader->remove) { + indexInfo("%s is removed", reader->ctx->file.buf); + } else { + indexInfo("%s is not removed", reader->ctx->file.buf); + } + taosMemoryFree(reader); } static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index a8e555c5b9..2d06002af8 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -953,8 +953,8 @@ TEST_F(IndexEnv2, testIndex_TrigeFlush) { } static void single_write_and_search(IndexObj* idx) { - int target = idx->SearchOne("tag1", "Hello"); - target = idx->SearchOne("tag2", "Test"); + // int target = idx->SearchOne("tag1", "Hello"); + // target = idx->SearchOne("tag2", "Test"); } static void multi_write_and_search(IndexObj* idx) { idx->PutOne("tag1", "Hello"); From 45c8c4884828947fac9b8e9dd95ab00d03dc3162 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 21:11:20 +0800 Subject: [PATCH 6/7] fix: index memory error --- source/libs/index/src/index.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 1faf1decb4..d60bde718d 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -565,7 +565,7 @@ static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) { taosThreadMutexUnlock(&tf->mtx); if (rd != NULL) { - ver += MAX(ver, rd->header.version) + 1; + ver = (ver > rd->header.version ? ver : rd->header.verion) + 1; indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver); } tfileReaderUnRef(rd); From c6a2b246b0a1e27f9b2ca6cd1abb28a03c783332 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 25 May 2022 21:23:38 +0800 Subject: [PATCH 7/7] fix: index memory error --- source/libs/index/src/index.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index d60bde718d..500f570649 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -565,7 +565,7 @@ static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) { taosThreadMutexUnlock(&tf->mtx); if (rd != NULL) { - ver = (ver > rd->header.version ? ver : rd->header.verion) + 1; + ver = (ver > rd->header.version ? ver : rd->header.version) + 1; indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver); } tfileReaderUnRef(rd);