From ac860f7fe8c2eb197d30f8c23989006ec8c2e894 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 09:29:42 +0800 Subject: [PATCH 01/10] fix(query): add ts in cache_scan_operator if pk exists and only retrieve ts column. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 6 +----- source/libs/executor/src/aggregateoperator.c | 2 +- source/libs/executor/src/cachescanoperator.c | 17 +++++++++-------- source/libs/function/src/builtinsimpl.c | 5 +++-- source/libs/nodes/src/nodesUtilFuncs.c | 2 +- source/libs/planner/src/planLogicCreater.c | 15 +++++++++------ source/libs/planner/src/planOptimizer.c | 6 +++++- 7 files changed, 29 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 42b8365130..a4be0518e3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -367,11 +367,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - int32_t pkBufLen = 0; - if (pr->rowKey.numOfPKs > 0) { - pkBufLen = pr->pkColumn.bytes; - } - + int32_t pkBufLen = (pr->rowKey.numOfPKs > 0)? pr->pkColumn.bytes:0; for (int32_t j = 0; j < pr->numOfCols; ++j) { int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes; diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 2429fcff79..d0e1449188 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -278,7 +278,7 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { - // todo add a dummy funtion to avoid process check + // todo add a dummy function to avoid process check if (pCtx[k].fpSet.process == NULL) { continue; } diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 985cdb9433..0d0870911e 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -221,6 +221,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableListInfo* pTableList = pInfo->pTableList; SStoreCacheReader* pReaderFn = &pInfo->readHandle.api.cacheFn; + SSDataBlock* pBufRes = pInfo->pBufferedRes; uint64_t suid = tableListGetSuid(pTableList); int32_t size = tableListGetSize(pTableList); @@ -237,18 +238,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) { - blockDataCleanup(pInfo->pBufferedRes); + if (pInfo->indexOfBufferedRes >= pBufRes->info.rows) { + blockDataCleanup(pBufRes); taosArrayClear(pInfo->pUidList); - int32_t code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds, - pInfo->pDstSlotIds, pInfo->pUidList); + int32_t code = + pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } // check for tag values - int32_t resultRows = pInfo->pBufferedRes->info.rows; + int32_t resultRows = pBufRes->info.rows; // the results may be null, if last values are all null ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList)); @@ -257,12 +258,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; - if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) { + if (pInfo->indexOfBufferedRes < pBufRes->info.rows) { + for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) { SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); int32_t slotId = pCol->info.slotId; - SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId); + SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 56f2ccd630..3fb298e1ea 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2837,7 +2837,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p memcpy(pOutput->buf, pInput->buf, pOutput->bytes); if (pInput->pkData) { pOutput->pkBytes = pInput->pkBytes; - memcpy(pOutput->buf+pOutput->bytes, pInput->pkData, pOutput->pkBytes); + memcpy(pOutput->buf + pOutput->bytes, pInput->pkData, pOutput->pkBytes); pOutput->pkData = pOutput->buf + pOutput->bytes; } return TSDB_CODE_SUCCESS; @@ -2885,7 +2885,8 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer } else { pInputInfo->pkData = NULL; } - int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); + + int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 76bf7b04fd..adb011e3ec 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -2343,7 +2343,7 @@ static EDealRes collectFuncs(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } } - SExprNode* pExpr = (SExprNode*)pNode; + bool bFound = false; SNode* pn = NULL; FOREACH(pn, pCxt->pFuncs) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 2e3e8f189b..60bce622be 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -19,6 +19,9 @@ #include "tglobal.h" #include "parser.h" +// primary key column always the second column if exists +#define PRIMARY_COLUMN_SLOT 1 + typedef struct SLogicPlanContext { SPlanContext* pPlanCxt; SLogicNode* pCurrRoot; @@ -304,7 +307,7 @@ static SNode* createFirstCol(SRealTableNode* pTable, const SSchema* pSchema) { return (SNode*)pCol; } -static int32_t addPrimaryKeyCol(SRealTableNode* pTable, SNodeList** pCols) { +static int32_t addPrimaryTsCol(SRealTableNode* pTable, SNodeList** pCols) { bool found = false; SNode* pCol = NULL; FOREACH(pCol, *pCols) { @@ -327,10 +330,10 @@ static int32_t addSystableFirstCol(SRealTableNode* pTable, SNodeList** pCols) { return nodesListMakeStrictAppend(pCols, createFirstCol(pTable, pTable->pMeta->schema)); } -static int32_t addPkCol(SRealTableNode* pTable, SNodeList** pCols) { +static int32_t addPrimaryKeyCol(SRealTableNode* pTable, SNodeList** pCols) { bool found = false; SNode* pCol = NULL; - SSchema* pSchema = &pTable->pMeta->schema[1]; + SSchema* pSchema = &pTable->pMeta->schema[PRIMARY_COLUMN_SLOT]; FOREACH(pCol, *pCols) { if (pSchema->colId == ((SColumnNode*)pCol)->colId) { found = true; @@ -348,9 +351,9 @@ static int32_t addDefaultScanCol(SRealTableNode* pTable, SNodeList** pCols) { if (TSDB_SYSTEM_TABLE == pTable->pMeta->tableType) { return addSystableFirstCol(pTable, pCols); } - int32_t code = addPrimaryKeyCol(pTable, pCols); + int32_t code = addPrimaryTsCol(pTable, pCols); if (code == TSDB_CODE_SUCCESS && hasPkInTable(pTable->pMeta)) { - code = addPkCol(pTable, pCols); + code = addPrimaryKeyCol(pTable, pCols); } return code; } @@ -1802,7 +1805,7 @@ static int32_t createDeleteScanLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p STableMeta* pMeta = ((SRealTableNode*)pDelete->pFromTable)->pMeta; if (TSDB_CODE_SUCCESS == code && hasPkInTable(pMeta)) { - code = addPkCol((SRealTableNode*)pDelete->pFromTable, &pScan->pScanCols); + code = addPrimaryKeyCol((SRealTableNode*)pDelete->pFromTable, &pScan->pScanCols); } if (TSDB_CODE_SUCCESS == code && NULL != pDelete->pTagCond) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index da39228a62..eee0766589 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3966,21 +3966,25 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic if (NULL != cxt.pLastCols) { cxt.doAgg = false; cxt.funcType = FUNCTION_TYPE_CACHE_LAST; + lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true, cxt.pkBytes); nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt); + lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false, cxt.pkBytes); lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols); - if (pPKTsCol && pScan->node.pTargets->length == 1) { + if (pPKTsCol && ((pScan->node.pTargets->length == 1) || (pScan->node.pTargets->length == 2 && cxt.pkBytes > 0))) { // when select last(ts),ts from ..., we add another ts to targets sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol); nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pPKTsCol)); } + if (pNonPKCol && cxt.pLastCols->length == 1 && nodesEqualNode((SNode*)pNonPKCol, nodesListGetNode(cxt.pLastCols, 0))) { // when select last(c1), c1 from ..., we add c1 to targets sprintf(pNonPKCol->colName, "#sel_val.%p", pNonPKCol); nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pNonPKCol)); } + nodesClearList(cxt.pLastCols); } nodesClearList(cxt.pOtherCols); From b698b8be96d5e608e747eafbfe5ad50ce4f98324 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 09:57:08 +0800 Subject: [PATCH 02/10] fix(stream): pass the address of tmr_h id --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2bc09c1c22..2528e03593 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1315,7 +1315,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { if (pInfo->checkRspTmr == NULL) { pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); } else { - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, pInfo->checkRspTmr); + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); } taosThreadMutexUnlock(&pInfo->checkInfoLock); From c7806ebca20e178d5484f639b751e1b0b9e873b3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 13:58:02 +0800 Subject: [PATCH 03/10] fix(util): fix race condition. --- include/common/tcommon.h | 3 +- include/common/tglobal.h | 2 +- include/util/tconfig.h | 32 ++-- source/client/src/clientEnv.c | 57 ++++--- source/client/test/clientTests.cpp | 8 +- source/common/src/tglobal.c | 19 ++- source/common/src/tmisce.c | 48 +++++- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 35 +--- source/libs/command/src/command.c | 37 +--- source/util/src/tconfig.c | 178 +++++++++++--------- 10 files changed, 221 insertions(+), 198 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2c4a00a72d..d28477ae40 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -15,7 +15,7 @@ #ifndef _TD_COMMON_DEF_H_ #define _TD_COMMON_DEF_H_ -// #include "taosdef.h" + #include "tarray.h" #include "tmsg.h" #include "tvariant.h" @@ -412,6 +412,7 @@ typedef struct STUidTagInfo { #define UD_TAG_COLUMN_INDEX 2 int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime); +int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol); #define TSMA_RES_STB_POSTFIX "_tsma_res_stb_" #define MD5_OUTPUT_LEN 32 diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 00ed8bfb8e..3b8929f241 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -260,7 +260,7 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile bool tsc); void taosCleanupCfg(); -int32_t taosCfgDynamicOptions(SConfig *pCfg, char *name, bool forServer); +int32_t taosCfgDynamicOptions(SConfig *pCfg, const char *name, bool forServer); struct SConfig *taosGetCfg(); diff --git a/include/util/tconfig.h b/include/util/tconfig.h index 45abe2ff83..2095601e14 100644 --- a/include/util/tconfig.h +++ b/include/util/tconfig.h @@ -98,34 +98,32 @@ typedef struct { const char *value; } SConfigPair; -typedef struct SConfig { - ECfgSrcType stype; - SArray *array; -} SConfig; - -SConfig *cfgInit(); -int32_t cfgLoad(SConfig *pCfg, ECfgSrcType cfgType, const void *sourceStr); -int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs); // SConfigPair -void cfgCleanup(SConfig *pCfg); +typedef struct SConfig SConfig; +typedef struct SConfigIter SConfigIter; +SConfig *cfgInit(); +int32_t cfgLoad(SConfig *pCfg, ECfgSrcType cfgType, const void *sourceStr); +int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs); // SConfigPair +void cfgCleanup(SConfig *pCfg); int32_t cfgGetSize(SConfig *pCfg); SConfigItem *cfgGetItem(SConfig *pCfg, const char *name); int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype); +int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer); +SConfigIter *cfgCreateIter(SConfig *pConf); +SConfigItem *cfgNextIter(SConfigIter *pIter); +void cfgDestroyIter(SConfigIter *pIter); -int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer); - +// clang-format off int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope); -int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, - int8_t dynScope); -int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, - int8_t dynScope); -int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope, - int8_t dynScope); +int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope); +int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope, int8_t dynScope); +int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, float minval, float maxval, int8_t scope, int8_t dynScope); int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope); int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope); int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope); int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope); int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope); +// clang-format on const char *cfgStypeStr(ECfgSrcType type); const char *cfgDtypeStr(ECfgDataType type); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 7f73aa6845..f37e9851e1 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -770,6 +770,32 @@ int taos_init() { return tscInitRes; } +const char* getCfgName(TSDB_OPTION option) { + const char* name = NULL; + + switch (option) { + case TSDB_OPTION_SHELL_ACTIVITY_TIMER: + name = "shellActivityTimer"; + break; + case TSDB_OPTION_LOCALE: + name = "locale"; + break; + case TSDB_OPTION_CHARSET: + name = "charset"; + break; + case TSDB_OPTION_TIMEZONE: + name = "timezone"; + break; + case TSDB_OPTION_USE_ADAPTER: + name = "useAdapter"; + break; + default: + break; + } + + return name; +} + int taos_options_imp(TSDB_OPTION option, const char *str) { if (option == TSDB_OPTION_CONFIGDIR) { #ifndef WINDOWS @@ -799,39 +825,26 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { SConfig *pCfg = taosGetCfg(); SConfigItem *pItem = NULL; + const char *name = getCfgName(option); - switch (option) { - case TSDB_OPTION_SHELL_ACTIVITY_TIMER: - pItem = cfgGetItem(pCfg, "shellActivityTimer"); - break; - case TSDB_OPTION_LOCALE: - pItem = cfgGetItem(pCfg, "locale"); - break; - case TSDB_OPTION_CHARSET: - pItem = cfgGetItem(pCfg, "charset"); - break; - case TSDB_OPTION_TIMEZONE: - pItem = cfgGetItem(pCfg, "timezone"); - break; - case TSDB_OPTION_USE_ADAPTER: - pItem = cfgGetItem(pCfg, "useAdapter"); - break; - default: - break; + if (name == NULL) { + tscError("Invalid option %d", option); + return -1; } + pItem = cfgGetItem(pCfg, name); if (pItem == NULL) { tscError("Invalid option %d", option); return -1; } - int code = cfgSetItem(pCfg, pItem->name, str, CFG_STYPE_TAOS_OPTIONS); + int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS); if (code != 0) { - tscError("failed to set cfg:%s to %s since %s", pItem->name, str, terrstr()); + tscError("failed to set cfg:%s to %s since %s", name, str, terrstr()); } else { - tscInfo("set cfg:%s to %s", pItem->name, str); + tscInfo("set cfg:%s to %s", name, str); if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) { - code = taosCfgDynamicOptions(pCfg, pItem->name, false); + code = taosCfgDynamicOptions(pCfg, name, false); } } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 778b1826b4..b534671acb 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -827,10 +827,14 @@ TEST(clientCase, projection_query_tables) { // } // taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use test"); + TAOS_RES* pRes = taos_query(pConn, "use cache_1"); taos_free_result(pRes); - pRes = taos_query(pConn, "create table st2 (ts timestamp, k int primary key, j varchar(1000)) tags(a int)"); + pRes = taos_query(pConn, "select last(ts), ts from cache_1.t1"); +// pRes = taos_query(pConn, "select last(ts), ts from cache_1.no_pk_t1"); + if (taos_errno(pRes) != 0) { + printf("failed to exec query, %s\n", taos_errstr(pRes)); + } taos_free_result(pRes); // pRes = taos_query(pConn, "create stream stream_1 trigger at_once fill_history 1 ignore expired 0 into str_res1 as select _wstart as ts, count(*) from stable_1 interval(10s);"); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 24f00ab6a5..ba96dc0adf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1032,7 +1032,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) { sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32; } -static int32_t taosSetSlowLogScope(char *pScope) { +static int32_t taosSetSlowLogScope(const char *pScope) { if (NULL == pScope || 0 == strlen(pScope)) { tsSlowLogScope = SLOW_LOG_TYPE_ALL; return 0; @@ -1505,7 +1505,7 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, return terrno == TSDB_CODE_SUCCESS ? 0 : -1; } -static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { +static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { terrno = TSDB_CODE_SUCCESS; if (strcasecmp(name, "resetlog") == 0) { @@ -1583,11 +1583,12 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { return terrno == TSDB_CODE_SUCCESS ? 0 : -1; } -static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { +// todo fix race condition caused by update of config, pItem->str may be removed +static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { terrno = TSDB_CODE_SUCCESS; SConfigItem *pItem = cfgGetItem(pCfg, name); - if (!pItem || (pItem->dynScope & CFG_DYN_CLIENT) == 0) { + if ((pItem == NULL) || (pItem->dynScope & CFG_DYN_CLIENT) == 0) { uError("failed to config:%s, not support", name); terrno = TSDB_CODE_INVALID_CFG; return -1; @@ -1598,6 +1599,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); + switch (lowcaseName[0]) { case 'd': { if (strcasecmp("debugFlag", name) == 0) { @@ -1803,9 +1805,12 @@ _out: return terrno == TSDB_CODE_SUCCESS ? 0 : -1; } -int32_t taosCfgDynamicOptions(SConfig *pCfg, char *name, bool forServer) { - if (forServer) return taosCfgDynamicOptionsForServer(pCfg, name); - return taosCfgDynamicOptionsForClient(pCfg, name); +int32_t taosCfgDynamicOptions(SConfig *pCfg, const char *name, bool forServer) { + if (forServer) { + return taosCfgDynamicOptionsForServer(pCfg, name); + } else { + return taosCfgDynamicOptionsForClient(pCfg, name); + } } void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal) { diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index 77dd8344b1..0a9e8f434b 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -17,6 +17,8 @@ #include "tmisce.h" #include "tglobal.h" #include "tjson.h" +#include "tdatablock.h" + int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) { pEp->port = 0; memset(pEp->fqdn, 0, TSDB_FQDN_LEN); @@ -97,10 +99,10 @@ void epsetSort(SEpSet* pDst) { SEp* s = &pDst->eps[j + 1]; int cmp = strncmp(f->fqdn, s->fqdn, sizeof(f->fqdn)); if (cmp > 0 || (cmp == 0 && f->port > s->port)) { - SEp ep = {0}; - epAssign(&ep, f); + SEp ep1 = {0}; + epAssign(&ep1, f); epAssign(f, s); - epAssign(s, &ep); + epAssign(s, &ep1); } } } @@ -216,3 +218,43 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t return TSDB_CODE_SUCCESS; } + +int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) { + SConfig* pConf = taosGetCfg(); + int32_t numOfRows = 0; + int32_t col = startCol; + SConfigItem* pItem = NULL; + + blockDataEnsureCapacity(pBlock, cfgGetSize(pConf)); + SConfigIter* pIter = cfgCreateIter(pConf); + + while ((pItem = cfgNextIter(pIter)) != NULL) { + col = startCol; + + // GRANT_CFG_SKIP; + char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, col++); + colDataSetVal(pColInfo, numOfRows, name, false); + + char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; + int32_t valueLen = 0; + cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen); + varDataSetLen(value, valueLen); + pColInfo = taosArrayGet(pBlock->pDataBlock, col++); + colDataSetVal(pColInfo, numOfRows, value, false); + + char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0}; + cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen); + varDataSetLen(scope, valueLen); + pColInfo = taosArrayGet(pBlock->pDataBlock, col++); + colDataSetVal(pColInfo, numOfRows, scope, false); + + numOfRows++; + } + + pBlock->info.rows = numOfRows; + + cfgDestroyIter(pIter); + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 12e414b30d..56fdb463c4 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -333,39 +333,10 @@ SSDataBlock *dmBuildVariablesBlock(void) { } int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) { - int32_t numOfCfg = taosArrayGetSize(tsCfg->array); - int32_t numOfRows = 0; - blockDataEnsureCapacity(pBlock, numOfCfg); + /*int32_t code = */dumpConfToDataBlock(pBlock, 1); - for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { - SConfigItem *pItem = taosArrayGet(tsCfg->array, i); - // GRANT_CFG_SKIP; - - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, (const char *)&dnodeId, false); - - char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); - pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, name, false); - - char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; - int32_t valueLen = 0; - cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen); - varDataSetLen(value, valueLen); - pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, value, false); - - char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0}; - cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen); - varDataSetLen(scope, valueLen); - pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, scope, false); - - numOfRows++; - } - - pBlock->info.rows = numOfRows; + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); + colDataSetNItems(pColInfo, 0, (const char *)&dnodeId, pBlock->info.rows, false); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index bf901188dc..2d190edb58 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -955,46 +955,11 @@ static int32_t buildLocalVariablesResultDataBlock(SSDataBlock** pOutput) { return TSDB_CODE_SUCCESS; } -int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) { - int32_t numOfCfg = taosArrayGetSize(tsCfg->array); - int32_t numOfRows = 0; - blockDataEnsureCapacity(pBlock, numOfCfg); - - for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) { - SConfigItem* pItem = taosArrayGet(tsCfg->array, i); - // GRANT_CFG_SKIP; - - char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, name, false); - - char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; - int32_t valueLen = 0; - cfgDumpItemValue(pItem, &value[VARSTR_HEADER_SIZE], TSDB_CONFIG_VALUE_LEN, &valueLen); - varDataSetLen(value, valueLen); - pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, value, false); - - char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0}; - cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen); - varDataSetLen(scope, valueLen); - pColInfo = taosArrayGet(pBlock->pDataBlock, c++); - colDataSetVal(pColInfo, i, scope, false); - - numOfRows++; - } - - pBlock->info.rows = numOfRows; - - return TSDB_CODE_SUCCESS; -} - static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) { SSDataBlock* pBlock = NULL; int32_t code = buildLocalVariablesResultDataBlock(&pBlock); if (TSDB_CODE_SUCCESS == code) { - code = setLocalVariablesResultIntoDataBlock(pBlock); + code = dumpConfToDataBlock(pBlock, 0); } if (TSDB_CODE_SUCCESS == code) { code = buildRetrieveTableRsp(pBlock, SHOW_LOCAL_VARIABLES_RESULT_COLS, pRsp); diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index aec1eba684..5ca2be37f2 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -27,12 +27,17 @@ #define CFG_NAME_PRINT_LEN 24 #define CFG_SRC_PRINT_LEN 12 +struct SConfig { + ECfgSrcType stype; + SArray *array; + TdThreadMutex lock; +}; + int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath); -int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *filepath); +int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile); int32_t cfgLoadFromEnvVar(SConfig *pConfig); int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd); int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url); -int32_t cfgSetItem(SConfig *pConfig, const char *name, const char *value, ECfgSrcType stype); extern char **environ; @@ -50,6 +55,7 @@ SConfig *cfgInit() { return NULL; } + taosThreadMutexInit(&pCfg->lock, NULL); return pCfg; } @@ -87,9 +93,9 @@ static void cfgFreeItem(SConfigItem *pItem) { pItem->dtype == CFG_DTYPE_CHARSET || pItem->dtype == CFG_DTYPE_TIMEZONE) { taosMemoryFreeClear(pItem->str); } + if (pItem->array) { - taosArrayDestroy(pItem->array); - pItem->array = NULL; + pItem->array = taosArrayDestroy(pItem->array); } } @@ -102,37 +108,18 @@ void cfgCleanup(SConfig *pCfg) { taosMemoryFreeClear(pItem->name); } taosArrayDestroy(pCfg->array); + taosThreadMutexDestroy(&pCfg->lock); taosMemoryFree(pCfg); } } int32_t cfgGetSize(SConfig *pCfg) { return taosArrayGetSize(pCfg->array); } -static int32_t cfgCheckAndSetTimezone(SConfigItem *pItem, const char *timezone) { +static int32_t cfgCheckAndSetConf(SConfigItem *pItem, const char *conf) { cfgFreeItem(pItem); - pItem->str = taosStrdup(timezone); - if (pItem->str == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + ASSERT(pItem->str == NULL); - return 0; -} - -static int32_t cfgCheckAndSetCharset(SConfigItem *pItem, const char *charset) { - cfgFreeItem(pItem); - pItem->str = taosStrdup(charset); - if (pItem->str == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static int32_t cfgCheckAndSetLocale(SConfigItem *pItem, const char *locale) { - cfgFreeItem(pItem); - pItem->str = taosStrdup(locale); + pItem->str = taosStrdup(conf); if (pItem->str == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -229,7 +216,7 @@ static int32_t cfgSetString(SConfigItem *pItem, const char *value, ECfgSrcType s return -1; } - taosMemoryFree(pItem->str); + taosMemoryFreeClear(pItem->str); pItem->str = tmp; pItem->stype = stype; return 0; @@ -246,20 +233,8 @@ static int32_t cfgSetDir(SConfigItem *pItem, const char *value, ECfgSrcType styp return 0; } -static int32_t cfgSetLocale(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - if (cfgCheckAndSetLocale(pItem, value) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype), - cfgStypeStr(stype), value, terrstr()); - return -1; - } - - pItem->stype = stype; - return 0; -} - -static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - if (cfgCheckAndSetCharset(pItem, value) != 0) { +static int32_t doSetConf(SConfigItem *pItem, const char *value, ECfgSrcType stype) { + if (cfgCheckAndSetConf(pItem, value) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr()); @@ -271,18 +246,13 @@ static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType } static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType stype) { - if (cfgCheckAndSetTimezone(pItem, value) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype), - cfgStypeStr(stype), value, terrstr()); - return -1; + int32_t code = doSetConf(pItem, value, stype); + if (code != TSDB_CODE_SUCCESS) { + return code; } - pItem->stype = stype; - // apply new timezone osSetTimezone(value); - - return 0; + return code; } static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, const char *level, const char *primary, @@ -342,40 +312,62 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype) { // GRANT_CFG_SET; + int32_t code = 0; SConfigItem *pItem = cfgGetItem(pCfg, name); if (pItem == NULL) { terrno = TSDB_CODE_CFG_NOT_FOUND; return -1; } + taosThreadMutexLock(&pCfg->lock); + switch (pItem->dtype) { - case CFG_DTYPE_BOOL: - return cfgSetBool(pItem, value, stype); - case CFG_DTYPE_INT32: - return cfgSetInt32(pItem, value, stype); - case CFG_DTYPE_INT64: - return cfgSetInt64(pItem, value, stype); + case CFG_DTYPE_BOOL: { + code = cfgSetBool(pItem, value, stype); + break; + } + case CFG_DTYPE_INT32: { + code = cfgSetInt32(pItem, value, stype); + break; + } + case CFG_DTYPE_INT64: { + code = cfgSetInt64(pItem, value, stype); + break; + } case CFG_DTYPE_FLOAT: - case CFG_DTYPE_DOUBLE: - return cfgSetFloat(pItem, value, stype); - case CFG_DTYPE_STRING: - return cfgSetString(pItem, value, stype); - case CFG_DTYPE_DIR: - return cfgSetDir(pItem, value, stype); - case CFG_DTYPE_TIMEZONE: - return cfgSetTimezone(pItem, value, stype); - case CFG_DTYPE_CHARSET: - return cfgSetCharset(pItem, value, stype); - case CFG_DTYPE_LOCALE: - return cfgSetLocale(pItem, value, stype); + case CFG_DTYPE_DOUBLE: { + code = cfgSetFloat(pItem, value, stype); + break; + } + case CFG_DTYPE_STRING: { + code = cfgSetString(pItem, value, stype); + break; + } + case CFG_DTYPE_DIR: { + code = cfgSetDir(pItem, value, stype); + break; + } + case CFG_DTYPE_TIMEZONE: { + code = cfgSetTimezone(pItem, value, stype); + break; + } + case CFG_DTYPE_CHARSET: { + code = doSetConf(pItem, value, stype); + break; + } + case CFG_DTYPE_LOCALE: { + code = doSetConf(pItem, value, stype); + break; + } case CFG_DTYPE_NONE: default: break; } -_err_out: + taosThreadMutexUnlock(&pCfg->lock); + terrno = TSDB_CODE_INVALID_CFG; - return -1; + return code; } SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { @@ -388,16 +380,16 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { } } - // uError("name:%s, cfg not found", name); terrno = TSDB_CODE_CFG_NOT_FOUND; return NULL; } int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer) { ECfgDynType dynType = isServer ? CFG_DYN_SERVER : CFG_DYN_CLIENT; + SConfigItem *pItem = cfgGetItem(pCfg, name); if (!pItem || (pItem->dynScope & dynType) == 0) { - uError("failed to config:%s, not support", name); + uError("failed to config:%s, not support update this config", name); terrno = TSDB_CODE_INVALID_CFG; return -1; } @@ -459,7 +451,7 @@ static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) { return -1; } - int size = pCfg->array->size; + int32_t size = taosArrayGetSize(pCfg->array); for (int32_t i = 0; i < size; ++i) { SConfigItem *existItem = taosArrayGet(pCfg->array, i); if (existItem != NULL && strcmp(existItem->name, pItem->name) == 0) { @@ -559,7 +551,7 @@ int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_ int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) { SConfigItem item = {.dtype = CFG_DTYPE_LOCALE, .scope = scope, .dynScope = dynScope}; - if (cfgCheckAndSetLocale(&item, defaultVal) != 0) { + if (cfgCheckAndSetConf(&item, defaultVal) != 0) { return -1; } @@ -568,7 +560,7 @@ int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, in int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) { SConfigItem item = {.dtype = CFG_DTYPE_CHARSET, .scope = scope, .dynScope = dynScope}; - if (cfgCheckAndSetCharset(&item, defaultVal) != 0) { + if (cfgCheckAndSetConf(&item, defaultVal) != 0) { return -1; } @@ -577,7 +569,7 @@ int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, i int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope, int8_t dynScope) { SConfigItem item = {.dtype = CFG_DTYPE_TIMEZONE, .scope = scope, .dynScope = dynScope}; - if (cfgCheckAndSetTimezone(&item, defaultVal) != 0) { + if (cfgCheckAndSetConf(&item, defaultVal) != 0) { return -1; } @@ -1356,3 +1348,35 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl uInfo("fail get apollo url from cmd env file"); return -1; } + +struct SConfigIter { + int32_t index; + SConfig *pConf; +}; + +SConfigIter *cfgCreateIter(SConfig *pConf) { + SConfigIter* pIter = taosMemoryCalloc(1, sizeof(SConfigIter)); + if (pIter == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pIter->pConf = pConf; + return pIter; +} + +SConfigItem *cfgNextIter(SConfigIter* pIter) { + if (pIter->index < cfgGetSize(pIter->pConf)) { + return taosArrayGet(pIter->pConf->array, pIter->index++); + } + + return NULL; +} + +void cfgDestroyIter(SConfigIter *pIter) { + if (pIter == NULL) { + return; + } + + taosMemoryFree(pIter); +} \ No newline at end of file From e1cd7710edbc6ee0493834e0e7d5820d9699c685 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 14:23:54 +0800 Subject: [PATCH 04/10] fix(test): fix error in unit test. --- source/util/test/cfgTest.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/source/util/test/cfgTest.cpp b/source/util/test/cfgTest.cpp index e10ffe7c9b..92422b6a80 100644 --- a/source/util/test/cfgTest.cpp +++ b/source/util/test/cfgTest.cpp @@ -63,9 +63,11 @@ TEST_F(CfgTest, 02_Basic) { EXPECT_EQ(cfgGetSize(pConfig), 6); - int32_t size = taosArrayGetSize(pConfig->array); - for (int32_t i = 0; i < size; ++i) { - SConfigItem *pItem = (SConfigItem *)taosArrayGet(pConfig->array, i); + int32_t size = cfgGetSize(pConfig); + + SConfigItem* pItem = NULL; + SConfigIter* pIter = cfgCreateIter(pConfig); + while((pItem == cfgNextIter(pIter)) != NULL) { switch (pItem->dtype) { case CFG_DTYPE_BOOL: printf("index:%d, cfg:%s value:%d\n", size, pItem->name, pItem->bval); @@ -90,9 +92,12 @@ TEST_F(CfgTest, 02_Basic) { break; } } + + cfgDestroyIter(pIter); + EXPECT_EQ(cfgGetSize(pConfig), 6); - SConfigItem *pItem = cfgGetItem(pConfig, "test_bool"); + pItem = cfgGetItem(pConfig, "test_bool"); EXPECT_EQ(pItem->stype, CFG_STYPE_DEFAULT); EXPECT_EQ(pItem->dtype, CFG_DTYPE_BOOL); EXPECT_STREQ(pItem->name, "test_bool"); From d1ecfe5cf329bdcb3890a6cc98c06272c9348efb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 14:24:20 +0800 Subject: [PATCH 05/10] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 18 ++++++++++ source/libs/stream/src/streamStart.c | 44 ++++++----------------- 2 files changed, 29 insertions(+), 33 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 36886329ac..8efd661d12 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -94,6 +94,24 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea return 0; } +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + static int32_t streamAlignCheckpoint(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 0b91359f48..1f6c5add42 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -29,19 +29,15 @@ typedef struct SLaunchHTaskInfo { STaskId hTaskId; } SLaunchHTaskInfo; -typedef struct STaskRecheckInfo { - SStreamTask* pTask; - SStreamTaskCheckReq req; - void* checkTimer; -} STaskRecheckInfo; - static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); -static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, - int32_t hTaskId); +static SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId); static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); +static void doExecScanhistoryInFuture(void* param, void* tmrId); +static int32_t doStartScanHistoryTask(SStreamTask* pTask); +static int32_t streamTaskStartScanHistory(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); @@ -83,7 +79,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } -static void doExecScanhistoryInFuture(void* param, void* tmrId) { +void doExecScanhistoryInFuture(void* param, void* tmrId) { SStreamTask* pTask = param; pTask->schedHistoryInfo.numOfTicks -= 1; @@ -139,7 +135,7 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) return TSDB_CODE_SUCCESS; } -static int32_t doStartScanHistoryTask(SStreamTask* pTask) { +int32_t doStartScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); @@ -663,16 +659,15 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { taosMemoryFree(pInfo); } -SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t hStreamId, - int32_t hTaskId) { +SLaunchHTaskInfo* createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId) { SLaunchHTaskInfo* pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pInfo->id.streamId = streamId; - pInfo->id.taskId = taskId; + pInfo->id.streamId = pTaskId->streamId; + pInfo->id.taskId = pTaskId->taskId; pInfo->hTaskId.streamId = hStreamId; pInfo->hTaskId.taskId = hTaskId; @@ -691,7 +686,8 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId); - SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId); + STaskId id = streamTaskGetTaskId(pTask); + SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId); if (pInfo == NULL) { stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); @@ -860,24 +856,6 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) return 0; } -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; - tEndEncode(pEncoder); - return 0; -} - -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; From 7d1e2f1f9de92a8e4a52882615390b8421b3ba5f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 16:38:26 +0800 Subject: [PATCH 06/10] fix(util): set code to be success if set config success. --- source/util/src/tconfig.c | 3 +-- tests/army/community/cmdline/fullopt.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 5ca2be37f2..b1d9403a9d 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -361,12 +361,11 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy } case CFG_DTYPE_NONE: default: + terrno = TSDB_CODE_INVALID_CFG; break; } taosThreadMutexUnlock(&pCfg->lock); - - terrno = TSDB_CODE_INVALID_CFG; return code; } diff --git a/tests/army/community/cmdline/fullopt.py b/tests/army/community/cmdline/fullopt.py index c03ba428a1..e61501d7b8 100644 --- a/tests/army/community/cmdline/fullopt.py +++ b/tests/army/community/cmdline/fullopt.py @@ -49,7 +49,7 @@ class TDTestCase(TBase): def checkQueryOK(self, rets): if rets[-2][:9] != "Query OK,": - tdLog.exit(f"check taos -s return unecpect: {rets}") + tdLog.exit(f"check taos -s return unexpect: {rets}") def doTaos(self): tdLog.info(f"check taos command options...") From cd8baa7ba00b59b361b22370114494034944ca69 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 16:39:11 +0800 Subject: [PATCH 07/10] fix(stream): add log for update task epset, set correct update flag if epset updated. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 9 +-- source/libs/stream/src/streamTask.c | 76 +++++++++++++++------- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 8b2e9693eb..254e7e5856 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -211,7 +211,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); if (pReqTask != NULL) { - tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", idstr, vgId, req.transId); + tqDebug("s-task:%s (vgId:%d) already update in transId:%d, discard the nodeEp update msg", idstr, vgId, req.transId); rsp.code = TSDB_CODE_SUCCESS; streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); @@ -235,7 +235,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } else { tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); bool updateEpSet = streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); - if (!updated) { + if (updateEpSet) { updated = updateEpSet; } @@ -245,14 +245,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } if (updated) { - tqDebug("s-task:%s vgId:%d save task after update epset", idstr, vgId); + tqDebug("s-task:%s vgId:%d save task after update epset, and stop task", idstr, vgId); streamMetaSaveTask(pMeta, pTask); if (ppHTask != NULL) { streamMetaSaveTask(pMeta, *ppHTask); } + } else { + tqDebug("s-task:%s vgId:%s not save task since not update epset actually, stop task", idstr); } - tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId); streamTaskStop(pTask); // keep the already updated info diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2528e03593..dff1a1505f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -36,15 +36,20 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet, bool* pUpdated) { char buf[512] = {0}; - if (pTask->info.nodeId == nodeId) { // execution task should be moved away - if (!(*pUpdated)) { - *pUpdated = isEpsetEqual(&pTask->info.epSet, pEpSet); - } - - epsetAssign(&pTask->info.epSet, pEpSet); + bool isEqual = isEpsetEqual(&pTask->info.epSet, pEpSet); epsetToStr(pEpSet, buf, tListLen(buf)); - stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf); + + if (!isEqual) { + (*pUpdated) = true; + char tmp[512] = {0}; + epsetToStr(&pTask->info.epSet, tmp, tListLen(tmp)); + + epsetAssign(&pTask->info.epSet, pEpSet); + stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp); + } else { + stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", buf); + } } // check for the dispatch info and the upstream task info @@ -620,13 +625,21 @@ void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpS for (int32_t i = 0; i < numOfUpstream; ++i) { SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (pInfo->nodeId == nodeId) { - if (!(*pUpdated)) { - *pUpdated = isEpsetEqual(&pInfo->epSet, pEpSet); + bool equal = isEpsetEqual(&pInfo->epSet, pEpSet); + if (!equal) { + *pUpdated = true; + + char tmp[512] = {0}; + epsetToStr(&pInfo->epSet, tmp, tListLen(tmp)); + + epsetAssign(&pInfo->epSet, pEpSet); + stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s old:%s", pTask->id.taskId, + pInfo->taskId, nodeId, buf, tmp); + } else { + stDebug("s-task:0x%x not update upstreamInfo, since identical, task:0x%x(nodeId:%d) epset:%s", pTask->id.taskId, + pInfo->taskId, nodeId, buf); } - epsetAssign(&pInfo->epSet, pEpSet); - stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId, pInfo->taskId, - nodeId, buf); break; } } @@ -653,7 +666,6 @@ void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDo void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool *pUpdated) { char buf[512] = {0}; epsetToStr(pEpSet, buf, tListLen(buf)); - *pUpdated = false; int32_t id = pTask->id.taskId; int8_t type = pTask->outputInfo.type; @@ -661,29 +673,43 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE if (type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgroups = taosArrayGetSize(pVgs); - for (int32_t i = 0; i < numOfVgroups; i++) { + for (int32_t i = 0; i < taosArrayGetSize(pVgs); i++) { SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); if (pVgInfo->vgId == nodeId) { - if (!(*pUpdated)) { - (*pUpdated) = isEpsetEqual(&pVgInfo->epSet, pEpSet); - } + bool isEqual = isEpsetEqual(&pVgInfo->epSet, pEpSet); + if (!isEqual) { + *pUpdated = true; + char tmp[512] = {0}; + epsetToStr(&pVgInfo->epSet, tmp, tListLen(tmp)); - epsetAssign(&pVgInfo->epSet, pEpSet); - stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pVgInfo->taskId, nodeId, buf); + epsetAssign(&pVgInfo->epSet, pEpSet); + stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pVgInfo->taskId, + nodeId, buf, tmp); + } else { + stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id, + pVgInfo->taskId, nodeId, buf); + } break; } } } else if (type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatcher = &pTask->outputInfo.fixedDispatcher; if (pDispatcher->nodeId == nodeId) { - if (!(*pUpdated)) { - *pUpdated = isEpsetEqual(&pDispatcher->epSet, pEpSet); - } + bool equal = isEpsetEqual(&pDispatcher->epSet, pEpSet); + if (!equal) { + *pUpdated = true; - epsetAssign(&pDispatcher->epSet, pEpSet); - stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s", id, pDispatcher->taskId, nodeId, buf); + char tmp[512] = {0}; + epsetToStr(&pDispatcher->epSet, tmp, tListLen(tmp)); + + epsetAssign(&pDispatcher->epSet, pEpSet); + stDebug("s-task:0x%x update dispatch info, task:0x%x(nodeId:%d) newEpset:%s old:%s", id, pDispatcher->taskId, + nodeId, buf, tmp); + } else { + stDebug("s-task:0x%x not update dispatch info, since identical, task:0x%x(nodeId:%d) epset:%s", id, + pDispatcher->taskId, nodeId, buf); + } } } } From addd2d2a84502580ebe49c64a1d235d4755c1f47 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 17:06:15 +0800 Subject: [PATCH 08/10] fix(stream): fix syntax error. --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dff1a1505f..5849e1f00e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -48,7 +48,7 @@ static int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEp epsetAssign(&pTask->info.epSet, pEpSet); stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s, old:%s", pTask->id.taskId, nodeId, buf, tmp); } else { - stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", buf); + stDebug("s-task:0x%x (vgId:%d) not updated task epset, since epset identical, %s", pTask->id.taskId, nodeId, buf); } } From 3b932599ab418385c94839e07274a8ea28aa0ef1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 17:47:03 +0800 Subject: [PATCH 09/10] fix(stream): fix syntax error. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 254e7e5856..59941b32df 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -251,7 +251,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaSaveTask(pMeta, *ppHTask); } } else { - tqDebug("s-task:%s vgId:%s not save task since not update epset actually, stop task", idstr); + tqDebug("s-task:%s vgId:%s not save task since not update epset actually, stop task", idstr, vgId); } streamTaskStop(pTask); From eef5e6e4096b3b22d6ebce8fbc3b03ef5d2a7630 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Apr 2024 18:30:33 +0800 Subject: [PATCH 10/10] fix(stream): fix syntax error. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 59941b32df..ee4f5366d6 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -251,7 +251,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaSaveTask(pMeta, *ppHTask); } } else { - tqDebug("s-task:%s vgId:%s not save task since not update epset actually, stop task", idstr, vgId); + tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId); } streamTaskStop(pTask);