From 8a010a58f3e7997c1c68eefb2ec10e5ce5ea5fbf Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 20 Sep 2022 16:13:08 +0800 Subject: [PATCH 01/12] enh: added memory allocators for parser and planner --- include/libs/nodes/nodes.h | 6 ++ source/libs/nodes/src/nodesUtilFuncs.c | 115 +++++++++++++++++++--- source/libs/parser/src/parAstCreater.c | 3 +- source/libs/parser/test/parTestUtil.cpp | 52 +++++----- source/libs/planner/src/planSpliter.c | 1 + source/libs/planner/test/planTestMain.cpp | 8 ++ source/libs/planner/test/planTestUtil.cpp | 49 ++++----- source/libs/planner/test/planTestUtil.h | 11 ++- source/libs/scalar/src/scalar.c | 2 +- 9 files changed, 181 insertions(+), 66 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 6500d3d183..1e2a53f598 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -275,6 +275,12 @@ typedef struct SNodeList { SListCell* pTail; } SNodeList; +typedef struct SNodeAllocator SNodeAllocator; + +int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator); +void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator); +void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator); + SNode* nodesMakeNode(ENodeType type); void nodesDestroyNode(SNode* pNode); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 805ddb9e42..be94495856 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -22,8 +22,98 @@ #include "tdatablock.h" #include "thash.h" -static SNode* makeNode(ENodeType type, size_t size) { - SNode* p = taosMemoryCalloc(1, size); +typedef struct SNodeMemChunk { + int32_t availableSize; + int32_t usedSize; + char* pBuf; + struct SNodeMemChunk* pNext; +} SNodeMemChunk; + +typedef struct SNodeAllocator { + int32_t chunkSize; + int32_t chunkNum; + SNodeMemChunk* pCurrChunk; + SNodeMemChunk* pChunks; +} SNodeAllocator; + +static threadlocal SNodeAllocator* pNodeAllocator; + +static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { + SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize); + if (NULL == pNewChunk) { + return NULL; + } + pNewChunk->pBuf = (char*)(pNewChunk + 1); + pNewChunk->availableSize = pAllocator->chunkSize; + pNewChunk->usedSize = 0; + pNewChunk->pNext = NULL; + if (NULL != pAllocator->pCurrChunk) { + pAllocator->pCurrChunk->pNext = pNewChunk; + } + pAllocator->pCurrChunk = pNewChunk; + if (NULL == pAllocator->pChunks) { + pAllocator->pChunks = pNewChunk; + } + ++(pAllocator->chunkNum); + return pNewChunk; +} + +static void* nodesCalloc(int32_t num, int32_t size) { + if (NULL == pNodeAllocator) { + return taosMemoryCalloc(num, size); + } + + if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) { + if (NULL == callocNodeChunk(pNodeAllocator)) { + return NULL; + } + } + void* p = pNodeAllocator->pCurrChunk->pBuf + pNodeAllocator->pCurrChunk->usedSize; + pNodeAllocator->pCurrChunk->usedSize += size; + return p; +} + +static void nodesFree(void* p) { + if (NULL == pNodeAllocator) { + taosMemoryFree(p); + } + return; +} + +int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) { + *pAllocator = taosMemoryCalloc(1, sizeof(SNodeAllocator)); + if (NULL == *pAllocator) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*pAllocator)->chunkSize = chunkSize; + if (NULL == callocNodeChunk(*pAllocator)) { + taosMemoryFreeClear(*pAllocator); + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; +} + +void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator) { + if (NULL == pAllocator) { + return; + } + + nodesDebug("alloc chunkNum: %d, chunkTotakSize: %d", pAllocator->chunkNum, + pAllocator->chunkNum * pAllocator->chunkSize); + + SNodeMemChunk* pChunk = pAllocator->pChunks; + while (NULL != pChunk) { + SNodeMemChunk* pTemp = pChunk->pNext; + taosMemoryFree(pChunk); + pChunk = pTemp; + } + taosMemoryFree(pAllocator); +} + +void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator) { pNodeAllocator = pAllocator; } + +static SNode* makeNode(ENodeType type, int32_t size) { + SNode* p = nodesCalloc(1, size); if (NULL == p) { return NULL; } @@ -824,6 +914,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pLogicNode->pWStartTs); nodesDestroyNode(pLogicNode->pValues); nodesDestroyList(pLogicNode->pFillExprs); + nodesDestroyList(pLogicNode->pNotFillExprs); break; } case QUERY_NODE_LOGIC_PLAN_SORT: { @@ -1021,12 +1112,12 @@ void nodesDestroyNode(SNode* pNode) { default: break; } - taosMemoryFreeClear(pNode); + nodesFree(pNode); return; } SNodeList* nodesMakeList() { - SNodeList* p = taosMemoryCalloc(1, sizeof(SNodeList)); + SNodeList* p = nodesCalloc(1, sizeof(SNodeList)); if (NULL == p) { return NULL; } @@ -1037,7 +1128,7 @@ int32_t nodesListAppend(SNodeList* pList, SNode* pNode) { if (NULL == pList || NULL == pNode) { return TSDB_CODE_FAILED; } - SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); + SListCell* p = nodesCalloc(1, sizeof(SListCell)); if (NULL == p) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY; @@ -1104,7 +1195,7 @@ int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) { } pTarget->pTail = pSrc->pTail; pTarget->length += pSrc->length; - taosMemoryFreeClear(pSrc); + nodesFree(pSrc); return TSDB_CODE_SUCCESS; } @@ -1124,7 +1215,7 @@ int32_t nodesListPushFront(SNodeList* pList, SNode* pNode) { if (NULL == pList || NULL == pNode) { return TSDB_CODE_FAILED; } - SListCell* p = taosMemoryCalloc(1, sizeof(SListCell)); + SListCell* p = nodesCalloc(1, sizeof(SListCell)); if (NULL == p) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY; @@ -1152,7 +1243,7 @@ SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) { } SListCell* pNext = pCell->pNext; nodesDestroyNode(pCell->pNode); - taosMemoryFreeClear(pCell); + nodesFree(pCell); --(pList->length); return pNext; } @@ -1172,7 +1263,7 @@ void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc) { pPos->pPrev = pSrc->pTail; pTarget->length += pSrc->length; - taosMemoryFreeClear(pSrc); + nodesFree(pSrc); } SNode* nodesListGetNode(SNodeList* pList, int32_t index) { @@ -1204,7 +1295,7 @@ void nodesDestroyList(SNodeList* pList) { while (NULL != pNext) { pNext = nodesListErase(pList, pNext); } - taosMemoryFreeClear(pList); + nodesFree(pList); } void nodesClearList(SNodeList* pList) { @@ -1216,9 +1307,9 @@ void nodesClearList(SNodeList* pList) { while (NULL != pNext) { SListCell* tmp = pNext; pNext = pNext->pNext; - taosMemoryFreeClear(tmp); + nodesFree(tmp); } - taosMemoryFreeClear(pList); + nodesFree(pList); } void* nodesGetValueFromNode(SValueNode* pNode) { diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 6f11c653a4..379bd975b4 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -247,7 +247,8 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) { pExpr->userAlias[len] = '\0'; } } - taosMemoryFreeClear(pNode); + pRawExpr->pNode = NULL; + nodesDestroyNode(pNode); return pRealizedExpr; } diff --git a/source/libs/parser/test/parTestUtil.cpp b/source/libs/parser/test/parTestUtil.cpp index 360b904c17..14c991917b 100644 --- a/source/libs/parser/test/parTestUtil.cpp +++ b/source/libs/parser/test/parTestUtil.cpp @@ -119,12 +119,18 @@ class ParserTestBaseImpl { TEST_INTERFACE_ASYNC_API }; - static void _destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { + static void destoryParseContext(SParseContext* pCxt) { + taosArrayDestroy(pCxt->pTableMetaPos); + taosArrayDestroy(pCxt->pTableVgroupPos); + delete pCxt; + } + + static void destoryParseMetaCacheWarpper(SParseMetaCache* pMetaCache, bool request) { destoryParseMetaCache(pMetaCache, request); delete pMetaCache; } - static void _destroyQuery(SQuery** pQuery) { + static void destroyQuery(SQuery** pQuery) { if (nullptr == pQuery) { return; } @@ -303,10 +309,10 @@ class ParserTestBaseImpl { setParseContext(sql, &cxt); if (qIsInsertValuesSql(cxt.pSql, cxt.sqlLen)) { - unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); + unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); doParseInsertSql(&cxt, query.get(), nullptr); } else { - unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); + unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); doParse(&cxt, query.get()); SQuery* pQuery = *(query.get()); @@ -335,7 +341,7 @@ class ParserTestBaseImpl { SParseContext cxt = {0}; setParseContext(sql, &cxt); - unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); + unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); doParseSql(&cxt, query.get()); SQuery* pQuery = *(query.get()); @@ -354,26 +360,26 @@ class ParserTestBaseImpl { void runAsyncInternalFuncs(const string& sql, int32_t expect, ParserStage checkStage) { reset(expect, checkStage, TEST_INTERFACE_ASYNC_INTERNAL); try { - SParseContext cxt = {0}; - setParseContext(sql, &cxt, true); + unique_ptr > cxt(new SParseContext(), destoryParseContext); + setParseContext(sql, cxt.get(), true); - unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); + unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); bool request = true; unique_ptr > metaCache( - new SParseMetaCache(), bind(_destoryParseMetaCache, _1, cref(request))); - bool isInsertValues = qIsInsertValuesSql(cxt.pSql, cxt.sqlLen); + new SParseMetaCache(), bind(destoryParseMetaCacheWarpper, _1, cref(request))); + bool isInsertValues = qIsInsertValuesSql(cxt->pSql, cxt->sqlLen); if (isInsertValues) { - doParseInsertSyntax(&cxt, query.get(), metaCache.get()); + doParseInsertSyntax(cxt.get(), query.get(), metaCache.get()); } else { - doParse(&cxt, query.get()); - doCollectMetaKey(&cxt, *(query.get()), metaCache.get()); + doParse(cxt.get(), query.get()); + doCollectMetaKey(cxt.get(), *(query.get()), metaCache.get()); } SQuery* pQuery = *(query.get()); unique_ptr catalogReq(new SCatalogReq(), MockCatalogService::destoryCatalogReq); - doBuildCatalogReq(&cxt, metaCache.get(), catalogReq.get()); + doBuildCatalogReq(cxt.get(), metaCache.get(), catalogReq.get()); string err; thread t1([&]() { @@ -386,13 +392,13 @@ class ParserTestBaseImpl { doPutMetaDataToCache(catalogReq.get(), metaData.get(), metaCache.get(), isInsertValues); if (isInsertValues) { - doParseInsertSql(&cxt, query.get(), metaCache.get()); + doParseInsertSql(cxt.get(), query.get(), metaCache.get()); } else { - doAuthenticate(&cxt, pQuery, metaCache.get()); + doAuthenticate(cxt.get(), pQuery, metaCache.get()); - doTranslate(&cxt, pQuery, metaCache.get()); + doTranslate(cxt.get(), pQuery, metaCache.get()); - doCalculateConstant(&cxt, pQuery); + doCalculateConstant(cxt.get(), pQuery); } } catch (const TerminateFlag& e) { // success and terminate @@ -423,13 +429,13 @@ class ParserTestBaseImpl { void runAsyncApis(const string& sql, int32_t expect, ParserStage checkStage) { reset(expect, checkStage, TEST_INTERFACE_ASYNC_API); try { - SParseContext cxt = {0}; - setParseContext(sql, &cxt); + unique_ptr > cxt(new SParseContext(), destoryParseContext); + setParseContext(sql, cxt.get()); unique_ptr catalogReq(new SCatalogReq(), MockCatalogService::destoryCatalogReq); - unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), _destroyQuery); - doParseSqlSyntax(&cxt, query.get(), catalogReq.get()); + unique_ptr query((SQuery**)taosMemoryCalloc(1, sizeof(SQuery*)), destroyQuery); + doParseSqlSyntax(cxt.get(), query.get(), catalogReq.get()); SQuery* pQuery = *(query.get()); string err; @@ -438,7 +444,7 @@ class ParserTestBaseImpl { unique_ptr metaData(new SMetaData(), MockCatalogService::destoryMetaData); doGetAllMeta(catalogReq.get(), metaData.get()); - doAnalyseSqlSemantic(&cxt, catalogReq.get(), metaData.get(), pQuery); + doAnalyseSqlSemantic(cxt.get(), catalogReq.get(), metaData.get(), pQuery); } catch (const TerminateFlag& e) { // success and terminate } catch (const runtime_error& e) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index beb938b161..3af3786363 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -997,6 +997,7 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort); } if (TSDB_CODE_SUCCESS == code) { + nodesDestroyNode((SNode*)pScan); code = nodesListMakeStrictAppend(&pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT)); } diff --git a/source/libs/planner/test/planTestMain.cpp b/source/libs/planner/test/planTestMain.cpp index 8f6fc832a2..e86bf90f51 100644 --- a/source/libs/planner/test/planTestMain.cpp +++ b/source/libs/planner/test/planTestMain.cpp @@ -22,6 +22,7 @@ #include "mockCatalog.h" #include "parser.h" #include "planTestUtil.h" +#include "tglobal.h" class PlannerEnv : public testing::Environment { public: @@ -30,6 +31,7 @@ class PlannerEnv : public testing::Environment { initMetaDataEnv(); generateMetaData(); initLog(TD_TMP_DIR_PATH "td"); + initCfg(); } virtual void TearDown() { @@ -67,6 +69,8 @@ class PlannerEnv : public testing::Environment { std::cout << "failed to init log file" << std::endl; } } + + void initCfg() { tsQueryPlannerTrace = true; } }; static void parseArg(int argc, char* argv[]) { @@ -79,6 +83,7 @@ static void parseArg(int argc, char* argv[]) { {"limitSql", required_argument, NULL, 'i'}, {"log", required_argument, NULL, 'l'}, {"queryPolicy", required_argument, NULL, 'q'}, + {"useNodeAllocator", required_argument, NULL, 'a'}, {0, 0, 0, 0} }; // clang-format on @@ -99,6 +104,9 @@ static void parseArg(int argc, char* argv[]) { case 'q': setQueryPolicy(optarg); break; + case 'a': + setUseNodeAllocator(optarg); + break; default: break; } diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 2b8e3d9864..65aed6cadc 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -41,6 +41,7 @@ using namespace testing; enum DumpModule { DUMP_MODULE_NOTHING = 1, + DUMP_MODULE_SQL, DUMP_MODULE_PARSER, DUMP_MODULE_LOGIC, DUMP_MODULE_OPTIMIZED, @@ -56,10 +57,13 @@ int32_t g_skipSql = 0; int32_t g_limitSql = 0; int32_t g_logLevel = 131; int32_t g_queryPolicy = QUERY_POLICY_VNODE; +bool g_useNodeAllocator = false; void setDumpModule(const char* pModule) { if (NULL == pModule) { g_dumpModule = DUMP_MODULE_ALL; + } else if (0 == strncasecmp(pModule, "sql", strlen(pModule))) { + g_dumpModule = DUMP_MODULE_SQL; } else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) { g_dumpModule = DUMP_MODULE_PARSER; } else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) { @@ -79,10 +83,11 @@ void setDumpModule(const char* pModule) { } } -void setSkipSqlNum(const char* pNum) { g_skipSql = stoi(pNum); } -void setLimitSqlNum(const char* pNum) { g_limitSql = stoi(pNum); } -void setLogLevel(const char* pLogLevel) { g_logLevel = stoi(pLogLevel); } -void setQueryPolicy(const char* pQueryPolicy) { g_queryPolicy = stoi(pQueryPolicy); } +void setSkipSqlNum(const char* pArg) { g_skipSql = stoi(pArg); } +void setLimitSqlNum(const char* pArg) { g_limitSql = stoi(pArg); } +void setLogLevel(const char* pArg) { g_logLevel = stoi(pArg); } +void setQueryPolicy(const char* pArg) { g_queryPolicy = stoi(pArg); } +void setUseNodeAllocator(const char* pArg) { g_useNodeAllocator = stoi(pArg); } int32_t getLogLevel() { return g_logLevel; } @@ -124,6 +129,12 @@ class PlannerTestBaseImpl { } void runImpl(const string& sql, int32_t queryPolicy) { + SNodeAllocator* pAllocator = NULL; + if (g_useNodeAllocator) { + nodesCreateNodeAllocator(32 * 1024, &pAllocator); + nodesResetThreadLevelAllocator(pAllocator); + } + reset(); tsQueryPolicy = queryPolicy; try { @@ -155,8 +166,13 @@ class PlannerTestBaseImpl { dump(g_dumpModule); } catch (...) { dump(DUMP_MODULE_ALL); + nodesDestroyNodeAllocator(pAllocator); + nodesResetThreadLevelAllocator(NULL); throw; } + + nodesDestroyNodeAllocator(pAllocator); + nodesResetThreadLevelAllocator(NULL); } void prepare(const string& sql) { @@ -216,6 +232,8 @@ class PlannerTestBaseImpl { doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan); unique_ptr plan(pPlan, (void (*)(SQueryPlan*))nodesDestroyNode); + checkPlanMsg((SNode*)pPlan); + dump(g_dumpModule); } catch (...) { dump(DUMP_MODULE_ALL); @@ -252,7 +270,6 @@ class PlannerTestBaseImpl { string splitLogicPlan_; string scaledLogicPlan_; string physiPlan_; - string physiPlanMsg_; vector physiSubplans_; }; @@ -276,17 +293,16 @@ class PlannerTestBaseImpl { res_.splitLogicPlan_.clear(); res_.scaledLogicPlan_.clear(); res_.physiPlan_.clear(); - res_.physiPlanMsg_.clear(); res_.physiSubplans_.clear(); } void dump(DumpModule module) { - cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl; - if (DUMP_MODULE_NOTHING == module) { return; } + cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl; + if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) { if (res_.prepareAst_.empty()) { cout << "+++++++++++++++++++++syntax tree : " << endl; @@ -411,8 +427,6 @@ class PlannerTestBaseImpl { SNode* pSubplan; FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); } } - res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan)); - cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl; } void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { @@ -451,27 +465,16 @@ class PlannerTestBaseImpl { string toString(const SNode* pRoot) { char* pStr = NULL; int32_t len = 0; - - auto start = chrono::steady_clock::now(); DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len) - if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pRoot)) { - cout << "nodesNodeToString: " - << chrono::duration_cast(chrono::steady_clock::now() - start).count() << "us" << endl; - } - string str(pStr); taosMemoryFreeClear(pStr); return str; } - string toMsg(const SNode* pRoot) { + void checkPlanMsg(const SNode* pRoot) { char* pStr = NULL; int32_t len = 0; - - auto start = chrono::steady_clock::now(); DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len) - cout << "nodesNodeToMsg: " - << chrono::duration_cast(chrono::steady_clock::now() - start).count() << "us" << endl; string copyStr(pStr, len); SNode* pNode = NULL; @@ -491,9 +494,7 @@ class PlannerTestBaseImpl { nodesDestroyNode(pNode); taosMemoryFreeClear(pNewStr); - string str(pStr, len); taosMemoryFreeClear(pStr); - return str; } caseEnv caseEnv_; diff --git a/source/libs/planner/test/planTestUtil.h b/source/libs/planner/test/planTestUtil.h index b0ddd726a6..be8b51f769 100644 --- a/source/libs/planner/test/planTestUtil.h +++ b/source/libs/planner/test/planTestUtil.h @@ -41,11 +41,12 @@ class PlannerTestBase : public testing::Test { std::unique_ptr impl_; }; -extern void setDumpModule(const char* pModule); -extern void setSkipSqlNum(const char* pNum); -extern void setLimitSqlNum(const char* pNum); -extern void setLogLevel(const char* pLogLevel); -extern void setQueryPolicy(const char* pQueryPolicy); +extern void setDumpModule(const char* pArg); +extern void setSkipSqlNum(const char* pArg); +extern void setLimitSqlNum(const char* pArg); +extern void setLogLevel(const char* pArg); +extern void setQueryPolicy(const char* pArg); +extern void setUseNodeAllocator(const char* pArg); extern int32_t getLogLevel(); #endif // PLAN_TEST_UTIL_H diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index cd1f6624bd..05730c62ac 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -847,7 +847,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { memcpy(res->datum.p, output.columnData->pData, len); } else if (IS_VAR_DATA_TYPE(type)) { //res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); - res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1); + res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, 1); res->node.resType.bytes = varDataTLen(output.columnData->pData); memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); } else { From 46b0c2c9c4f597adf0ea811f450e3c8826fcde5b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 20 Sep 2022 18:52:45 +0800 Subject: [PATCH 02/12] enh: added memory allocators for parser and planner --- include/common/tglobal.h | 2 ++ source/client/inc/clientInt.h | 1 + source/client/src/clientEnv.c | 1 + source/client/src/clientImpl.c | 11 +++++++ source/client/src/clientMain.c | 7 ++++ source/common/src/tglobal.c | 44 ++++++++++++++++---------- source/libs/nodes/src/nodesUtilFuncs.c | 18 ++++++++--- 7 files changed, 63 insertions(+), 21 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2de4ffdc17..234937ea7e 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -95,6 +95,8 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in extern int32_t tsQueryPolicy; extern int32_t tsQuerySmaOptimize; extern bool tsQueryPlannerTrace; +extern int32_t tsQueryNodeChunkSize; +extern bool tsQueryUseNodeAllocator; // client extern int32_t tsMinSlidingTime; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b8fa9580e7..f042c3ad7c 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -250,6 +250,7 @@ typedef struct SRequestObj { bool inRetry; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; + SNodeAllocator* pNodeAllocator; } SRequestObj; typedef struct SSyncQueryParam { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b739aedca0..8fded03472 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -349,6 +349,7 @@ void doDestroyRequest(void *p) { taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->targetTableList); + nodesDestroyNodeAllocator(pRequest->pNodeAllocator); destroyQueryExecRes(&pRequest->body.resInfo.execRes); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0e1d82b273..bc1cdc247e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -194,6 +194,17 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_TSC_OUT_OF_MEMORY; } + if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) { + if (TSDB_CODE_SUCCESS != nodesCreateNodeAllocator(tsQueryNodeChunkSize, &((*pRequest)->pNodeAllocator))) { + tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self, + (*pRequest)->requestId, pTscObj->id, sql); + + destroyRequest(*pRequest); + *pRequest = NULL; + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); return TSDB_CODE_SUCCESS; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 73636e7372..e19d88fcf3 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -700,6 +700,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->metric.ctgEnd = taosGetTimestampUs(); + nodesResetThreadLevelAllocator(pRequest->pNodeAllocator); + if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); pRequest->stableQuery = pQuery->stableQuery; @@ -726,9 +728,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); qDestroyQuery(pQuery); + nodesResetThreadLevelAllocator(NULL); } else { destorySqlParseWrapper(pWrapper); qDestroyQuery(pQuery); + nodesResetThreadLevelAllocator(NULL); if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); @@ -801,6 +805,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } SQuery *pQuery = NULL; + nodesResetThreadLevelAllocator(pRequest->pNodeAllocator); pRequest->metric.syntaxStart = taosGetTimestampUs(); @@ -844,6 +849,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { &pRequest->body.queryJob); pCxt = NULL; if (code == TSDB_CODE_SUCCESS) { + nodesResetThreadLevelAllocator(NULL); return; } @@ -851,6 +857,7 @@ _error: tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); taosMemoryFree(pCxt); + nodesResetThreadLevelAllocator(NULL); terrno = code; pRequest->code = code; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ddda8f8c9a..e119dae743 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -92,6 +92,8 @@ bool tsSmlDataFormat = int32_t tsQueryPolicy = 1; int32_t tsQuerySmaOptimize = 0; bool tsQueryPlannerTrace = false; +int32_t tsQueryNodeChunkSize = 32 * 1024; +bool tsQueryUseNodeAllocator = true; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -284,6 +286,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; + if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1; + if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, true) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; @@ -385,9 +389,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; -// tsNumOfQnodeFetchThreads = tsNumOfCores / 2; -// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); -// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; + // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; + // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); + // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); @@ -527,15 +531,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } -/* - pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); - if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfQnodeFetchThreads = numOfCores / 2; - tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); - pItem->i32 = tsNumOfQnodeFetchThreads; - pItem->stype = stype; - } -*/ + /* + pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsNumOfQnodeFetchThreads = numOfCores / 2; + tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); + pItem->i32 = tsNumOfQnodeFetchThreads; + pItem->stype = stype; + } + */ pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { @@ -643,6 +647,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; + tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32; + tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval; return 0; } @@ -693,7 +699,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; -// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; @@ -941,10 +947,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; -/* - } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { - tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; -*/ + /* + } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { + tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + */ } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { @@ -976,6 +982,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32; } else if (strcasecmp("queryPlannerTrace", name) == 0) { tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; + } else if (strcasecmp("queryNodeChunkSize", name) == 0) { + tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32; + } else if (strcasecmp("queryUseNodeAllocator", name) == 0) { + tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval; } break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index be94495856..8b9b723a2a 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -58,9 +58,9 @@ static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { return pNewChunk; } -static void* nodesCalloc(int32_t num, int32_t size) { +static void* nodesCallocImpl(int32_t size) { if (NULL == pNodeAllocator) { - return taosMemoryCalloc(num, size); + return taosMemoryCalloc(1, size); } if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) { @@ -73,9 +73,19 @@ static void* nodesCalloc(int32_t num, int32_t size) { return p; } +static void* nodesCalloc(int32_t num, int32_t size) { + void* p = nodesCallocImpl(num * size + 1); + if (NULL == p) { + return NULL; + } + *(char*)p = (NULL != pNodeAllocator) ? 1 : 0; + return (char*)p + 1; +} + static void nodesFree(void* p) { - if (NULL == pNodeAllocator) { - taosMemoryFree(p); + char* ptr = (char*)p - 1; + if (0 == *ptr) { + taosMemoryFree(ptr); } return; } From 2f475399a698301987f90f3ceea3bafddde2417c Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 21 Sep 2022 10:57:41 +0800 Subject: [PATCH 03/12] enh: added memory allocators for parser and planner --- include/libs/nodes/nodes.h | 8 +++- include/libs/scheduler/scheduler.h | 1 + source/client/inc/clientInt.h | 3 +- source/client/src/clientEnv.c | 3 +- source/client/src/clientImpl.c | 4 +- source/client/src/clientMain.c | 12 +++--- source/libs/nodes/src/nodesUtilFuncs.c | 58 +++++++++++++++++++++++++- source/libs/scheduler/inc/schInt.h | 3 +- source/libs/scheduler/src/schJob.c | 2 + 9 files changed, 81 insertions(+), 13 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 1e2a53f598..3d5b04ddf4 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -278,9 +278,15 @@ typedef struct SNodeList { typedef struct SNodeAllocator SNodeAllocator; int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator); -void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator); +void nodesDestroyNodeAllocator(void* pAllocator); void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator); +int32_t nodesAllocatorInit(); +int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId); +void nodesDestroyAllocator(int64_t refId); +void nodesResetAllocator(int64_t refId); +int64_t nodesIncAllocatorRefCount(int64_t refId); + SNode* nodesMakeNode(ENodeType type); void nodesDestroyNode(SNode* pNode); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index e6973cd390..738d057e6a 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -67,6 +67,7 @@ typedef struct SSchedulerReq { SRequestConnInfo *pConn; SArray *pNodeList; SQueryPlan *pDag; + int64_t allocatorRefId; const char *sql; int64_t startTs; schedulerExecFp execFp; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index f042c3ad7c..eca1a0ebbe 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -250,7 +250,8 @@ typedef struct SRequestObj { bool inRetry; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; - SNodeAllocator* pNodeAllocator; + // SNodeAllocator* pNodeAllocator; + int64_t allocatorRefId; } SRequestObj; typedef struct SSyncQueryParam { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 8fded03472..e95a2d2871 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -349,7 +349,7 @@ void doDestroyRequest(void *p) { taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->targetTableList); - nodesDestroyNodeAllocator(pRequest->pNodeAllocator); + nodesDestroyAllocator(pRequest->allocatorRefId); destroyQueryExecRes(&pRequest->body.resInfo.execRes); @@ -412,6 +412,7 @@ void taos_init_imp(void) { initTaskQueue(); fmFuncMgtInit(); + nodesAllocatorInit(); clientConnRefPool = taosOpenRef(200, destroyTscObj); clientReqRefPool = taosOpenRef(40960, doDestroyRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index bc1cdc247e..75e0966243 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -194,8 +194,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_TSC_OUT_OF_MEMORY; } + (*pRequest)->allocatorRefId = -1; if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) { - if (TSDB_CODE_SUCCESS != nodesCreateNodeAllocator(tsQueryNodeChunkSize, &((*pRequest)->pNodeAllocator))) { + if (TSDB_CODE_SUCCESS != nodesCreateAllocator(tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) { tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); @@ -1058,6 +1059,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, + .allocatorRefId = pRequest->allocatorRefId, .sql = pRequest->sqlstr, .startTs = pRequest->metric.start, .execFp = schedulerExecCb, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e19d88fcf3..c3f8ca32b8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -700,7 +700,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->metric.ctgEnd = taosGetTimestampUs(); - nodesResetThreadLevelAllocator(pRequest->pNodeAllocator); + nodesResetAllocator(pRequest->allocatorRefId); if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); @@ -728,11 +728,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); qDestroyQuery(pQuery); - nodesResetThreadLevelAllocator(NULL); + nodesResetAllocator(-1); } else { destorySqlParseWrapper(pWrapper); qDestroyQuery(pQuery); - nodesResetThreadLevelAllocator(NULL); + nodesResetAllocator(-1); if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); @@ -805,7 +805,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } SQuery *pQuery = NULL; - nodesResetThreadLevelAllocator(pRequest->pNodeAllocator); + nodesResetAllocator(pRequest->allocatorRefId); pRequest->metric.syntaxStart = taosGetTimestampUs(); @@ -849,7 +849,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { &pRequest->body.queryJob); pCxt = NULL; if (code == TSDB_CODE_SUCCESS) { - nodesResetThreadLevelAllocator(NULL); + nodesResetAllocator(-1); return; } @@ -857,7 +857,7 @@ _error: tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); taosMemoryFree(pCxt); - nodesResetThreadLevelAllocator(NULL); + nodesResetAllocator(-1); terrno = code; pRequest->code = code; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8b9b723a2a..b7caead3e5 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -21,6 +21,7 @@ #include "taoserror.h" #include "tdatablock.h" #include "thash.h" +#include "tref.h" typedef struct SNodeMemChunk { int32_t availableSize; @@ -30,6 +31,7 @@ typedef struct SNodeMemChunk { } SNodeMemChunk; typedef struct SNodeAllocator { + int64_t self; int32_t chunkSize; int32_t chunkNum; SNodeMemChunk* pCurrChunk; @@ -37,6 +39,22 @@ typedef struct SNodeAllocator { } SNodeAllocator; static threadlocal SNodeAllocator* pNodeAllocator; +static int32_t allocatorReqRefPool = -1; + +int32_t nodesAllocatorInit() { + if (allocatorReqRefPool >= 0) { + nodesWarn("nodes already initialized"); + return TSDB_CODE_SUCCESS; + } + + allocatorReqRefPool = taosOpenRef(40960, nodesDestroyNodeAllocator); + if (allocatorReqRefPool < 0) { + nodesError("init nodes failed"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize); @@ -103,11 +121,13 @@ int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) return TSDB_CODE_SUCCESS; } -void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator) { - if (NULL == pAllocator) { +void nodesDestroyNodeAllocator(void* p) { + if (NULL == p) { return; } + SNodeAllocator* pAllocator = p; + nodesDebug("alloc chunkNum: %d, chunkTotakSize: %d", pAllocator->chunkNum, pAllocator->chunkNum * pAllocator->chunkSize); @@ -122,6 +142,40 @@ void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator) { void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator) { pNodeAllocator = pAllocator; } +int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId) { + SNodeAllocator* pAllocator = NULL; + int32_t code = nodesCreateNodeAllocator(chunkSize, &pAllocator); + if (TSDB_CODE_SUCCESS == code) { + pAllocator->self = taosAddRef(allocatorReqRefPool, pAllocator); + *pRefId = pAllocator->self; + } + return code; +} + +void nodesDestroyAllocator(int64_t refId) { + if (refId < 0) { + return; + } + taosReleaseRef(allocatorReqRefPool, refId); +} + +void nodesResetAllocator(int64_t refId) { + if (refId < 0) { + pNodeAllocator = NULL; + } else { + pNodeAllocator = taosAcquireRef(allocatorReqRefPool, refId); + taosReleaseRef(allocatorReqRefPool, refId); + } +} + +int64_t nodesIncAllocatorRefCount(int64_t refId) { + if (refId < 0) { + return -1; + } + SNodeAllocator* pAllocator = taosAcquireRef(allocatorReqRefPool, refId); + return pAllocator->self; +} + static SNode* makeNode(ENodeType type, int32_t size) { SNode* p = nodesCalloc(1, size); if (NULL == p) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 7fea286732..a62531a875 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -254,7 +254,8 @@ typedef struct SSchJob { SRequestConnInfo conn; SArray *nodeList; // qnode/vnode list, SArray SArray *levels; // starting from 0. SArray - SQueryPlan *pDag; + SQueryPlan *pDag; + int64_t allocatorRefId; SArray *dataSrcTasks; // SArray int32_t levelIdx; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 98501427ab..345f4680b0 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -673,6 +673,7 @@ void schFreeJobImpl(void *job) { destroyQueryExecRes(&pJob->execRes); qDestroyQueryPlan(pJob->pDag); + nodesDestroyAllocator(pJob->allocatorRefId); taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->fetchRes); @@ -724,6 +725,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { pJob->sql = strdup(pReq->sql); } pJob->pDag = pReq->pDag; + pJob->allocatorRefId = nodesIncAllocatorRefCount(pReq->allocatorRefId); pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillParam = pReq->chkKillParam; pJob->userRes.execFp = pReq->execFp; From a914816ce75111b15d75dbaf4a93255da9dba975 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 21 Sep 2022 11:28:34 +0800 Subject: [PATCH 04/12] enh: added memory allocators for parser and planner --- source/client/inc/clientInt.h | 3 +-- source/client/src/clientEnv.c | 1 + source/libs/nodes/src/nodesUtilFuncs.c | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index eca1a0ebbe..7a9c665556 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -250,8 +250,7 @@ typedef struct SRequestObj { bool inRetry; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; - // SNodeAllocator* pNodeAllocator; - int64_t allocatorRefId; + int64_t allocatorRefId; } SRequestObj; typedef struct SSyncQueryParam { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index e95a2d2871..5019314fed 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -288,6 +288,7 @@ void *createRequest(uint64_t connId, int32_t type) { pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default pRequest->type = type; + pRequest->allocatorRefId = -1; pRequest->pDb = getDbOfConnection(pTscObj); pRequest->pTscObj = pTscObj; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index b7caead3e5..480d2cce72 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -153,14 +153,14 @@ int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId) { } void nodesDestroyAllocator(int64_t refId) { - if (refId < 0) { + if (refId <= 0) { return; } taosReleaseRef(allocatorReqRefPool, refId); } void nodesResetAllocator(int64_t refId) { - if (refId < 0) { + if (refId <= 0) { pNodeAllocator = NULL; } else { pNodeAllocator = taosAcquireRef(allocatorReqRefPool, refId); @@ -169,7 +169,7 @@ void nodesResetAllocator(int64_t refId) { } int64_t nodesIncAllocatorRefCount(int64_t refId) { - if (refId < 0) { + if (refId <= 0) { return -1; } SNodeAllocator* pAllocator = taosAcquireRef(allocatorReqRefPool, refId); From 90c300e3202f73d879eb5f66bc2b92769638a371 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 21 Sep 2022 13:53:40 +0800 Subject: [PATCH 05/12] enh: added memory allocators for parser and planner --- source/client/src/clientImpl.c | 2 ++ source/client/src/clientMain.c | 4 +--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 75e0966243..b74dca3cfe 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1046,6 +1046,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM pRequest->body.subplanNum = pDag->numOfSubplans; } + nodesResetAllocator(-1); + pRequest->metric.planEnd = taosGetTimestampUs(); if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c3f8ca32b8..77bd71c756 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -728,7 +728,6 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); qDestroyQuery(pQuery); - nodesResetAllocator(-1); } else { destorySqlParseWrapper(pWrapper); qDestroyQuery(pQuery); @@ -811,6 +810,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)}; code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq); + nodesResetAllocator(-1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -849,7 +849,6 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { &pRequest->body.queryJob); pCxt = NULL; if (code == TSDB_CODE_SUCCESS) { - nodesResetAllocator(-1); return; } @@ -857,7 +856,6 @@ _error: tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); taosMemoryFree(pCxt); - nodesResetAllocator(-1); terrno = code; pRequest->code = code; From 5116e0a01b5f619e129a92808d6192bea9c1632b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 21 Sep 2022 17:04:12 +0800 Subject: [PATCH 06/12] enh: added memory allocators for parser and planner --- include/libs/nodes/nodes.h | 17 ++- include/libs/parser/parser.h | 1 + include/libs/planner/planner.h | 1 + source/client/src/clientEnv.c | 2 +- source/client/src/clientImpl.c | 8 +- source/client/src/clientMain.c | 8 +- source/libs/nodes/src/nodesUtilFuncs.c | 148 ++++++++++++++-------- source/libs/parser/src/parser.c | 19 ++- source/libs/planner/src/planner.c | 6 +- source/libs/planner/test/planTestMain.cpp | 2 + source/libs/planner/test/planTestUtil.cpp | 14 +- source/libs/scheduler/src/schJob.c | 4 +- 12 files changed, 143 insertions(+), 87 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3d5b04ddf4..634dae9ec5 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -277,15 +277,14 @@ typedef struct SNodeList { typedef struct SNodeAllocator SNodeAllocator; -int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator); -void nodesDestroyNodeAllocator(void* pAllocator); -void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator); - -int32_t nodesAllocatorInit(); -int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId); -void nodesDestroyAllocator(int64_t refId); -void nodesResetAllocator(int64_t refId); -int64_t nodesIncAllocatorRefCount(int64_t refId); +int32_t nodesInitAllocatorSet(); +void nodesDestroyAllocatorSet(); +int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId); +int32_t nodesAcquireAllocator(int64_t allocatorId); +int32_t nodesReleaseAllocator(int64_t allocatorId); +int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId); +int64_t nodesReleaseAllocatorWeakRef(int64_t allocatorId); +void nodesDestroyAllocator(int64_t allocatorId); SNode* nodesMakeNode(ENodeType type); void nodesDestroyNode(SNode* pNode); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 95bde85864..b1a937910d 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -56,6 +56,7 @@ typedef struct SParseContext { bool nodeOffline; SArray* pTableMetaPos; // sql table pos => catalog data pos SArray* pTableVgroupPos; // sql table pos => catalog data pos + int64_t allocatorId; } SParseContext; int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index e03ac3811a..e52fe39527 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -39,6 +39,7 @@ typedef struct SPlanContext { int32_t msgLen; const char* pUser; bool sysInfo; + int64_t allocatorId; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 5019314fed..2faf268880 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -413,7 +413,7 @@ void taos_init_imp(void) { initTaskQueue(); fmFuncMgtInit(); - nodesAllocatorInit(); + nodesInitAllocatorSet(); clientConnRefPool = taosOpenRef(200, destroyTscObj); clientReqRefPool = taosOpenRef(40960, doDestroyRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c0809ca822..ef19eba7fe 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -197,7 +197,8 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, (*pRequest)->allocatorRefId = -1; if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) { - if (TSDB_CODE_SUCCESS != nodesCreateAllocator(tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) { + if (TSDB_CODE_SUCCESS != + nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) { tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); @@ -1035,7 +1036,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pMsg = pRequest->msgBuf, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .pUser = pRequest->pTscObj->user, - .sysInfo = pRequest->pTscObj->sysInfo}; + .sysInfo = pRequest->pTscObj->sysInfo, + .allocatorId = pRequest->allocatorRefId}; SAppInstInfo* pAppInfo = getAppInfo(pRequest); SQueryPlan* pDag = NULL; @@ -1047,8 +1049,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM pRequest->body.subplanNum = pDag->numOfSubplans; } - nodesResetAllocator(-1); - pRequest->metric.planEnd = taosGetTimestampUs(); if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index a03b6cd5a5..ae386b819c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -700,8 +700,6 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->metric.ctgEnd = taosGetTimestampUs(); - nodesResetAllocator(pRequest->allocatorRefId); - if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); pRequest->stableQuery = pQuery->stableQuery; @@ -731,7 +729,6 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { } else { destorySqlParseWrapper(pWrapper); qDestroyQuery(pQuery); - nodesResetAllocator(-1); if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); @@ -778,7 +775,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { .enableSysInfo = pTscObj->sysInfo, .async = true, .svrVer = pTscObj->sVer, - .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)}; + .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes), + .allocatorId = pRequest->allocatorRefId}; return TSDB_CODE_SUCCESS; } @@ -804,13 +802,11 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } SQuery *pQuery = NULL; - nodesResetAllocator(pRequest->allocatorRefId); pRequest->metric.syntaxStart = taosGetTimestampUs(); SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)}; code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq); - nodesResetAllocator(-1); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 480d2cce72..2e5a0d935b 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -32,29 +32,16 @@ typedef struct SNodeMemChunk { typedef struct SNodeAllocator { int64_t self; + int64_t queryId; int32_t chunkSize; int32_t chunkNum; SNodeMemChunk* pCurrChunk; SNodeMemChunk* pChunks; + TdThreadMutex mutex; } SNodeAllocator; -static threadlocal SNodeAllocator* pNodeAllocator; -static int32_t allocatorReqRefPool = -1; - -int32_t nodesAllocatorInit() { - if (allocatorReqRefPool >= 0) { - nodesWarn("nodes already initialized"); - return TSDB_CODE_SUCCESS; - } - - allocatorReqRefPool = taosOpenRef(40960, nodesDestroyNodeAllocator); - if (allocatorReqRefPool < 0) { - nodesError("init nodes failed"); - return TSDB_CODE_OUT_OF_MEMORY; - } - - return TSDB_CODE_SUCCESS; -} +static threadlocal SNodeAllocator* g_pNodeAllocator; +static int32_t g_allocatorReqRefPool = -1; static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize); @@ -77,17 +64,17 @@ static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { } static void* nodesCallocImpl(int32_t size) { - if (NULL == pNodeAllocator) { + if (NULL == g_pNodeAllocator) { return taosMemoryCalloc(1, size); } - if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) { - if (NULL == callocNodeChunk(pNodeAllocator)) { + if (g_pNodeAllocator->pCurrChunk->usedSize + size > g_pNodeAllocator->pCurrChunk->availableSize) { + if (NULL == callocNodeChunk(g_pNodeAllocator)) { return NULL; } } - void* p = pNodeAllocator->pCurrChunk->pBuf + pNodeAllocator->pCurrChunk->usedSize; - pNodeAllocator->pCurrChunk->usedSize += size; + void* p = g_pNodeAllocator->pCurrChunk->pBuf + g_pNodeAllocator->pCurrChunk->usedSize; + g_pNodeAllocator->pCurrChunk->usedSize += size; return p; } @@ -96,7 +83,7 @@ static void* nodesCalloc(int32_t num, int32_t size) { if (NULL == p) { return NULL; } - *(char*)p = (NULL != pNodeAllocator) ? 1 : 0; + *(char*)p = (NULL != g_pNodeAllocator) ? 1 : 0; return (char*)p + 1; } @@ -108,7 +95,7 @@ static void nodesFree(void* p) { return; } -int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) { +static int32_t createNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) { *pAllocator = taosMemoryCalloc(1, sizeof(SNodeAllocator)); if (NULL == *pAllocator) { return TSDB_CODE_OUT_OF_MEMORY; @@ -118,18 +105,19 @@ int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) taosMemoryFreeClear(*pAllocator); return TSDB_CODE_OUT_OF_MEMORY; } + taosThreadMutexInit(&(*pAllocator)->mutex, NULL); return TSDB_CODE_SUCCESS; } -void nodesDestroyNodeAllocator(void* p) { +static void destroyNodeAllocator(void* p) { if (NULL == p) { return; } SNodeAllocator* pAllocator = p; - nodesDebug("alloc chunkNum: %d, chunkTotakSize: %d", pAllocator->chunkNum, - pAllocator->chunkNum * pAllocator->chunkSize); + nodesDebug("query id %" PRIx64 " allocator id %" PRIx64 " alloc chunkNum: %d, chunkTotakSize: %d", + pAllocator->queryId, pAllocator->self, pAllocator->chunkNum, pAllocator->chunkNum * pAllocator->chunkSize); SNodeMemChunk* pChunk = pAllocator->pChunks; while (NULL != pChunk) { @@ -137,43 +125,101 @@ void nodesDestroyNodeAllocator(void* p) { taosMemoryFree(pChunk); pChunk = pTemp; } + taosThreadMutexDestroy(&pAllocator->mutex); taosMemoryFree(pAllocator); } -void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator) { pNodeAllocator = pAllocator; } +int32_t nodesInitAllocatorSet() { + if (g_allocatorReqRefPool >= 0) { + nodesWarn("nodes already initialized"); + return TSDB_CODE_SUCCESS; + } -int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId) { + g_allocatorReqRefPool = taosOpenRef(1024, destroyNodeAllocator); + if (g_allocatorReqRefPool < 0) { + nodesError("init nodes failed"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +void nodesDestroyAllocatorSet() { + if (g_allocatorReqRefPool >= 0) { + SNodeAllocator* pAllocator = taosIterateRef(g_allocatorReqRefPool, 0); + int64_t refId = 0; + while (NULL != pAllocator) { + refId = pAllocator->self; + taosRemoveRef(g_allocatorReqRefPool, refId); + pAllocator = taosIterateRef(g_allocatorReqRefPool, refId); + } + taosCloseRef(g_allocatorReqRefPool); + } +} + +int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId) { SNodeAllocator* pAllocator = NULL; - int32_t code = nodesCreateNodeAllocator(chunkSize, &pAllocator); + int32_t code = createNodeAllocator(chunkSize, &pAllocator); if (TSDB_CODE_SUCCESS == code) { - pAllocator->self = taosAddRef(allocatorReqRefPool, pAllocator); - *pRefId = pAllocator->self; + pAllocator->self = taosAddRef(g_allocatorReqRefPool, pAllocator); + if (pAllocator->self <= 0) { + return terrno; + } + pAllocator->queryId = queryId; + *pAllocatorId = pAllocator->self; } return code; } -void nodesDestroyAllocator(int64_t refId) { - if (refId <= 0) { +int32_t nodesAcquireAllocator(int64_t allocatorId) { + if (allocatorId <= 0) { + return TSDB_CODE_SUCCESS; + } + + SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId); + if (NULL == pAllocator) { + return terrno; + } + taosThreadMutexLock(&pAllocator->mutex); + g_pNodeAllocator = pAllocator; + return TSDB_CODE_SUCCESS; +} + +int32_t nodesReleaseAllocator(int64_t allocatorId) { + if (allocatorId <= 0) { + return TSDB_CODE_SUCCESS; + } + + if (NULL == g_pNodeAllocator) { + nodesError("allocator id %" PRIx64 + " release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator " + "function is called!", + allocatorId); + return TSDB_CODE_FAILED; + } + SNodeAllocator* pAllocator = g_pNodeAllocator; + g_pNodeAllocator = NULL; + taosThreadMutexUnlock(&pAllocator->mutex); + return taosReleaseRef(g_allocatorReqRefPool, allocatorId); +} + +int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) { + if (allocatorId <= 0) { + return 0; + } + + SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId); + return pAllocator->self; +} + +int64_t nodesReleaseAllocatorWeakRef(int64_t allocatorId) { return taosReleaseRef(g_allocatorReqRefPool, allocatorId); } + +void nodesDestroyAllocator(int64_t allocatorId) { + if (allocatorId <= 0) { return; } - taosReleaseRef(allocatorReqRefPool, refId); -} -void nodesResetAllocator(int64_t refId) { - if (refId <= 0) { - pNodeAllocator = NULL; - } else { - pNodeAllocator = taosAcquireRef(allocatorReqRefPool, refId); - taosReleaseRef(allocatorReqRefPool, refId); - } -} - -int64_t nodesIncAllocatorRefCount(int64_t refId) { - if (refId <= 0) { - return -1; - } - SNodeAllocator* pAllocator = taosAcquireRef(allocatorReqRefPool, refId); - return pAllocator->self; + taosRemoveRef(g_allocatorReqRefPool, allocatorId); } static SNode* makeNode(ENodeType type, int32_t size) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 7ee6a5b223..940c676689 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -177,15 +177,18 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { SParseMetaCache metaCache = {0}; - int32_t code = TSDB_CODE_SUCCESS; - if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) { - code = parseInsertSyntax(pCxt, pQuery, &metaCache); - } else { - code = parseSqlSyntax(pCxt, pQuery, &metaCache); + int32_t code = nodesAcquireAllocator(pCxt->allocatorId); + if (TSDB_CODE_SUCCESS == code) { + if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) { + code = parseInsertSyntax(pCxt, pQuery, &metaCache); + } else { + code = parseSqlSyntax(pCxt, pQuery, &metaCache); + } } if (TSDB_CODE_SUCCESS == code) { code = buildCatalogReq(pCxt, &metaCache, pCatalogReq); } + code = nodesReleaseAllocator(pCxt->allocatorId); destoryParseMetaCache(&metaCache, true); terrno = code; return code; @@ -194,7 +197,10 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq, const struct SMetaData* pMetaData, SQuery* pQuery) { SParseMetaCache metaCache = {0}; - int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot); + int32_t code = nodesAcquireAllocator(pCxt->allocatorId); + if (TSDB_CODE_SUCCESS == code) { + code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot); + } if (TSDB_CODE_SUCCESS == code) { if (NULL == pQuery->pRoot) { code = parseInsertSql(pCxt, &pQuery, &metaCache); @@ -202,6 +208,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata code = analyseSemantic(pCxt, pQuery, &metaCache); } } + code = nodesReleaseAllocator(pCxt->allocatorId); destoryParseMetaCache(&metaCache, false); terrno = code; return code; diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 35903d45b1..0b6128ddec 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -33,7 +33,10 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo SLogicSubplan* pLogicSubplan = NULL; SQueryLogicPlan* pLogicPlan = NULL; - int32_t code = createLogicPlan(pCxt, &pLogicSubplan); + int32_t code = nodesAcquireAllocator(pCxt->allocatorId); + if (TSDB_CODE_SUCCESS == code) { + code = createLogicPlan(pCxt, &pLogicSubplan); + } if (TSDB_CODE_SUCCESS == code) { code = optimizeLogicPlan(pCxt, pLogicSubplan); } @@ -49,6 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo if (TSDB_CODE_SUCCESS == code) { dumpQueryPlan(*pPlan); } + code = nodesReleaseAllocator(pCxt->allocatorId); nodesDestroyNode((SNode*)pLogicSubplan); nodesDestroyNode((SNode*)pLogicPlan); diff --git a/source/libs/planner/test/planTestMain.cpp b/source/libs/planner/test/planTestMain.cpp index e86bf90f51..df6e72ce46 100644 --- a/source/libs/planner/test/planTestMain.cpp +++ b/source/libs/planner/test/planTestMain.cpp @@ -32,6 +32,7 @@ class PlannerEnv : public testing::Environment { generateMetaData(); initLog(TD_TMP_DIR_PATH "td"); initCfg(); + nodesInitAllocatorSet(); } virtual void TearDown() { @@ -39,6 +40,7 @@ class PlannerEnv : public testing::Environment { qCleanupKeywordsTable(); fmFuncMgtDestroy(); taosCloseLog(); + nodesDestroyAllocatorSet(); } PlannerEnv() {} diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 65aed6cadc..73d695195c 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -129,10 +129,10 @@ class PlannerTestBaseImpl { } void runImpl(const string& sql, int32_t queryPolicy) { - SNodeAllocator* pAllocator = NULL; + int64_t allocatorId = 0; if (g_useNodeAllocator) { - nodesCreateNodeAllocator(32 * 1024, &pAllocator); - nodesResetThreadLevelAllocator(pAllocator); + nodesCreateAllocator(sqlNo_, 32 * 1024, &allocatorId); + nodesAcquireAllocator(allocatorId); } reset(); @@ -166,13 +166,13 @@ class PlannerTestBaseImpl { dump(g_dumpModule); } catch (...) { dump(DUMP_MODULE_ALL); - nodesDestroyNodeAllocator(pAllocator); - nodesResetThreadLevelAllocator(NULL); + nodesReleaseAllocator(allocatorId); + nodesDestroyAllocator(allocatorId); throw; } - nodesDestroyNodeAllocator(pAllocator); - nodesResetThreadLevelAllocator(NULL); + nodesReleaseAllocator(allocatorId); + nodesDestroyAllocator(allocatorId); } void prepare(const string& sql) { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 345f4680b0..9880490594 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -673,7 +673,7 @@ void schFreeJobImpl(void *job) { destroyQueryExecRes(&pJob->execRes); qDestroyQueryPlan(pJob->pDag); - nodesDestroyAllocator(pJob->allocatorRefId); + nodesReleaseAllocatorWeakRef(pJob->allocatorRefId); taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->fetchRes); @@ -725,7 +725,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { pJob->sql = strdup(pReq->sql); } pJob->pDag = pReq->pDag; - pJob->allocatorRefId = nodesIncAllocatorRefCount(pReq->allocatorRefId); + pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId); pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillParam = pReq->chkKillParam; pJob->userRes.execFp = pReq->execFp; From 5680c03c1783db19eb313e471558972c444f7230 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 21 Sep 2022 17:07:22 +0800 Subject: [PATCH 07/12] enh: added memory allocators for parser and planner --- source/client/src/clientMain.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ae386b819c..6b707bf7a0 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -65,6 +65,7 @@ void taos_cleanup(void) { fmFuncMgtDestroy(); qCleanupKeywordsTable(); + nodesDestroyAllocatorSet(); id = clientConnRefPool; clientConnRefPool = -1; From 876303be5621c6b84b41bf5f8c24a851035ece40 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 21 Sep 2022 17:42:03 +0800 Subject: [PATCH 08/12] enh: added memory allocators for parser and planner --- source/libs/parser/src/parser.c | 4 ++-- source/libs/planner/src/planner.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 940c676689..2fe6ebfb79 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -188,7 +188,7 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq if (TSDB_CODE_SUCCESS == code) { code = buildCatalogReq(pCxt, &metaCache, pCatalogReq); } - code = nodesReleaseAllocator(pCxt->allocatorId); + nodesReleaseAllocator(pCxt->allocatorId); destoryParseMetaCache(&metaCache, true); terrno = code; return code; @@ -208,7 +208,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata code = analyseSemantic(pCxt, pQuery, &metaCache); } } - code = nodesReleaseAllocator(pCxt->allocatorId); + nodesReleaseAllocator(pCxt->allocatorId); destoryParseMetaCache(&metaCache, false); terrno = code; return code; diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 0b6128ddec..e4f02f12e6 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -52,7 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo if (TSDB_CODE_SUCCESS == code) { dumpQueryPlan(*pPlan); } - code = nodesReleaseAllocator(pCxt->allocatorId); + nodesReleaseAllocator(pCxt->allocatorId); nodesDestroyNode((SNode*)pLogicSubplan); nodesDestroyNode((SNode*)pLogicPlan); From e8a4f50c0a65559f7055b048693cb0754769316e Mon Sep 17 00:00:00 2001 From: tomchon Date: Wed, 21 Sep 2022 18:50:25 +0800 Subject: [PATCH 09/12] modify checkpackages scritps of muti-platform tes --- packaging/testpackage.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh index f604def9c4..20f93ecaec 100755 --- a/packaging/testpackage.sh +++ b/packaging/testpackage.sh @@ -215,6 +215,7 @@ elif [[ ${packgeName} =~ "tar" ]];then exit -1 else echoColor G "The number and names of files are the same as previous installation packages" + rm -rf ${installPath}/diffFile.log fi echoColor YD "===== install Package of tar =====" cd ${installPath}/${tdPath} @@ -251,6 +252,9 @@ if [[ ${packgeName} =~ "server" ]] ;then systemctl restart taosd fi +rm -rf ${installPath}/${packgeName} +rm -rf ${installPath}/${tdPath}/ + # if ([[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]]) || [[ ${packgeName} =~ "client" ]] ;then # echoColor G "===== install taos-tools when package is lite or client =====" # cd ${installPath} From 16be3ca310777b9159c640b77370af0defba0201 Mon Sep 17 00:00:00 2001 From: robotspace Date: Wed, 21 Sep 2022 23:04:48 +0800 Subject: [PATCH 10/12] Remove error about stream in lua test case. (#16990) --- examples/lua/OpenResty/so/luaconnector51.so | Bin 22472 -> 17672 bytes examples/lua/lua51/lua_connector51.c | 73 +------------------- examples/lua/test.lua | 21 ++---- 3 files changed, 9 insertions(+), 85 deletions(-) diff --git a/examples/lua/OpenResty/so/luaconnector51.so b/examples/lua/OpenResty/so/luaconnector51.so index 442de6e39f909e1aeb869988722b84795c048855..168d3a9d2406680ceec3c12f29a8157d19aca2ff 100755 GIT binary patch literal 17672 zcmeHPeQ;dWb$=_{AdK+}upGw*yf}Oq1B+~8Vm@3}w$@&#O@y(Slq9oS?LJA1SG!{O z3EOl45rbi)D5H{;q?u{sbW&G^HWf4wL5mSzwji`42zV5lx~egZwwOfN1QcSd{?5JU zyuG{LM>?7Qk?H7a_MLlv_ndRjJ@>x*-n;Kz-5uVruFB`LB+an?$SQJ{72<;7)gnRI zD(h4n&$iA|vNMZCC*}6FS1d5?ZazC?Cnse)gxF*0G$>a=lU3E2rnN zo=_DFU^l((Bz~Xz8dPrxpY@Yi=E{ngUQ*B zlj&4=JZr}s+fh!DJs77VCzi=Ya_wO3Hm(9zi+)j| z;R~^;WLvw_m9w*nm=j~CIx}P;tE;%J%g%0hYfEQzR@sCvU1|2D)pp|Th!8@M(;~6f zR`_WrqS?&Woa-AmST#lKr0jH?(_WM?qeUL3lPRT5@lg>mbYYtP$=Y@_Yv;OBjup#o zPsgLm?`S-hO0~q|Ta<-tY%3y}RCy|(%m!$+S{>jracxQ~#xxU8Wpb*6Go5xi3V9BH zCcARER;K`WrQ`5F*+QNLbD1EDk}Yw{U16=?aNU}<(G|fJ!Id;5aoyv;Vonf~etM}X zsZRhxwi@SB*TN(&wN=(Vz$nRmliat|ovHC{e19bP1W%IPWw`LW%JBUGZKuBszmn|_ zl;H&mEnh2UhZeb&jV*^`H?dD!z@2qhEJZ&_OI2H?j6+qtY)oY1E0$n zkr4x@`wmH?25w$g#tghxgh3aCG;=ZtPgx2G7&v#8!j~Gj+~QI}oq^N+l%!Q5og@3y zHc6q7&Jj*ONs*Aw5hgJuq%lNCEi3nG#D6J7M~%M48`YM1FtslB?ew&;GZ+_1qObefd>ryd;?!<;1?Kpoq=Cy;HwOL zse%80%O3Tg^KoRy;p#}wtVb`itjK*sPSw;%WXB8D&#R_Rtvm>L>ik1E`WJ_gBfg!= z3S(19=f6q3kN6SAzeao&@k5G#nRuFV3ImFNj(D1C3cD2l6!A257Wx$b81Xda6uK4v zF!3~83Z07oYvO5&DKsnod&JY!QfO5CgT&7!KBV}s6Hik~p-%B%Bc7&^LO}6%5KmJ_ zp+@mp;%Uk*Sc<=uct7zI#{rz*LOe|!g)zn7LOe|wg%QPXBK{2GhZKK3@icW51{A-B zc$zW_yA*#F@ibKw`V_x{c$y*#-HKmEJWUOSPQ@=Fo~DFCv*H&LPg6mmQSoOGPg6i4 zr1;sy)2UymQ+yTibjlY3ivKtWo=){bjpC0GPp5dnQv81tPp5Wa;uC8BLE;0%k1765 z;?E_1MDec?Pp5EUNbxTdPp58SK=IEJPp52Q7x+l;>L&q2dS^XG7DF+J)fO6x zr%a9c7k5L}dY&od^Gbf@*N9wnJd&S?Jp0RQBhOCGi1_wKUOn#2gMm-*{6_|=r^Z_S ziyMmiG~T;choNhAUECDevHDF&T9N$Q&K#IdFSD%ii_q-x1e&^c*4dEreHC@P^^<;K zD+-&SgA=DQlD~5>vM)SBPDXmdgHO>=-b3d@B)@O`Y1kq((p&o=#+-}4cueDaU}=g3 zUjP7q7b+8E%PpBrCi3A)Hq&UDxvSjFC)&&c5_T>013cotWGEm01ska`ja*r7WR*5j z!I!(J$MWF`HuD-zE4N8Uep}Y0_W@LH(igOuaW>Ount7z$%)_2$Cdl!8cuX~xVn%^{ zgj@5J3ZkHovsIG#J4ociuk?g};r|Jux)H5uf{iY6XRteYR%w^#bK8f`Z5d^wSMGk9ponLkv82}V!p=}d=hx!>)PW%SNdp)*-fr=k2b5ESxou7>CEa9 zt#e(iCZ*LhV)*@BiRdxR$hx)5AUa6r!%NTeIn#`DrXQO9?%3I>DpupFUw0f`Kqwk> zbP4qhmq_yu{ze77pQQGq?tJ(VI+6q@B07?dT6YxCC`pYesn>Vh`MTx5_nSIEs^CHt z=q|k;ZbMXeX^qrS?9zV#)Uy#4h5K2e_b@W^LvZCjvB6kL-PHGybfwlcmPpU4PirbjE={MuGSu_$?*}i z@7=afuaHlx0IGZU8V}JE`{w;$L;N&d)xPnYP){E*d+4r&hVFk+xvy_Wc+v-(SBFRZ z-`GqSDpTfZ$N*4hr;xuvkEIHxN0bV*6hACHV)s-ICCCo!=LY(Yrr1(tinHbf6UZ;JSD80h{>;vT)=DvO7`yi1IAKr22E7tbA zpo=qR-}o9*Q`apUci%wtBzLjs6(_l2Ks_4|Q>8cwE`+q6u9~@GfVU|VYI0jxB7GcF zgqB_b>340Z{a)qLxN3+x)8J8aiRLq|<|N_aaXs$QAXUxfCBl!p!aoI0R&_Km(y3|V zVW55P1&4x+VHO^CV`wXPjwwuZeAGN0^WoBq(EU_L9v;C&Y1F@ZRMkFAb>0XmT!fyX zXM^#1RFFr1QlIuTL4Z!n(VnJbdFsw4fa-<=&3iS{Qyaz&GuikLvDZ_33!3PjS+&6R zfEq^hzl|wU2l-z`zPI*$fE=s)kC@V1TWt-b zTQk{&c-tCFrtCz($pm=CIsmEL?d&B1yj#d+(pR0In{CB23ERRugiIFMblTENB_gtl zZzUbO1IBN|ZOj_{cH<6V6u;TH`?w0fcKmkZw;#XHpqywf(_uS!8_^a><^o%@8N3k5 zY_ZZ^sZ@YpF$Apy-o6A??etPZzif~dc)j>8Ei&oEGrmPLu5JKC3k?f?H8s@?x*VPP z04PlsbX6l?he7w_zH%ItotvB~mntMM(Gb8>Yb%FaU>J#-KJ z61FjRt+_Sp<}SG2f9lriZtL2GS1-H#yd^-D4EgZ`jIqyS2C(Yq*6gfWd)i4eE}H>U zs+>CIaZEOIa9t^upFB&-?gwN3)~C~S1g0Y}9f9cxOh;fk0@D$gj=*#TrXyfRK)#bH|1QRqG4t|0 zjC}ucAfQX-`<6q@%l9jXnV0WVv~=;vk15uz^bCR|Jbmj!2``3p;*TTf#VDn(@XLMq z=0Lvxk#7p*-hZJMx8yq@dgh@d<-a~Yl~Dja+aS>+45d1H@IsPrO_p*w-Z-e#%l^qX zAqyC1G0O^RfW@qBTp-V5Y~GS@EC$(-;H$VI@w1x)gB5<2q&__ADgPI+Jk|~sFL9h; zeh(|+F-(<<-)!y=hD@xk?8+6~|Ld6E%(Rv1?M!zveTeC9rcW^4%XB}}cbI<2^d$b+ zWId3oc!L5$d7wrJ9Zb zv%da3-{rC8RyW%cT1iGerk&4mt0W`mXoAnxN-|Q1%qNv(Zee|)zpsq=y697U7GING zC-p_8MCaE`>;0+nGMb;NFl$M>q}^W@vG#|0pLCiwnhq6S*j#|SjqS;**798)KOLVB2UJo z;Ij}|pM}p`LV{?zh;fO(lwPRtGcEaERPfI$e4bT4f2{^yWnpsWF(>kCftL*hRdh4( zGEs%o{)b9^!LK7=h>PUjbioB5XbBAwTSazu|!&1YRi) z??IBJn&_}D~+qGJ@9oNb~by+w|U@nUscKfogVm~d)V3KAwS@O z|BDCyqKADeo^@KVDIXGEQjC|(1k?Ho38Br?&qRHh}CiYA;)HW!U`ZNr;` zj?R>gZ3e-*$`Z6|BN>flv$5?_JMCaELu)qHVMh~P9Ua@DVsKGNIwhjC`NH_(KNz=C z!Xu~OR-~fFh65_sTLzLX)FeBdpaw;w>o(PI3`fH^G|(QDDDFR^;Ru6~hE108(u%I% z@WnOt8=_xax9+C!=IG}7H5lb`WL1;#MCrs5XbF2Ji-(XvzK`uei9oQFzFd?k5qq z^&TIO{Up?|VsG)jAJQ!DBGQ4SjW*`~8#s!+Igs3-OsE|xN>X0n>1{Y#+1Mo)+3cmXB<1&mhK96W)X(0 zv)u~1P)k=bmAEvSu#`Z1EZ1%Y6Wh};rYUwisS>=(%q27F5+;hWteuLH0_QrhA1Fwt zwH0*iZ8*|wfU=C*?i94KWl7Bj?Fr}#r#19Mpfw~18$)c4xAEb=oY z!?Ye0AIH#TUnk7#DluKktmx4*iJATw3_W9uzN{Yzt!719Z@{gN>%TnL-vo@R5`9^Z z5Gv~r5K)P2nCZ)4Xe~hW<@sOe%`7bbi=5DQl+)UR@Up%jG{E}h{xfXRa|hWLeOb>C z+RYA2{SrTE|2?d~h7HR4h)`KiA$|Hdy;F5kyoHKZM_jjHw@B zJ)zCgFjkOz5TSoorr*ssM?&9p7q+wk(PMMgcgpnTxkad~gNb~(|9{E)uA2dtWJ=!& zm`UO%*xw*SBBC$PQ$ia=kZF1S|5T>mc&-){I`enb|1`ws;wPt_tnbM_{c?YW7y1%( z>6u&fW&Lm4r2vscPIEDdwnme)mw47H-zyBJoadw&iWvul-Y_COy literal 22472 zcmeHPe{@vUoxe!{BOntHs0Dva5nMo=4R}D5ViRD(pb17K9_iLjCdq`%I+=+x69p@p zU6eXTD&p4K{iBENww86fyU{A5v>4DO(b{daT^c=ho4Vkck#3EGY-%a9pYOf*`*PpQ zdwO=y?&Z>Iw~bxg~(m|~=DxsJz4=W0Esq)Vm#c&3H+>laIQ z46RkLV9Itk!cNAi<)214?HhD`UU}K2<7cXyE88|5l~B6Yu5S=O_}-0I{py#GPCvJG z`qA44=3eyNO?zLa7|8qzY-A_ndq9YC;RmPiQxA;n{PCmJ&r87N=P6*iE7+tz2FGnM z1uIaYoBk6X`u8F*H~ol*AF9)DR;c%@pzjvXb`QM9qu!(N=N8XBz}@_x>*4=^hyFAV z{r5fUJ%u>k>ZJqghEMR&KkX6ELmoKX2tPJw`6d9jdUt!&`wt%Yau2*6xLf{@dEl2K z4{rJm9&yg`sQ2HY@8;)5k9uG5z!!SN^EV##e#yhnBE;jz<}9yx=%4T5zXlHGIGuoR z!RB@xZ}5o!b`SlxWIU5ZX11ziqMpCw($6F@RaaiE#6pC_o7 z+!9SiH^Uc5{-56<#N0rQ75^mlS3Ae@)k$CK`sEBt)YQxQK(dOGSTRS8h z-W<)e#-i~Sl7(aGbS4!`Y?e$W88^6CI?>tQ6isQ7!K9;^OtM2{BFS_(mWXAfMsq3} z$wV_UJo1ccET!gTA`xxQD2r$+g<3=hs?D@Y(~iz`8<^;3T^an3Mo2;^ zbTrix$wYK`9ZB+#QdQjE8BJ}qYD*;5uJjphPIe}AB(2d*a~pDnWTcf!q_q`sMq9$E z|Rp!(P( zQ%e^UTpOnBcP5(A{8$s21k%X>M6sr3+FK^p)~;K%I=m#XB(PNXTy$uU^52)RMTtBw zlUv&R7GNHeowH)+(%E~jR^{s@zP{-=Uj?&8Uq>-ssqroc&ioz+K0{yU>~-K#Tg@F$lxNee^<_!EYLx0%77Z~^j^@^c7F-2O%Z{XYuNI2iX`4X7$N(1L> zO~RKOxV(-h5T(k%sm)wM2F`tjWc3D)4ri5H41A)63ejlblMKAgz$Y7chk>7G;9CrQ zih*|-xX-{JFz^Zk-(}!a4g7HfH_z*P3|x0{X|>nD>AIRrkAcr%5OlwRf5yOj4g3NF z?>F$z8u);L&ob~q1J|ckDIGHK3l068fnQ|c!v?O0kd%%X_$7wExKT0GZ~X>dZs2<8 zN@<0G&oT5bFz`za+;8A>4Sc?V&ol5!1OF2PUvA*@4ZO<0&G8sA@P&qcy@6kD;I|k! zU5ou+m6N`UKMrmGO?jw${Oey3B6NRmrnE2++WvC+OR~zs(suwC<{iVY?~*F)5#L6# z{7?a9-dn`e6q6s2{2Rp6)ROO&{Hw&%l#=g}{9)p0D#`DW{0qd>6q4U1`KO7esUzPd z`6r2|?vU@0{I`jxDJ0)0`M)NfrjC5QzR?rxHIP`3=O=kk9u@{yO4msONhm zzlwMo;`u$2zlL}k+WB3QUqU=h{rN7*FC?CZc)mmObBL#*oo|%iUnQP~V7^E4hl!`5m)|4#7l@}Jm)|A%r-`RiT)s>4PZCc< zFW({gZxc^LF5d_~v~%S*0fcsr-%aG|k<1Kq<!xFKr^S z;B4s7*|Jb}ICLoYg;2?n(CcS2)8XJJ+Ch0?sMUALno)ZiFI_9=!B%u$-VoZp@=0il zQ1*CcGF%6)5hAw$S<1EG=g9a!;AvCIt*G0opX~E@Lf8Ns47~bKcH90?UvPke40Q+h zzd%FbAdRO`wlB95hk($~&KVnU%nrh^K={9E+yagPz}TH7U63zJ^4=LT365wt737B9 zf6eKp4k?A33rN}W5CZt2Z&7bH__lU3fc|7TS?6?e3jp_6zN}&y)^2v2Zc3eQrh2;h zjB=CHZt6`p2dGn+(Z8yi)!7l!`7>1srqpc%!gv2ak(=(|0ExO}GmfC&d<)}M zReCqO?>oZB`tJaB*S$uepc_06)$G5J`Clki>1wHTby(_TkCOCJAfr~BEh}mtsaXwr z&*pxOr8oz8{y`P)sP=_6?MasQDWrniwU?^Mn!!x41@jhWE7gR9(yC#=h@0{?8uzo7 z>Z4HIPyP0lm-NtS#L(Fb%e_`l{3*H12B~}PeF3WCAf1OO8^?bqC3Il4gv2B{K@y;} z|EJi`29Mz=lHzbkWiqR52LTO|)|;p$8~pk9Z9f;j`?|@E7WByK>3EhQY;`=p&+bQ$ z=bM1K>re|S-8rKlQ@KjUQ%V)p_T{3|6{Qe?$>aGFkWs5|SXN(wmCQ^2Z*h|C3!ae; z(-9?PsFYqA`HNITHrUU6a)es)GSy;rhZVNy7FzY3PdId&$r}eX#is1Hl;4G8Qd0+k zQ{4qL|2z)K_TZV44}A-RXOyeH+-X<+dZ2y55i-w5VTM5!dOWmq`akO??hln97`kTh z-B+dM2roMjIy5wnWQT@IRX^+}E^DSUBbzfq&GFFp?W?Nx?d#n>GST-zAH6-WJG$ z?Z9-B{%5`QL){g|VO&evC(tyHcux8%#7W;5d--rl{R7_mA3~pXXOr)TJ$216hM;<4 zvquc}h`)Xss-O2z9h>l0Qa_JF3Zb4|#2mz_0Ry5hw}a#*yS4{MO5k%vaKQJ_!|FUT zs?!7=7|DV3F70PAA8+5bACvkbr7J~~;CytP&cubC8r?tp$T@%2WB%$7WjlLA*@pe_ zjD0zEf$z&rS2fT)WbtxpA{9(rhyycJcYQg9bZ~f3wPQUhA;3QF%7N;ML64&|pE8cl zc#MmjHIC3$IV~+hoZT46?&FB`KXDu_z=bf6ql>Vw$B}D%qlc9%fA|#omKsOy7S=do zi(AP;jU!hJ9!IQy1?j7CLR*a^S8Zb)!35V2q_4)2tG*sbQuk-1tHzO= zE{!7&<8cH{BbAlAOlT8H}SdD}u+Y zaiqH>W+ZgpzTC4P!qhsGSILop5ny%XAmmwje(1N(?jHctL!{q+c0Wf+Xt4Ajq+3m@ zVh%uc)k>#YZ2_4Se)NGHC>U~mxvR0C4Gz7CvzN&IP`Oek`|buP$aHl#oXOUgcjMZC zrrA1(oX!0)OVJNJ|8*66f2;5PUaIGMoAN|U`6S}zHhzz4$eOW?x5e@%Zl_fXex$6B zPh13M>yhgQNBl)9=gfG6E1=RRsnAj}#d3*xvxT zOuy9)$NIR3!c+H7Is2UY40%U&p_s&24U zmqFE@Po!&`Or5>iKT>s%NHnx_>8+Qm$Qyub2LhK0`k9jT6p4Cyu@o~<%s?>%#S9cP zP|QFv1H}v!Gf>PxF$2X6{4q1omP}_vM>3TW^qOB&Bpns>a+~Ogq|(&_`<9e z3$L6z2dLD+-`C=iUl$51=+Rj9l!_gttIwNIwyX@Uq?``&_m33{z2i*zfpILq4{$B= zFW;@T?8jU7-+_D?^1qtmu;ka4vi-||H{k)n8c<7qFG*E=$3VA1KAGBV*`HIw_TzxZ zkeh0VE&1Kbo}gkWW}ujXVg`yCC}yCTfno-V87OAp|7!;LyBX(qGG6iW_c8oE41fQ^ z-@EYlE&M$Tf4{=ttMKwop?pdg?{G+plB=L>9K6?9=%CP=;gufwqK+E}CkY_YMNvR9^ z^PURb;>c33h*lGWTEO>H^g=lm{&r$eJ7Rpku88C8(TU-2RJcC8qblPs*ZO#&Nb;OV z+{#J*JK7Mhok^basG|5I{;qegdzn7oHJaY6X{)ApXu3mFdMSa*6PoVR^oXWMHGNmp z)0$48H|J4iYI>Qb%QUUg^kz+4HN8XA9hyF-=@Xjn)AWd@M>Ty{)6<$x&>!c_)buh< zmuXs~>CKw9YI=vJJ2ZVv(*Be_ z&CdpH&-8z)!?Di(gqS(jh^|vb?)_Va*^Ad%Fty4=sWrdY@iM{liX9(k%|CX0yx?(c z$0rCL|91Qw!Sje6FBd%D*l|n}Iv;j?qVT)oldS$>*Pkr-c-wKzI+Z4(_RGW+Yn=&G zS|)sA&tel{{R(ToH>G7_sx{Bs@o9p;|Fq-yEXHO2DZ^7Q{%+E)kGCo7O+@XNiJ7AD zMmL;ice|_%XN$Jm?HJA1j$0gf+PUC!jXZ$Mclo)aH z`JqPQWAjGOM#(>4FR-7jQh#jz9|!Ih|C8Dek7@Y~5ZE&s=j#ah><<{dsz&wl^@Dt- z2kcdiAM#@dMLx>|eG?n$^Yw>(h6n7|*h;XX0e|n?2%d^po{vkJPxLO4YPKJ@cAbPq zDXvbO{rq>*&)EJ(>r#}6vHi^_55(C1PwO<4h#6!0RY2p;ei#B?idAu(<97{kM;DQ8 zTMyi2NQ+G#`nP%L-{GO3^}zoMxL-zK|Do#}w|HLkz<&bVtzCm2_;C+ECq3{p9{6Nj z(7DBV5pcJ77JKNg^uTL8@S8mFW)Hjr_%w_Q=eiVk0e8!P*2B+Z9{6`X@TWcSUf^!| zdBX#L#{)m@;eR}?uicKzY~Yj!=Qv?YQjm z(0|Iqe=qPkW1A)3_R!CJ`2W3!zGzNm(wWZI)Z;jLT^9@o*RP=^S|~0G zIh_o*MG`HvGRvAx>#OV5tpu@ln4MJ-+0Ky_JY`V7ghE>(I;TvjdZVGM;Z>(Nb z8w{(JJ**{F^b#txx<^%&x$Fit$y|hm|GHGmmKf`Ne9{^+P1G3Wn!$AO2Uo>VC*ZGsMTJ^w!&@k6S)G3wL*<^aTse^n$cBN91G{DhB+5)8C}c9 zTDL_h+E$JkyCjUUT8mnKWlSS5`#DU{= zXr!6Bu#G(8t}6p6RbyEiYCicvB54Jnux;g_F|8b3H%L0Nj}UmWRuU3{^wxI#VGo)~ zDN0qyy+kq-4Qx(y2AVo!@s=xMEkbf_k#w5~v}{elQc)}rBqjJCKOIXZY)lxkR5TtT z1HIRQRdxb$+ypYwE!dX>GmuKk6@mg$tnwo#jJ6ioGN&xmT%jy@pRA|~IFR8)+GEWK zAqmI$3CP--(rFREK&%@wH~H*S?IpmEX%>rBPh1#`%{%^z4f5`97EFm$fp-&w5N7Ag5>|Y4^4Z& zwqv@78>Wr;+Jxyn4*OnxvBq?_&#Z0iwatAFd%iDX>U`eljQ>GxZ-rT@>(jK}(P23M z9Mx`zy>tCadMCy#&ieO2NL6tC{M>=*+)t^W?sMs$ocjkqkLGnPo$)fybRTRf#ca>d ztG8eU2P&+mxKW<*!w^u2Y|qa-y0m>48*1uz@IM7leUI&}=OWiC#R1)Z&KuXu{bvwr zX8rv9WB{vuP?@iY^~?55e+h|cZ{6QkDU(KBKRaMMrtd;P`y4+%k7=w{7Ocni4tL@N z1cY(7V?DNC#DJ!3&-4^ #include #include +#include #include -#include "lua.h" +#include "taos.h" #include "lauxlib.h" +#include "lua.h" #include "lualib.h" -#include struct cb_param{ lua_State* state; @@ -60,7 +60,6 @@ static int l_connect(lua_State *L){ lua_settop(L,0); - taos_init(); lua_newtable(L); int table_index = lua_gettop(L); @@ -137,19 +136,15 @@ static int l_query(lua_State *L){ lua_pushstring(L,fields[i].name); int32_t* length = taos_fetch_lengths(result); switch (fields[i].type) { - case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_TINYINT: lua_pushinteger(L,*((char *)row[i])); break; - case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_SMALLINT: lua_pushinteger(L,*((short *)row[i])); break; - case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: lua_pushinteger(L,*((int *)row[i])); break; - case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_BIGINT: lua_pushinteger(L,*((int64_t *)row[i])); break; @@ -159,7 +154,6 @@ static int l_query(lua_State *L){ case TSDB_DATA_TYPE_DOUBLE: lua_pushnumber(L,*((double *)row[i])); break; - case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: //printf("type:%d, max len:%d, current len:%d\n",fields[i].type, fields[i].bytes, length[i]); @@ -241,67 +235,6 @@ static int l_async_query(lua_State *L){ return 1; } -void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){ - struct cb_param* p = (struct cb_param*) param; - TAOS_FIELD *fields = taos_fetch_fields(result); - int numFields = taos_num_fields(result); - - // printf("\nnumfields:%d\n", numFields); - //printf("\n\r-----------------------------------------------------------------------------------\n"); - - lua_State *L = p->state; - lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback); - - lua_newtable(L); - - for (int i = 0; i < numFields; ++i) { - if (row[i] == NULL) { - continue; - } - - lua_pushstring(L,fields[i].name); - - switch (fields[i].type) { - case TSDB_DATA_TYPE_TINYINT: - lua_pushinteger(L,*((char *)row[i])); - break; - case TSDB_DATA_TYPE_SMALLINT: - lua_pushinteger(L,*((short *)row[i])); - break; - case TSDB_DATA_TYPE_INT: - lua_pushinteger(L,*((int *)row[i])); - break; - case TSDB_DATA_TYPE_BIGINT: - lua_pushinteger(L,*((int64_t *)row[i])); - break; - case TSDB_DATA_TYPE_FLOAT: - lua_pushnumber(L,*((float *)row[i])); - break; - case TSDB_DATA_TYPE_DOUBLE: - lua_pushnumber(L,*((double *)row[i])); - break; - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - lua_pushstring(L,(char *)row[i]); - break; - case TSDB_DATA_TYPE_TIMESTAMP: - lua_pushinteger(L,*((int64_t *)row[i])); - break; - case TSDB_DATA_TYPE_BOOL: - lua_pushinteger(L,*((char *)row[i])); - break; - default: - lua_pushnil(L); - break; - } - - lua_settable(L, -3); - } - - lua_call(L, 1, 0); - - // printf("-----------------------------------------------------------------------------------\n\r"); -} static int l_close(lua_State *L){ TAOS *taos= (TAOS*)lua_topointer(L,1); diff --git a/examples/lua/test.lua b/examples/lua/test.lua index ff49b8408d..3d725cc6a3 100644 --- a/examples/lua/test.lua +++ b/examples/lua/test.lua @@ -173,16 +173,16 @@ function async_query_callback(res) end end -driver.query_a(conn,"insert into therm1 values ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) +driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback) res = driver.query(conn, "create stream stream_avg_degree into avg_degree as select avg(degree) from thermometer interval(5s) sliding(1s)") -print("From now on we start continous insertion in an definite (infinite if you want) loop.") +print("From now on we start continous insert in an definite loop, pls wait for about 10 seconds and check stream table for result.") local loop_index = 0 -while loop_index < 30 do +while loop_index < 10 do local t = os.time()*1000 - local v = loop_index - res = driver.query(conn,string.format("insert into therm1 values (%d, %d)",t,v)) + local v = math.random(20) + res = driver.query(conn,string.format("INSERT INTO therm1 VALUES (%d, %d)",t,v)) if res.code ~=0 then print("continous insertion--- failed:" .. res.error) @@ -190,17 +190,8 @@ while loop_index < 30 do else --print("insert successfully, affected:"..res.affected) end - local res1 = driver.query(conn, string.format("select last(*) from avg_degree")) - if res1.code ~=0 then - print("select failed: "..res1.error) - return - else --- print(dump(res1)) - if(#res1.item > 0) then print("avg_degree: " .. res1.item[1]["last(avg(degree))"]) end - end - os.execute("sleep " .. 1) loop_index = loop_index + 1 end - +driver.query(conn,"DROP STREAM IF EXISTS avg_therm_s") driver.close(conn) From a6733dbbd1d68f13d5d64f603ecb4ffc5f75d096 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 21 Sep 2022 22:53:35 +0800 Subject: [PATCH 11/12] feat(wal): auto fix corrupt file --- source/libs/wal/src/walMeta.c | 96 ++++++++++++++++++++++++++++++++--- source/libs/wal/src/walMgmt.c | 8 ++- 2 files changed, 96 insertions(+), 8 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 5284aeff77..a01869fbe0 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -116,7 +116,6 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { } #endif } - // TODO truncate file if (found == NULL) { // file corrupted, no complete log @@ -125,8 +124,20 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } + + // truncate file SWalCkHead* lastEntry = (SWalCkHead*)found; int64_t retVer = lastEntry->head.version; + int64_t lastEntryBeginOffset = offset + (int64_t)((char*)found - (char*)buf); + int64_t lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen; + if (lastEntryEndOffset != fileSize) { + wWarn("vgId:%d repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset, + fileSize); + taosFtruncateFile(pFile, lastEntryEndOffset); + ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = lastEntryEndOffset; + pWal->totSize -= (fileSize - lastEntryEndOffset); + } + taosCloseFile(&pFile); taosMemoryFree(buf); @@ -226,16 +237,87 @@ int walCheckAndRepairMeta(SWal* pWal) { } } - // TODO: set fileSize and lastVer if necessary - return 0; } int walCheckAndRepairIdx(SWal* pWal) { - // TODO: iterate all log files - // if idx not found, scan log and write idx - // if found, check complete by first and last entry of each idx file - // if idx incomplete, binary search last valid entry, and then build other part + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + for (int32_t i = 0; i < sz; i++) { + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i); + + char fnameStr[WAL_FILE_LEN]; + walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); + int64_t fsize; + TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE); + if (pIdxFile == NULL) { + ASSERT(0); + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fnameStr, terrstr()); + return -1; + } + + taosFStatFile(pIdxFile, &fsize, NULL); + + int32_t left = fsize % sizeof(SWalIdxEntry); + int64_t offset = taosLSeekFile(pIdxFile, -left, SEEK_END); + if (left != 0) { + taosFtruncateFile(pIdxFile, offset); + wWarn("vgId:%d wal truncate file %s to offset %ld since size invalid, file size %ld", pWal->cfg.vgId, fnameStr, + offset, fsize); + } + offset -= sizeof(SWalIdxEntry); + + SWalIdxEntry idxEntry = {0}; + while (1) { + if (offset < 0) { + taosLSeekFile(pIdxFile, 0, SEEK_SET); + taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); + break; + } + taosLSeekFile(pIdxFile, offset, SEEK_SET); + int64_t contLen = taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); + if (contLen < 0 || contLen != sizeof(SWalIdxEntry)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if ((idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry) != offset) { + taosFtruncateFile(pIdxFile, offset); + wWarn("vgId:%d wal truncate file %s to offset %ld since entry invalid, entry ver %ld, entry offset %ld", + pWal->cfg.vgId, fnameStr, offset, idxEntry.ver, idxEntry.offset); + offset -= sizeof(SWalIdxEntry); + } else { + break; + } + } + + if (idxEntry.ver < pFileInfo->lastVer) { + char fLogNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr); + TdFilePtr pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ); + if (pLogFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr()); + return -1; + } + while (idxEntry.ver < pFileInfo->lastVer) { + taosLSeekFile(pLogFile, idxEntry.offset, SEEK_SET); + SWalCkHead ckHead; + taosReadFile(pLogFile, &ckHead, sizeof(SWalCkHead)); + if (idxEntry.ver != ckHead.head.version) { + // todo truncate this idx also + taosCloseFile(&pLogFile); + wError("vgId:%d, invalid repair case", pWal->cfg.vgId); + return -1; + } + idxEntry.ver = ckHead.head.version + 1; + idxEntry.offset = idxEntry.offset + sizeof(SWalCkHead) + ckHead.head.bodyLen; + wWarn("vgId:%d wal idx append new entry %ld %ld", pWal->cfg.vgId, idxEntry.ver, idxEntry.offset); + taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); + } + taosCloseFile(&pLogFile); + } + taosCloseFile(&pIdxFile); + } return 0; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index c939c8c436..a55f00d277 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -149,15 +149,21 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { walLoadMeta(pWal); if (walCheckAndRepairMeta(pWal) < 0) { + wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId); taosHashCleanup(pWal->pRefHash); taosRemoveRef(tsWal.refSetId, pWal->refId); taosThreadMutexDestroy(&pWal->mutex); taosArrayDestroy(pWal->fileInfoSet); - taosMemoryFree(pWal); return NULL; } if (walCheckAndRepairIdx(pWal) < 0) { + wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId); + taosHashCleanup(pWal->pRefHash); + taosRemoveRef(tsWal.refSetId, pWal->refId); + taosThreadMutexDestroy(&pWal->mutex); + taosArrayDestroy(pWal->fileInfoSet); + return NULL; } wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, From 78b4d1024934d7db380a0d26cf27936700f753dd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 22 Sep 2022 00:36:04 +0800 Subject: [PATCH 12/12] optimize fix condition --- source/libs/wal/src/walMeta.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index a01869fbe0..c69046f707 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -257,6 +257,10 @@ int walCheckAndRepairIdx(SWal* pWal) { } taosFStatFile(pIdxFile, &fsize, NULL); + if (fsize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) { + taosCloseFile(&pIdxFile); + continue; + } int32_t left = fsize % sizeof(SWalIdxEntry); int64_t offset = taosLSeekFile(pIdxFile, -left, SEEK_END); @@ -267,7 +271,7 @@ int walCheckAndRepairIdx(SWal* pWal) { } offset -= sizeof(SWalIdxEntry); - SWalIdxEntry idxEntry = {0}; + SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer}; while (1) { if (offset < 0) { taosLSeekFile(pIdxFile, 0, SEEK_SET); @@ -306,7 +310,8 @@ int walCheckAndRepairIdx(SWal* pWal) { if (idxEntry.ver != ckHead.head.version) { // todo truncate this idx also taosCloseFile(&pLogFile); - wError("vgId:%d, invalid repair case", pWal->cfg.vgId); + wError("vgId:%d, invalid repair case, log seek to %ld to find ver %ld, actual ver %ld", pWal->cfg.vgId, + idxEntry.offset, idxEntry.ver, ckHead.head.version); return -1; } idxEntry.ver = ckHead.head.version + 1;