From f0c1950b63e5ce7b817183baa9b9bb8aeaa79ba9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 7 Sep 2022 19:39:44 +0800 Subject: [PATCH 01/23] refactor code --- source/dnode/vnode/src/meta/metaTable.c | 14 ++-- source/libs/index/src/index.c | 14 ++-- source/libs/index/src/indexFst.c | 7 +- source/libs/index/src/indexFstFile.c | 10 +-- source/libs/index/src/indexFstRegister.c | 2 +- source/libs/index/src/indexFstSparse.c | 10 +-- source/libs/index/src/indexTfile.c | 3 +- source/libs/index/src/indexUtil.c | 2 +- source/libs/index/test/indexBench.cc | 85 ++++++++++++++++++++++++ 9 files changed, 118 insertions(+), 29 deletions(-) create mode 100644 source/libs/index/test/indexBench.cc diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 2c11b9bf0f..92cf90d328 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -416,20 +416,22 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe me.ctbEntry.pTags = pReq->ctb.pTag; #ifdef TAG_FILTER_DEBUG - SArray* pTagVals = NULL; - int32_t code = tTagToValArray((STag*)pReq->ctb.pTag, &pTagVals); + SArray *pTagVals = NULL; + int32_t code = tTagToValArray((STag *)pReq->ctb.pTag, &pTagVals); for (int i = 0; i < taosArrayGetSize(pTagVals); i++) { - STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); + STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i); if (IS_VAR_DATA_TYPE(pTagVal->type)) { - char* buf = taosMemoryCalloc(pTagVal->nData + 1, 1); + char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1); memcpy(buf, pTagVal->pData, pTagVal->nData); - metaDebug("metaTag table:%s varchar index:%d cid:%d type:%d value:%s", pReq->name, i, pTagVal->cid, pTagVal->type, buf); + metaDebug("metaTag table:%s varchar index:%d cid:%d type:%d value:%s", pReq->name, i, pTagVal->cid, + pTagVal->type, buf); taosMemoryFree(buf); } else { double val = 0; GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64); - metaDebug("metaTag table:%s number index:%d cid:%d type:%d value:%f", pReq->name, i, pTagVal->cid, pTagVal->type, val); + metaDebug("metaTag table:%s number index:%d cid:%d type:%d value:%f", pReq->name, i, pTagVal->cid, + pTagVal->type, val); } } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index d9a6b80f3d..1f6d793417 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -303,7 +303,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy buf = strndup(INDEX_DATA_NULL_STR, (int32_t)strlen(INDEX_DATA_NULL_STR)); len = (int32_t)strlen(INDEX_DATA_NULL_STR); } else { - const char* emptyStr = " "; + static const char* emptyStr = " "; buf = strndup(emptyStr, (int32_t)strlen(emptyStr)); len = (int32_t)strlen(emptyStr); } @@ -585,6 +585,12 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { idxTRsltDestroy(tr); int ret = idxGenTFile(sIdx, pCache, result); + if (ret != 0) { + indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); + } else { + int64_t cost = taosGetTimestampUs() - st; + indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); + } idxDestroyFinalRslt(result); idxCacheDestroyImm(pCache); @@ -595,12 +601,6 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { tfileReaderUnRef(pReader); idxCacheUnRef(pCache); - int64_t cost = taosGetTimestampUs() - st; - if (ret != 0) { - indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); - } else { - indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); - } atomic_store_32(&pCache->merging, 0); if (quit) { idxPost(sIdx); diff --git a/source/libs/index/src/indexFst.c b/source/libs/index/src/indexFst.c index 15152cef55..2aa8345e03 100644 --- a/source/libs/index/src/indexFst.c +++ b/source/libs/index/src/indexFst.c @@ -19,11 +19,12 @@ #include "tchecksum.h" #include "tcoding.h" -static void fstPackDeltaIn(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) { +static FORCE_INLINE void fstPackDeltaIn(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, + uint8_t nBytes) { CompiledAddr deltaAddr = (transAddr == EMPTY_ADDRESS) ? EMPTY_ADDRESS : nodeAddr - transAddr; idxFilePackUintIn(wrt, deltaAddr, nBytes); } -static uint8_t fstPackDetla(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) { +static FORCE_INLINE uint8_t fstPackDetla(IdxFstFile* wrt, CompiledAddr nodeAddr, CompiledAddr transAddr) { uint8_t nBytes = packDeltaSize(nodeAddr, transAddr); fstPackDeltaIn(wrt, nodeAddr, transAddr, nBytes); return nBytes; @@ -39,7 +40,7 @@ FstUnFinishedNodes* fstUnFinishedNodesCreate() { fstUnFinishedNodesPushEmpty(nodes, false); return nodes; } -static void unFinishedNodeDestroyElem(void* elem) { +static void FORCE_INLINE unFinishedNodeDestroyElem(void* elem) { FstBuilderNodeUnfinished* b = (FstBuilderNodeUnfinished*)elem; fstBuilderNodeDestroy(b->node); taosMemoryFree(b->last); diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index e6d1edfeda..2a33ddd477 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -30,14 +30,14 @@ typedef struct { static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); } -static void idxGenLRUKey(char* buf, const char* path, int32_t blockId) { +static FORCE_INLINE void idxGenLRUKey(char* buf, const char* path, int32_t blockId) { char* p = buf; SERIALIZE_STR_VAR_TO_BUF(p, path, strlen(path)); SERIALIZE_VAR_TO_BUF(p, '_', char); idxInt2str(blockId, p, 0); return; } -static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) { +static FORCE_INLINE int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) { if (ctx->type == TFILE) { int nwr = taosWriteFile(ctx->file.pFile, buf, len); assert(nwr == len); @@ -47,7 +47,7 @@ static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) { ctx->offset += len; return len; } -static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) { +static FORCE_INLINE int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) { int nRead = 0; if (ctx->type == TFILE) { #ifdef USE_MMAP @@ -111,7 +111,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of } while (len > 0); return total; } -static int idxFileCtxGetSize(IFileCtx* ctx) { +static FORCE_INLINE int idxFileCtxGetSize(IFileCtx* ctx) { if (ctx->type == TFILE) { int64_t file_size = 0; taosStatFile(ctx->file.buf, &file_size, NULL); @@ -119,7 +119,7 @@ static int idxFileCtxGetSize(IFileCtx* ctx) { } return 0; } -static int idxFileCtxDoFlush(IFileCtx* ctx) { +static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) { if (ctx->type == TFILE) { taosFsyncFile(ctx->file.pFile); } else { diff --git a/source/libs/index/src/indexFstRegister.c b/source/libs/index/src/indexFstRegister.c index 34efee0d0d..e0abcadc78 100644 --- a/source/libs/index/src/indexFstRegister.c +++ b/source/libs/index/src/indexFstRegister.c @@ -16,7 +16,7 @@ #include "indexFstRegistry.h" #include "os.h" -uint64_t fstRegistryHash(FstRegistry* registry, FstBuilderNode* bNode) { +static FORCE_INLINE uint64_t fstRegistryHash(FstRegistry* registry, FstBuilderNode* bNode) { // TODO(yihaoDeng): refactor later const uint64_t FNV_PRIME = 1099511628211; uint64_t h = 14695981039346656037u; diff --git a/source/libs/index/src/indexFstSparse.c b/source/libs/index/src/indexFstSparse.c index ebc0cb3637..8746b04eab 100644 --- a/source/libs/index/src/indexFstSparse.c +++ b/source/libs/index/src/indexFstSparse.c @@ -15,7 +15,7 @@ #include "indexFstSparse.h" -static void sparSetUtil(int32_t *buf, int32_t cap) { +static FORCE_INLINE void sparSetInitBuf(int32_t *buf, int32_t cap) { for (int32_t i = 0; i < cap; i++) { buf[i] = -1; } @@ -28,8 +28,8 @@ FstSparseSet *sparSetCreate(int32_t sz) { ss->dense = (int32_t *)taosMemoryMalloc(sz * sizeof(int32_t)); ss->sparse = (int32_t *)taosMemoryMalloc(sz * sizeof(int32_t)); - sparSetUtil(ss->dense, sz); - sparSetUtil(ss->sparse, sz); + sparSetInitBuf(ss->dense, sz); + sparSetInitBuf(ss->sparse, sz); ss->cap = sz; @@ -90,7 +90,7 @@ void sparSetClear(FstSparseSet *ss) { if (ss == NULL) { return; } - sparSetUtil(ss->dense, ss->cap); - sparSetUtil(ss->sparse, ss->cap); + sparSetInitBuf(ss->dense, ss->cap); + sparSetInitBuf(ss->sparse, ss->cap); ss->size = 0; } diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 48a0c631cf..c3be0ea6f5 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -1034,7 +1034,8 @@ static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int sprintf(filename, "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version); return; } -static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version) { +static void FORCE_INLINE tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, + int64_t version) { char filename[128] = {0}; tfileGenFileName(filename, suid, col, version); sprintf(fullname, "%s/%s", path, filename); diff --git a/source/libs/index/src/indexUtil.c b/source/libs/index/src/indexUtil.c index 3d083c1817..cdfb79016f 100644 --- a/source/libs/index/src/indexUtil.c +++ b/source/libs/index/src/indexUtil.c @@ -21,7 +21,7 @@ typedef struct MergeIndex { int len; } MergeIndex; -static int iBinarySearch(SArray *arr, int s, int e, uint64_t k) { +static FORCE_INLINE int iBinarySearch(SArray *arr, int s, int e, uint64_t k) { uint64_t v; int32_t m; while (s <= e) { diff --git a/source/libs/index/test/indexBench.cc b/source/libs/index/test/indexBench.cc new file mode 100644 index 0000000000..cce582e2f0 --- /dev/null +++ b/source/libs/index/test/indexBench.cc @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include +#include +#include +#include +#include "index.h" +using namespace std; + +static void initLog() { + const char *defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; + + tsAsyncLog = 0; + idxDebugFlag = 143; + strcpy(tsLogDir, logDir.c_str()); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} + +struct WriteBatch { + SIndexMultiTerm *terms; +}; +class Idx { + public: + Idx(int _cacheSize = 1024 * 1024 * 4, const char *_path = "tindex") { + opts.cacheSize = _cacheSize; + path = TD_TMP_DIR_PATH _path; + } + int SetUp(bool remove) { + initLog(); + + if (remove) taosRemoveDir(path); + + int ret = indexJsonOpen(&opts, path, &index); + return ret; + } + int Write(WriteBatch *batch) { + // write batch + return 0; + } + int Read(const char *json, void *key, int64_t *id) { + // read batch + return 0; + } + + void TearDown() { indexJsonClose(index); } + + std::string path; + + SIndexOpts opts; + SIndex *index; +}; + +int BenchWrite(Idx *idx, int nCol, int Limit) { return 0; } + +int BenchRead(Idx *idx) { return 0; } + +int main() { + Idx *idx = new Idx; + if (idx->SetUp(true) != 0) { + std::cout << "failed to setup index" << std::endl; + return 0; + } else { + std::cout << "succ to setup index" << std::endl; + } + BenchWrite(idx, 10000, 100); +} From 3acd4d3afc8a3398bf69932eb0f076a28eaa7b8c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 7 Sep 2022 19:41:51 +0800 Subject: [PATCH 02/23] refactor code --- source/libs/index/src/index.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 1f6d793417..f507e1b3be 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -586,7 +586,7 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { int ret = idxGenTFile(sIdx, pCache, result); if (ret != 0) { - indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); + indexError("failed to merge"); } else { int64_t cost = taosGetTimestampUs() - st; indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); From ee7a18db29610256ec5abe8361c59c7f34c37fd8 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 8 Sep 2022 20:25:56 +0800 Subject: [PATCH 03/23] fix: add bench --- source/libs/index/test/CMakeLists.txt | 32 +++++++++++-- source/libs/index/test/indexBench.cc | 67 ++++++++++++++++++++++++--- 2 files changed, 88 insertions(+), 11 deletions(-) diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index b3eca28003..f3ddb6f629 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -5,6 +5,7 @@ IF(NOT TD_DARWIN) add_executable(idxUtilUT "") add_executable(idxJsonUT "") add_executable(idxFstUtilUT "") + add_executable(idxBench "") target_sources(idxTest PRIVATE @@ -32,6 +33,10 @@ IF(NOT TD_DARWIN) PRIVATE "fstUtilUT.cc" ) + target_sources(idxBench + PRIVATE + "indexBench.cc" + ) target_include_directories (idxTest PUBLIC @@ -80,6 +85,16 @@ IF(NOT TD_DARWIN) "${TD_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) + target_include_directories (idxJsonUT + PUBLIC + "${TD_SOURCE_DIR}/include/libs/index" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" + ) + target_include_directories (idxBench + PUBLIC + "${TD_SOURCE_DIR}/include/libs/index" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" + ) target_link_libraries (idxTest os @@ -102,11 +117,7 @@ IF(NOT TD_DARWIN) gtest_main index ) - target_include_directories (idxJsonUT - PUBLIC - "${TD_SOURCE_DIR}/include/libs/index" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" - ) + target_link_libraries (idxTest os util @@ -151,6 +162,13 @@ IF(NOT TD_DARWIN) gtest_main index ) + target_link_libraries (idxBench + os + util + common + gtest_main + index + ) add_test( NAME idxJsonUT @@ -174,4 +192,8 @@ IF(NOT TD_DARWIN) NAME idxFstUT COMMAND idxFstUT ) + add_test( + NAME idxBench + COMMAND idxBench + ) ENDIF () diff --git a/source/libs/index/test/indexBench.cc b/source/libs/index/test/indexBench.cc index cce582e2f0..d95aec1e5a 100644 --- a/source/libs/index/test/indexBench.cc +++ b/source/libs/index/test/indexBench.cc @@ -18,8 +18,18 @@ #include #include #include "index.h" +#include "indexCache.h" +#include "indexFst.h" +#include "indexFstUtil.h" +#include "indexInt.h" +#include "indexTfile.h" +#include "indexUtil.h" +#include "tskiplist.h" +#include "tutil.h" using namespace std; +static std::string logDir = TD_TMP_DIR_PATH "log"; + static void initLog() { const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10; @@ -42,18 +52,20 @@ class Idx { public: Idx(int _cacheSize = 1024 * 1024 * 4, const char *_path = "tindex") { opts.cacheSize = _cacheSize; - path = TD_TMP_DIR_PATH _path; + path += TD_TMP_DIR_PATH; + path += _path; } int SetUp(bool remove) { initLog(); - if (remove) taosRemoveDir(path); + if (remove) taosRemoveDir(path.c_str()); - int ret = indexJsonOpen(&opts, path, &index); + int ret = indexJsonOpen(&opts, path.c_str(), &index); return ret; } - int Write(WriteBatch *batch) { + int Write(WriteBatch *batch, uint64_t uid) { // write batch + indexJsonPut(index, batch->terms, uid); return 0; } int Read(const char *json, void *key, int64_t *id) { @@ -69,7 +81,49 @@ class Idx { SIndex *index; }; -int BenchWrite(Idx *idx, int nCol, int Limit) { return 0; } +SIndexTerm *indexTermCreateT(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char *colName, + int32_t nColName, const char *colVal, int32_t nColVal) { + char buf[256] = {0}; + int16_t sz = nColVal; + memcpy(buf, (uint16_t *)&sz, 2); + memcpy(buf + 2, colVal, nColVal); + if (colType == TSDB_DATA_TYPE_BINARY) { + return indexTermCreate(suid, oper, colType, colName, nColName, buf, sizeof(buf)); + } else { + return indexTermCreate(suid, oper, colType, colName, nColName, colVal, nColVal); + } + return NULL; +} +int initWriteBatch(WriteBatch *wb, int batchSize) { + SIndexMultiTerm *terms = indexMultiTermCreate(); + + std::string colName; + std::string colVal; + + for (int i = 0; i < 64; i++) { + colName += '0' + i; + colVal += '0' + i; + } + + for (int i = 0; i < batchSize; i++) { + colVal[i % colVal.size()] = '0' + i % 128; + colName[i % colName.size()] = '0' + i % 128; + SIndexTerm *term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + indexMultiTermAdd(terms, term); + } + + wb->terms = terms; + return 0; +} + +int BenchWrite(Idx *idx, int batchSize, int limit) { + for (int i = 0; i < limit; i += batchSize) { + WriteBatch wb; + idx->Write(&wb, i); + } + return 0; +} int BenchRead(Idx *idx) { return 0; } @@ -81,5 +135,6 @@ int main() { } else { std::cout << "succ to setup index" << std::endl; } - BenchWrite(idx, 10000, 100); + BenchWrite(idx, 100, 10000); + return 1; } From 349dd69f949172eead5f6f5c0a081da9bcb8a615 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 8 Sep 2022 22:50:50 +0800 Subject: [PATCH 04/23] fix compile error --- source/libs/index/test/indexTests.cc | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 5b76de2ef8..08bf84ff60 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -271,20 +271,20 @@ void validateFst() { } delete m; } + static std::string logDir = TD_TMP_DIR_PATH "log"; +static void initLog() { + const char* defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; -static void initLog() { - const char* defaultLogFileNamePrefix = "taoslog"; - const int32_t maxLogFileNum = 10; + tsAsyncLog = 0; + idxDebugFlag = 143; + strcpy(tsLogDir, logDir.c_str()); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); - tsAsyncLog = 0; - idxDebugFlag = 143; - strcpy(tsLogDir, logDir.c_str()); - taosRemoveDir(tsLogDir); - taosMkDir(tsLogDir); - - if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { - printf("failed to open log file in directory:%s\n", tsLogDir); + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); } } class IndexEnv : public ::testing::Test { From 57e34dc308da411cbc795494c1026f55c3fe7ee7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Sep 2022 11:28:00 +0800 Subject: [PATCH 05/23] fix(rpc): handle write failure --- source/libs/transport/src/transCli.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dc2d937c49..0361be2b13 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -779,7 +779,13 @@ void cliSend(SCliConn* pConn) { uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); - uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); + + int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); + if (status != 0) { + tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), + uv_err_name(status)); + cliHandleExcept(pConn); + } return; _RETURN: return; From f6eda114bd9eb2b27161922175f48aee85673d52 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Sep 2022 11:32:13 +0800 Subject: [PATCH 06/23] fix(rpc): handle write failure --- source/libs/index/test/indexBench.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/index/test/indexBench.cc b/source/libs/index/test/indexBench.cc index d95aec1e5a..f9dcc3e08c 100644 --- a/source/libs/index/test/indexBench.cc +++ b/source/libs/index/test/indexBench.cc @@ -135,6 +135,6 @@ int main() { } else { std::cout << "succ to setup index" << std::endl; } - BenchWrite(idx, 100, 10000); + // BenchWrite(idx, 100, 10000); return 1; } From ecfaf229e3e6672e4796f4a19010fa6f61d23326 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Sep 2022 13:55:16 +0800 Subject: [PATCH 07/23] fix: add bench --- source/libs/index/test/indexBench.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/index/test/indexBench.cc b/source/libs/index/test/indexBench.cc index f9dcc3e08c..b828be0ffe 100644 --- a/source/libs/index/test/indexBench.cc +++ b/source/libs/index/test/indexBench.cc @@ -128,13 +128,13 @@ int BenchWrite(Idx *idx, int batchSize, int limit) { int BenchRead(Idx *idx) { return 0; } int main() { - Idx *idx = new Idx; - if (idx->SetUp(true) != 0) { - std::cout << "failed to setup index" << std::endl; - return 0; - } else { - std::cout << "succ to setup index" << std::endl; - } - // BenchWrite(idx, 100, 10000); + // Idx *idx = new Idx; + // if (idx->SetUp(true) != 0) { + // std::cout << "failed to setup index" << std::endl; + // return 0; + // } else { + // std::cout << "succ to setup index" << std::endl; + // } + // BenchWrite(idx, 100, 10000); return 1; } From edf3f6320d7b26b970cbbb25215bf6e486ec814d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Sep 2022 14:28:57 +0800 Subject: [PATCH 08/23] fix: fix compile error --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0361be2b13..ba7be0aaa8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1440,7 +1440,7 @@ int transReleaseCliHandle(void* handle) { return -1; } - STransMsg tmsg = {.info.handle = handle}; + STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527}; TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); From 11a8237975a0ed5ad115774f7a5a4a4719335cc1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Sep 2022 14:47:49 +0800 Subject: [PATCH 09/23] fix: fix compile error --- source/libs/transport/src/transCli.c | 34 +++++++++++++++------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index ba7be0aaa8..d83d7e4162 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -196,22 +196,24 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ - do { \ - int i = 0, sz = transQueueSize(&conn->cliMsgs); \ - for (; i < sz; i++) { \ - pMsg = transQueueGet(&conn->cliMsgs, i); \ - if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ - break; \ - } \ - } \ - if (i == sz) { \ - pMsg = NULL; \ - tDebug("msg not found, %" PRIu64 "", ahandle); \ - } else { \ - pMsg = transQueueRm(&conn->cliMsgs, i); \ - tDebug("msg found, %" PRIu64 "", ahandle); \ - } \ +#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ + do { \ + int i = 0, sz = transQueueSize(&conn->cliMsgs); \ + for (; i < sz; i++) { \ + pMsg = transQueueGet(&conn->cliMsgs, i); \ + if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ + break; \ + } else if (pMsg->msg.info.ahandle == (void*)0x9527) { \ + break; \ + } \ + } \ + if (i == sz) { \ + pMsg = NULL; \ + tDebug("msg not found, %" PRIu64 "", ahandle); \ + } else { \ + pMsg = transQueueRm(&conn->cliMsgs, i); \ + tDebug("msg found, %" PRIu64 "", ahandle); \ + } \ } while (0) #define CONN_GET_NEXT_SENDMSG(conn) \ do { \ From bfcb1816620a4cd9f47266187cef2fb995bcd393 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Sep 2022 18:30:02 +0800 Subject: [PATCH 10/23] fix: make CI happy --- source/libs/transport/src/transCli.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d83d7e4162..66143ed73a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1445,9 +1445,13 @@ int transReleaseCliHandle(void* handle) { STransMsg tmsg = {.info.handle = handle, .info.ahandle = (void*)0x9527}; TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + pCtx->ahandle = tmsg.info.ahandle; + SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cmsg->msg = tmsg; cmsg->type = Release; + cmsg->ctx = pCtx; STraceId* trace = &tmsg.info.traceId; tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid); From fadd7317af9e6c7633b79a637062bf9fc8254624 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Sep 2022 18:40:32 +0800 Subject: [PATCH 11/23] fix: make CI happy --- source/libs/transport/src/transCli.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 66143ed73a..fb5a9d642c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -203,8 +203,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pMsg = transQueueGet(&conn->cliMsgs, i); \ if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ break; \ - } else if (pMsg->msg.info.ahandle == (void*)0x9527) { \ - break; \ } \ } \ if (i == sz) { \ From 58c410118f1a7c464b1e49810d8b2c3971bd1c89 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Sep 2022 19:09:24 +0800 Subject: [PATCH 12/23] fix: make CI happy --- source/libs/transport/src/transCli.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fb5a9d642c..06b126a604 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -425,7 +425,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { transMsg.info.ahandle); } } else { - transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL; + transMsg.info.ahandle = (pMsg->type != Release && pCtx) ? pCtx->ahandle : NULL; } if (pCtx == NULL || pCtx->pSem == NULL) { @@ -934,7 +934,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { // persist conn already release by server STransMsg resp; cliBuildExceptResp(pMsg, &resp); - pTransInst->cfp(pTransInst->parent, &resp, NULL); + if (pMsg->type != Release) { + pTransInst->cfp(pTransInst->parent, &resp, NULL); + } destroyCmsg(pMsg); return; } From 93dd2101c144da93a5562a4261d32f1525fd382a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Sep 2022 10:27:41 +0800 Subject: [PATCH 13/23] fix: make CI happy --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 06b126a604..edc9dcdbaa 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -162,7 +162,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); static void cliReleaseUnfinishedMsg(SCliConn* conn) { for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); - if (msg != NULL && msg->ctx != NULL) { + if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { conn->ctx.freeFunc(msg->ctx->ahandle); } From eac5d2c4978524bb61a7dab2330b328307b2d277 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Sep 2022 17:12:22 +0800 Subject: [PATCH 14/23] fix: make CI happy --- source/libs/transport/src/transCli.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index edc9dcdbaa..289e093623 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -376,8 +376,10 @@ void cliHandleResp(SCliConn* conn) { return; } - if (cliAppCb(conn, &transMsg, pMsg) != 0) { - return; + if (pMsg->type != Release) { + if (cliAppCb(conn, &transMsg, pMsg) != 0) { + return; + } } destroyCmsg(pMsg); @@ -435,8 +437,10 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { continue; } } - if (cliAppCb(pConn, &transMsg, pMsg) != 0) { - return; + if (pMsg->type != Release) { + if (cliAppCb(pConn, &transMsg, pMsg) != 0) { + return; + } } destroyCmsg(pMsg); tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn)); @@ -1457,7 +1461,7 @@ int transReleaseCliHandle(void* handle) { tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid); if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { - taosMemoryFree(cmsg); + destroyCmsg(cmsg); return -1; } return 0; From d1d35c48e4f7d4efb2e01034477b6ffb4ebc9cca Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Sep 2022 17:24:23 +0800 Subject: [PATCH 15/23] fix: make CI happy --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 289e093623..e0b47a98bc 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -376,7 +376,7 @@ void cliHandleResp(SCliConn* conn) { return; } - if (pMsg->type != Release) { + if (pMsg && pMsg->type != Release) { if (cliAppCb(conn, &transMsg, pMsg) != 0) { return; } @@ -437,7 +437,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { continue; } } - if (pMsg->type != Release) { + if (pMsg && pMsg->type != Release) { if (cliAppCb(pConn, &transMsg, pMsg) != 0) { return; } From 4745b4ff814c9e4493357431b0c1520164d666cf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Sep 2022 18:18:12 +0800 Subject: [PATCH 16/23] fix: make CI happy --- source/libs/scheduler/src/schTask.c | 80 +++++++++++++++-------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9cab39c301..c5f161b66a 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -52,7 +52,7 @@ void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) { int32_t nodeNum = taosArrayGetSize(pJob->nodeList); pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM); } - + pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1); } @@ -139,13 +139,15 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3 } if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it - SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); + SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, + pTask->execId, pTask->waitRetry); return TSDB_CODE_SUCCESS; } SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId)); if (NULL == nodeInfo) { // ignore it - SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); + SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, + pTask->execId, pTask->waitRetry); return TSDB_CODE_SUCCESS; } @@ -314,7 +316,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { if (!schMgmt.cfg.enableReSchedule) { return TSDB_CODE_SUCCESS; } - + if (SCH_IS_DATA_BIND_TASK(pTask)) { return TSDB_CODE_SUCCESS; } @@ -341,7 +343,8 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 } if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) { - SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, pTask->maxExecTimes, pTask->execId); + SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, + pTask->maxExecTimes, pTask->execId); schHandleJobFailure(pJob, rspCode); return TSDB_CODE_SUCCESS; } @@ -548,7 +551,8 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo if ((pTask->retryTimes + 1) > pTask->maxRetryTimes) { *needRetry = false; - SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes, pTask->maxRetryTimes); + SCH_TASK_DLOG("task no more retry since reach max retry times, retryTimes:%d/%d", pTask->retryTimes, + pTask->maxRetryTimes); return TSDB_CODE_SUCCESS; } @@ -564,25 +568,25 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } -/* - if (SCH_IS_DATA_BIND_TASK(pTask)) { - if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { - *needRetry = false; - SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, - SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); - return TSDB_CODE_SUCCESS; - } - } else { - int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); + /* + if (SCH_IS_DATA_BIND_TASK(pTask)) { + if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, + SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); + return TSDB_CODE_SUCCESS; + } + } else { + int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); - if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) { - *needRetry = false; - SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", - pTask->candidateIdx, candidateNum); - return TSDB_CODE_SUCCESS; + if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", + pTask->candidateIdx, candidateNum); + return TSDB_CODE_SUCCESS; + } } - } -*/ + */ *needRetry = true; SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode)); @@ -630,8 +634,9 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, naddr->epSet.inUse, naddr->epSet.numOfEps, - SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); + SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, + naddr->epSet.inUse, naddr->epSet.numOfEps, SCH_GET_CUR_EP(naddr)->fqdn, + SCH_GET_CUR_EP(naddr)->port); ++addNum; } @@ -711,10 +716,10 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { if (candidateNum <= 1) { goto _return; } - + switch (schMgmt.cfg.schPolicy) { case SCH_LOAD_SEQ: - case SCH_ALL: + case SCH_ALL: default: if (++pTask->candidateIdx >= candidateNum) { pTask->candidateIdx = 0; @@ -732,7 +737,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { _return: SCH_TASK_DLOG("switch task candiateIdx to %d/%d", pTask->candidateIdx, candidateNum); - + return TSDB_CODE_SUCCESS; } @@ -759,7 +764,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { return; } - int32_t i = 0; + int32_t i = 0; SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); while (nodeInfo) { if (nodeInfo->handle) { @@ -821,16 +826,16 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t schLaunchTaskImpl(void *param) { SSchTaskCtx *pCtx = (SSchTaskCtx *)param; - SSchJob *pJob = schAcquireJob(pCtx->jobRid); + SSchJob *pJob = schAcquireJob(pCtx->jobRid); if (NULL == pJob) { - taosMemoryFree(param); qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid); + taosMemoryFree(param); SCH_RET(TSDB_CODE_SCH_JOB_IS_DROPPING); } - + SSchTask *pTask = pCtx->pTask; - int8_t status = 0; - int32_t code = 0; + int8_t status = 0; + int32_t code = 0; atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); pTask->execId++; @@ -891,13 +896,12 @@ _return: SCH_RET(code); } -int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { - +int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx)); if (NULL == param) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + param->jobRid = pJob->refId; param->pTask = pTask; @@ -906,7 +910,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { } else { SCH_ERR_RET(schLaunchTaskImpl(param)); } - + return TSDB_CODE_SUCCESS; } From 16a5f01293681c127d5206fe329fc2541eef45eb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Sep 2022 19:08:34 +0800 Subject: [PATCH 17/23] fix: fix compile error --- source/libs/transport/src/transCli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e0b47a98bc..1417960f11 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -290,6 +290,7 @@ bool cliMaySendCachedMsg(SCliConn* conn) { SCliMsg* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); cliSend(conn); + return true; } return false; _RETURN: From 26d86477f983b6d83d5f4647265d71c313565ce5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Sep 2022 20:11:17 +0800 Subject: [PATCH 18/23] fix: fix compile error --- source/libs/transport/src/transCli.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1417960f11..17bc132ef3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -289,8 +289,12 @@ bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); - cliSend(conn); - return true; + if (pCliMsg == NULL) + return false; + else { + cliSend(conn); + return true; + } } return false; _RETURN: @@ -707,6 +711,9 @@ static bool cliHandleNoResp(SCliConn* conn) { if (cliMaySendCachedMsg(conn) == false) { SCliThrd* thrd = conn->hostThrd; addConnToPool(thrd->pool, conn); + res = false; + } else { + res = true; } } } From 440d14e36cbeebc2236ddf26a8a8c8ed76874419 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Sep 2022 23:27:05 +0800 Subject: [PATCH 19/23] fix: fix compile error --- source/libs/transport/src/transCli.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 17bc132ef3..8c87b554b8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1438,11 +1438,11 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) { return ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; } SCliThrd* pThrd = transGetWorkThrdFromHandle(handle, validHandle); - if (*validHandle == true && pThrd == NULL) { - int idx = cliRBChoseIdx(trans); - if (idx < 0) return NULL; - pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; - } + // if (pThrd == NULL) { + // int idx = cliRBChoseIdx(trans); + // if (idx < 0) return NULL; + // pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; + // } return pThrd; } int transReleaseCliHandle(void* handle) { @@ -1484,7 +1484,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran bool valid = false; SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid); - if (pThrd == NULL && valid == false) { + if (pThrd == NULL) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; From f26047cfe86d35a15b271b7d23e9a412d1142cdf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 11 Sep 2022 19:44:50 +0800 Subject: [PATCH 20/23] fix: fix compile error --- source/libs/transport/src/transCli.c | 29 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8c87b554b8..1c35557866 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1419,37 +1419,37 @@ void transUnrefCliHandle(void* handle) { cliDestroyConn((SCliConn*)handle, true); } } -static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(int64_t handle, bool* validHandle) { +static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t handle) { SCliThrd* pThrd = NULL; SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); if (exh == NULL) { return NULL; } - *validHandle = true; + if (exh->pThrd == NULL && trans != NULL) { + int idx = cliRBChoseIdx(trans); + if (idx < 0) return NULL; + exh->pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; + } + pThrd = exh->pThrd; transReleaseExHandle(transGetRefMgt(), handle); return pThrd; } -SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) { +SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { if (handle == 0) { int idx = cliRBChoseIdx(trans); if (idx < 0) return NULL; return ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; } - SCliThrd* pThrd = transGetWorkThrdFromHandle(handle, validHandle); - // if (pThrd == NULL) { - // int idx = cliRBChoseIdx(trans); - // if (idx < 0) return NULL; - // pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; - // } + SCliThrd* pThrd = transGetWorkThrdFromHandle(trans, handle); return pThrd; } int transReleaseCliHandle(void* handle) { int idx = -1; bool valid = false; - SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid); + SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); if (pThrd == NULL) { return -1; } @@ -1482,8 +1482,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran return -1; } - bool valid = false; - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid); + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); @@ -1527,9 +1526,8 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs return -1; } - bool valid = false; - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid); - if (pThrd == NULL && valid == false) { + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); + if (pThrd == NULL) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; @@ -1613,6 +1611,7 @@ int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); exh->refId = transAddExHandle(transGetRefMgt(), exh); tDebug("pre alloc refId %" PRId64 "", exh->refId); + return exh->refId; } #endif From 308bfe7fb8bc03d217e0fc2fa82008468306da01 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 11 Sep 2022 22:01:07 +0800 Subject: [PATCH 21/23] fix: fix compile error --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1c35557866..cc282bd9f0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -437,7 +437,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (pCtx == NULL || pCtx->pSem == NULL) { if (transMsg.info.ahandle == NULL) { - if (REQUEST_NO_RESP(&pMsg->msg)) destroyCmsg(pMsg); + if (REQUEST_NO_RESP(&pMsg->msg) || pMsg->type == Release) destroyCmsg(pMsg); once = true; continue; } From 0a337e82e30d5ae571e054f588a98db30f853767 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 11 Sep 2022 22:11:44 +0800 Subject: [PATCH 22/23] fix: fix compile error --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index cc282bd9f0..bdf103887c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -381,7 +381,7 @@ void cliHandleResp(SCliConn* conn) { return; } - if (pMsg && pMsg->type != Release) { + if (pMsg == NULL || (pMsg && pMsg->type != Release)) { if (cliAppCb(conn, &transMsg, pMsg) != 0) { return; } @@ -442,7 +442,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { continue; } } - if (pMsg && pMsg->type != Release) { + if (pMsg == NULL || (pMsg && pMsg->type != Release)) { if (cliAppCb(pConn, &transMsg, pMsg) != 0) { return; } From fb3c8491415ab63ddbb5d7611732bd5ba86312d5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 11 Sep 2022 23:01:37 +0800 Subject: [PATCH 23/23] fix: fix compile error --- source/libs/index/test/CMakeLists.txt | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index f3ddb6f629..2bc7353aa5 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -5,7 +5,6 @@ IF(NOT TD_DARWIN) add_executable(idxUtilUT "") add_executable(idxJsonUT "") add_executable(idxFstUtilUT "") - add_executable(idxBench "") target_sources(idxTest PRIVATE @@ -33,10 +32,6 @@ IF(NOT TD_DARWIN) PRIVATE "fstUtilUT.cc" ) - target_sources(idxBench - PRIVATE - "indexBench.cc" - ) target_include_directories (idxTest PUBLIC @@ -90,11 +85,6 @@ IF(NOT TD_DARWIN) "${TD_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) - target_include_directories (idxBench - PUBLIC - "${TD_SOURCE_DIR}/include/libs/index" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" - ) target_link_libraries (idxTest os @@ -162,13 +152,6 @@ IF(NOT TD_DARWIN) gtest_main index ) - target_link_libraries (idxBench - os - util - common - gtest_main - index - ) add_test( NAME idxJsonUT @@ -192,8 +175,4 @@ IF(NOT TD_DARWIN) NAME idxFstUT COMMAND idxFstUT ) - add_test( - NAME idxBench - COMMAND idxBench - ) ENDIF ()