From 2d1e9ba6317ada5bd39f4272cf68ba2a54a33a07 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 19:11:57 +0800 Subject: [PATCH 1/7] fix(stream):add ref for task in check rsp monitor timer. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckStatus.c | 35 ++++++++++++++++----- source/libs/stream/src/streamStartHistory.c | 3 +- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c74a9fd7b..dfe35e50c7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -848,6 +848,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); +int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index d164d01934..d7960ee725 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -55,7 +55,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - streamTaskStartMonitorCheckRsp(pTask); + streamTaskPrepareMonitorCheckRsp(pTask); STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; @@ -69,8 +69,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamTaskStartMonitorCheckRsp(pTask); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + streamTaskPrepareMonitorCheckRsp(pTask); SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; @@ -89,6 +90,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } + + // the check rsp monitor timer must be invoked here + streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); @@ -161,18 +165,27 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { +int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask) { + /*SStreamTask* p = */streamMetaAcquireOneTask(pTask); // add task ref here STaskCheckInfo* pInfo = &pTask->taskCheckInfo; taosThreadMutexLock(&pInfo->checkInfoLock); int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamMetaReleaseTask(pTask->pMeta, pTask); return TSDB_CODE_FAILED; } streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; +} +int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + + taosThreadMutexLock(&pInfo->checkInfoLock); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); @@ -183,7 +196,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { } taosThreadMutexUnlock(&pInfo->checkInfoLock); - return 0; + return TSDB_CODE_SUCCESS; } int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { @@ -329,7 +342,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { } stDebug("s-task:%s set the in-check-procedure flag", id); - return 0; + return TSDB_CODE_SUCCESS; } int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) { @@ -519,6 +532,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; + SStreamMeta* pMeta = pTask->pMeta; SStreamTaskState* pStat = streamTaskGetStatus(pTask); STaskCheckInfo* pInfo = &pTask->taskCheckInfo; int32_t vgId = pTask->pMeta->vgId; @@ -546,6 +560,8 @@ void rspMonitorFn(void* param, void* tmrId) { STaskId* pHId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); } + + streamMetaReleaseTask(pMeta, pTask); return; } @@ -554,6 +570,7 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); + streamMetaReleaseTask(pMeta, pTask); return; } @@ -565,6 +582,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamMetaReleaseTask(pMeta, pTask); return; } @@ -591,6 +609,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamMetaReleaseTask(pMeta, pTask); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); @@ -609,12 +628,14 @@ void rspMonitorFn(void* param, void* tmrId) { taosThreadMutexUnlock(&pInfo->checkInfoLock); // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); } + streamMetaReleaseTask(pMeta, pTask); + taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); return; diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 04a99feab0..c76536aedf 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -85,9 +85,10 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) { SStreamTaskState* p = streamTaskGetStatus(pTask); if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { - streamMetaReleaseTask(pTask->pMeta, pTask); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); return; } From faa87e13b0da823a9fbb07f69ce444482bc0bc1a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 19:12:19 +0800 Subject: [PATCH 2/7] fix(util): fix race condition. --- include/util/tconfig.h | 5 ++- source/common/src/tglobal.c | 22 +++++++++- source/util/src/tconfig.c | 82 +++++++++++++++++++++++++------------ 3 files changed, 80 insertions(+), 29 deletions(-) diff --git a/include/util/tconfig.h b/include/util/tconfig.h index 2095601e14..4c16ed0ea6 100644 --- a/include/util/tconfig.h +++ b/include/util/tconfig.h @@ -106,12 +106,15 @@ int32_t cfgLoad(SConfig *pCfg, ECfgSrcType cfgType, const void *sourceStr); int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs); // SConfigPair void cfgCleanup(SConfig *pCfg); int32_t cfgGetSize(SConfig *pCfg); -SConfigItem *cfgGetItem(SConfig *pCfg, const char *name); +SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName); int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype); int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer); + SConfigIter *cfgCreateIter(SConfig *pConf); SConfigItem *cfgNextIter(SConfigIter *pIter); void cfgDestroyIter(SConfigIter *pIter); +void cfgLock(SConfig *pCfg); +void cfgUnLock(SConfig *pCfg); // clang-format off int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope, int8_t dynScope); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ba96dc0adf..74c5b923f3 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1515,15 +1515,20 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { return 0; } + cfgLock(pCfg); + SConfigItem *pItem = cfgGetItem(pCfg, name); if (!pItem || (pItem->dynScope & CFG_DYN_SERVER) == 0) { uError("failed to config:%s, not support", name); terrno = TSDB_CODE_INVALID_CFG; + + cfgUnLock(pCfg); return -1; } if (strncasecmp(name, "debugFlag", 9) == 0) { taosSetAllDebugFlag(pCfg, pItem->i32); + cfgUnLock(pCfg); return 0; } @@ -1580,17 +1585,21 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { } } + cfgUnLock(pCfg); return terrno == TSDB_CODE_SUCCESS ? 0 : -1; } -// todo fix race condition caused by update of config, pItem->str may be removed static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { terrno = TSDB_CODE_SUCCESS; + cfgLock(pCfg); + SConfigItem *pItem = cfgGetItem(pCfg, name); if ((pItem == NULL) || (pItem->dynScope & CFG_DYN_CLIENT) == 0) { uError("failed to config:%s, not support", name); terrno = TSDB_CODE_INVALID_CFG; + + cfgUnLock(pCfg); return -1; } @@ -1728,6 +1737,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { matched = true; } else if (strcasecmp("slowLogScope", name) == 0) { if (taosSetSlowLogScope(pItem->str)) { + cfgUnLock(pCfg); return -1; } uInfo("%s set to %s", name, pItem->str); @@ -1747,6 +1757,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { taosExpandDir(tsTempDir, tsTempDir, PATH_MAX); if (taosMulMkDir(tsTempDir) != 0) { uError("failed to create tempDir:%s since %s", tsTempDir, terrstr()); + cfgUnLock(pCfg); return -1; } matched = true; @@ -1802,6 +1813,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { } _out: + cfgUnLock(pCfg); return terrno == TSDB_CODE_SUCCESS ? 0 : -1; } @@ -1844,6 +1856,8 @@ static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) { return; } + cfgLock(pCfg); + SArray *noNeedToSetVars = NULL; SConfigItem *pItem = cfgGetItem(pCfg, "debugFlag"); if (pItem != NULL) { @@ -1878,7 +1892,11 @@ static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) { taosArrayClear(noNeedToSetVars); // reset array uInfo("all debug flag are set to %d", flag); - if (terrno == TSDB_CODE_CFG_NOT_FOUND) terrno = TSDB_CODE_SUCCESS; // ignore not exist + if (terrno == TSDB_CODE_CFG_NOT_FOUND) { + terrno = TSDB_CODE_SUCCESS; // ignore not exist + } + + cfgUnLock(pCfg); } int8_t taosGranted(int8_t type) { diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index b1d9403a9d..1461861f35 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -88,7 +88,7 @@ int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs) { return 0; } -static void cfgFreeItem(SConfigItem *pItem) { +void cfgItemFreeVal(SConfigItem *pItem) { if (pItem->dtype == CFG_DTYPE_STRING || pItem->dtype == CFG_DTYPE_DIR || pItem->dtype == CFG_DTYPE_LOCALE || pItem->dtype == CFG_DTYPE_CHARSET || pItem->dtype == CFG_DTYPE_TIMEZONE) { taosMemoryFreeClear(pItem->str); @@ -100,23 +100,26 @@ static void cfgFreeItem(SConfigItem *pItem) { } void cfgCleanup(SConfig *pCfg) { - if (pCfg != NULL) { - int32_t size = taosArrayGetSize(pCfg->array); - for (int32_t i = 0; i < size; ++i) { - SConfigItem *pItem = taosArrayGet(pCfg->array, i); - cfgFreeItem(pItem); - taosMemoryFreeClear(pItem->name); - } - taosArrayDestroy(pCfg->array); - taosThreadMutexDestroy(&pCfg->lock); - taosMemoryFree(pCfg); + if (pCfg == NULL) { + return; } + + int32_t size = taosArrayGetSize(pCfg->array); + for (int32_t i = 0; i < size; ++i) { + SConfigItem *pItem = taosArrayGet(pCfg->array, i); + cfgItemFreeVal(pItem); + taosMemoryFreeClear(pItem->name); + } + + taosArrayDestroy(pCfg->array); + taosThreadMutexDestroy(&pCfg->lock); + taosMemoryFree(pCfg); } int32_t cfgGetSize(SConfig *pCfg) { return taosArrayGetSize(pCfg->array); } static int32_t cfgCheckAndSetConf(SConfigItem *pItem, const char *conf) { - cfgFreeItem(pItem); + cfgItemFreeVal(pItem); ASSERT(pItem->str == NULL); pItem->str = taosStrdup(conf); @@ -257,13 +260,21 @@ static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, const char *level, const char *primary, ECfgSrcType stype) { + taosThreadMutexLock(&pCfg->lock); + SConfigItem *pItem = cfgGetItem(pCfg, name); - if (pItem == NULL) return -1; + if (pItem == NULL) { + taosThreadMutexUnlock(&pCfg->lock); + + return -1; + } if (pItem->array == NULL) { pItem->array = taosArrayInit(16, sizeof(SDiskCfg)); if (pItem->array == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosThreadMutexUnlock(&pCfg->lock); + return -1; } } @@ -275,10 +286,14 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, void *ret = taosArrayPush(pItem->array, &cfg); if (ret == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosThreadMutexUnlock(&pCfg->lock); + return -1; } pItem->stype = stype; + taosThreadMutexUnlock(&pCfg->lock); + return 0; } @@ -312,15 +327,16 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype) { // GRANT_CFG_SET; - int32_t code = 0; + int32_t code = 0; + taosThreadMutexLock(&pCfg->lock); + SConfigItem *pItem = cfgGetItem(pCfg, name); if (pItem == NULL) { terrno = TSDB_CODE_CFG_NOT_FOUND; + taosThreadMutexUnlock(&pCfg->lock); return -1; } - taosThreadMutexLock(&pCfg->lock); - switch (pItem->dtype) { case CFG_DTYPE_BOOL: { code = cfgSetBool(pItem, value, stype); @@ -369,12 +385,12 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy return code; } -SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { +SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName) { if (pCfg == NULL) return NULL; int32_t size = taosArrayGetSize(pCfg->array); for (int32_t i = 0; i < size; ++i) { SConfigItem *pItem = taosArrayGet(pCfg->array, i); - if (strcasecmp(pItem->name, name) == 0) { + if (strcasecmp(pItem->name, pName) == 0) { return pItem; } } @@ -383,9 +399,23 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { return NULL; } +void cfgLock(SConfig *pCfg) { + if (pCfg == NULL) { + return; + } + + taosThreadMutexLock(&pCfg->lock); +} + +void cfgUnLock(SConfig *pCfg) { + taosThreadMutexUnlock(&pCfg->lock); +} + int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer) { ECfgDynType dynType = isServer ? CFG_DYN_SERVER : CFG_DYN_CLIENT; + cfgLock(pCfg); + SConfigItem *pItem = cfgGetItem(pCfg, name); if (!pItem || (pItem->dynScope & dynType) == 0) { uError("failed to config:%s, not support update this config", name); @@ -439,6 +469,7 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p default: break; } + return 0; } @@ -1037,18 +1068,18 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) { paGetToken(name + olen + 1, &value, &vlen); if (vlen == 0) continue; value[vlen] = 0; - + if (strcasecmp(name, "encryptScope") == 0) { - char* tmp = NULL; + char *tmp = NULL; int32_t len = 0; - char newValue[1024] = {0}; + char newValue[1024] = {0}; strcpy(newValue, value); - + int32_t count = 1; - while(vlen < 1024){ + while (vlen < 1024) { paGetToken(value + vlen + 1 * count, &tmp, &len); - if(len == 0) break; + if (len == 0) break; tmp[len] = 0; strcpy(newValue + vlen, tmp); vlen += len; @@ -1057,8 +1088,7 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) { code = cfgSetItem(pConfig, name, newValue, CFG_STYPE_CFG_FILE); if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; - } - else{ + } else { paGetToken(value + vlen + 1, &value2, &vlen2); if (vlen2 != 0) { value2[vlen2] = 0; From a33e7b2eb3acc21ab7afa3c412f814ac921b108f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 22:22:54 +0800 Subject: [PATCH 3/7] refactor: do some internal refactor. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamCheckStatus.c | 31 +++++++--------------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 44fb0706b8..07dce9a451 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,7 +26,7 @@ extern "C" { #endif -#define CHECK_RSP_INTERVAL 300 +#define CHECK_RSP_CHECK_INTERVAL 300 #define LAUNCH_HTASK_INTERVAL 100 #define WAIT_FOR_MINIMAL_INTERVAL 100.00 #define MAX_RETRY_LAUNCH_HISTORY_TASK 40 diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index d7960ee725..0a87833055 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -55,7 +55,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - streamTaskPrepareMonitorCheckRsp(pTask); + streamTaskStartMonitorCheckRsp(pTask); STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; @@ -69,9 +69,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); - streamTaskStartMonitorCheckRsp(pTask); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - streamTaskPrepareMonitorCheckRsp(pTask); + streamTaskStartMonitorCheckRsp(pTask); SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; @@ -90,9 +89,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } - - // the check rsp monitor timer must be invoked here - streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); @@ -165,38 +161,30 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask) { - /*SStreamTask* p = */streamMetaAcquireOneTask(pTask); // add task ref here +int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; - taosThreadMutexLock(&pInfo->checkInfoLock); + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamMetaReleaseTask(pTask->pMeta, pTask); return TSDB_CODE_FAILED; } + /*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; -} -int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { - STaskCheckInfo* pInfo = &pTask->taskCheckInfo; - - taosThreadMutexLock(&pInfo->checkInfoLock); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); if (pInfo->checkRspTmr == NULL) { - pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); } else { - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); + taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); } taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; + return 0; } int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { @@ -376,7 +364,6 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; - taosThreadMutexLock(&pInfo->checkInfoLock); SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); @@ -649,7 +636,7 @@ void rspMonitorFn(void* param, void* tmrId) { handleTimeoutDownstreamTasks(pTask, pTimeoutList); } - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); + taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", From 9df2bc2634313a0e65668e41d23170828e537902 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 22:23:23 +0800 Subject: [PATCH 4/7] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 1 - 1 file changed, 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index dfe35e50c7..3c74a9fd7b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -848,7 +848,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); From 12570d7390ce90ba2082b9b00df153f22fce1d59 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 23:09:17 +0800 Subject: [PATCH 5/7] fix(client): fix deadlock. --- source/common/src/tglobal.c | 4 ---- source/util/src/tconfig.c | 21 ++++++++++++++++++--- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 74c5b923f3..a839da7264 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1856,8 +1856,6 @@ static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) { return; } - cfgLock(pCfg); - SArray *noNeedToSetVars = NULL; SConfigItem *pItem = cfgGetItem(pCfg, "debugFlag"); if (pItem != NULL) { @@ -1895,8 +1893,6 @@ static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) { if (terrno == TSDB_CODE_CFG_NOT_FOUND) { terrno = TSDB_CODE_SUCCESS; // ignore not exist } - - cfgUnLock(pCfg); } int8_t taosGranted(int8_t type) { diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 1461861f35..0d99412d1b 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -420,6 +420,7 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p if (!pItem || (pItem->dynScope & dynType) == 0) { uError("failed to config:%s, not support update this config", name); terrno = TSDB_CODE_INVALID_CFG; + cfgUnLock(pCfg); return -1; } @@ -429,28 +430,37 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p if (ival != 0 && ival != 1) { uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name, cfgDtypeStr(pItem->dtype), ival); terrno = TSDB_CODE_OUT_OF_RANGE; + cfgUnLock(pCfg); return -1; } } break; case CFG_DTYPE_INT32: { int32_t ival; int32_t code = (int32_t)taosStrHumanToInt32(pVal, &ival); - if (code != TSDB_CODE_SUCCESS) return code; + if (code != TSDB_CODE_SUCCESS) { + cfgUnLock(pCfg); + return code; + } if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax); terrno = TSDB_CODE_OUT_OF_RANGE; + cfgUnLock(pCfg); return -1; } } break; case CFG_DTYPE_INT64: { int64_t ival; int32_t code = taosStrHumanToInt64(pVal, &ival); - if (code != TSDB_CODE_SUCCESS) return code; + if (code != TSDB_CODE_SUCCESS) { + cfgUnLock(pCfg); + return code; + } if (ival < pItem->imin || ival > pItem->imax) { uError("cfg:%s, type:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name, cfgDtypeStr(pItem->dtype), ival, pItem->imin, pItem->imax); terrno = TSDB_CODE_OUT_OF_RANGE; + cfgUnLock(pCfg); return -1; } } break; @@ -458,11 +468,15 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p case CFG_DTYPE_DOUBLE: { double dval; int32_t code = parseCfgReal(pVal, &dval); - if (code != TSDB_CODE_SUCCESS) return code; + if (code != TSDB_CODE_SUCCESS) { + cfgUnLock(pCfg); + return code; + } if (dval < pItem->fmin || dval > pItem->fmax) { uError("cfg:%s, type:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype), dval, pItem->fmin, pItem->fmax); terrno = TSDB_CODE_OUT_OF_RANGE; + cfgUnLock(pCfg); return -1; } } break; @@ -470,6 +484,7 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p break; } + cfgUnLock(pCfg); return 0; } From 1f6b7bbcff352abe0ba0a865be27f63e975f0482 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Apr 2024 10:08:15 +0800 Subject: [PATCH 6/7] fix(util): fix dead lock. --- include/util/tconfig.h | 2 +- source/client/src/clientEnv.c | 2 +- source/common/src/tglobal.c | 21 +++++++++++++-------- source/libs/command/src/command.c | 2 +- source/util/src/tconfig.c | 26 ++++++++++++++++---------- 5 files changed, 32 insertions(+), 21 deletions(-) diff --git a/include/util/tconfig.h b/include/util/tconfig.h index 4c16ed0ea6..8373e00085 100644 --- a/include/util/tconfig.h +++ b/include/util/tconfig.h @@ -107,7 +107,7 @@ int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs); // SConfigPair void cfgCleanup(SConfig *pCfg); int32_t cfgGetSize(SConfig *pCfg); SConfigItem *cfgGetItem(SConfig *pCfg, const char *pName); -int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype); +int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock); int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *pVal, bool isServer); SConfigIter *cfgCreateIter(SConfig *pConf); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index f37e9851e1..439103e5c4 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -838,7 +838,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { return -1; } - int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS); + int code = cfgSetItem(pCfg, name, str, CFG_STYPE_TAOS_OPTIONS, true); if (code != 0) { tscError("failed to set cfg:%s to %s since %s", name, str, terrstr()); } else { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a839da7264..cad1145a6b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1081,13 +1081,13 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { SEp firstEp = {0}; taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp); snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); - cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype); + cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, true); SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp"); SEp secondEp = {0}; taosGetFqdnPortFromEp(strlen(pSecondpItem->str) == 0 ? defaultFirstEp : pSecondpItem->str, &secondEp); snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port); - cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype); + cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype, true); tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX); taosExpandDir(tsTempDir, tsTempDir, PATH_MAX); @@ -1149,9 +1149,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { static void taosSetSystemCfg(SConfig *pCfg) { SConfigItem *pItem = cfgGetItem(pCfg, "timezone"); + osSetTimezone(pItem->str); uDebug("timezone format changed from %s to %s", pItem->str, tsTimezoneStr); - cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype); + cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype, true); const char *locale = cfgGetItem(pCfg, "locale")->str; const char *charset = cfgGetItem(pCfg, "charset")->str; @@ -1639,7 +1640,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { SEp firstEp = {0}; taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp); snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); - cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype); + + cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false); uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst); matched = true; } else if (strcasecmp("firstEp", name) == 0) { @@ -1654,7 +1656,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { SEp firstEp = {0}; taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp); snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); - cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype); + + cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false); uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst); matched = true; } @@ -1701,7 +1704,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { SEp secondEp = {0}; taosGetFqdnPortFromEp(strlen(pItem->str) == 0 ? tsFirst : pItem->str, &secondEp); snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port); - cfgSetItem(pCfg, "secondEp", tsSecond, pItem->stype); + cfgSetItem(pCfg, "secondEp", tsSecond, pItem->stype, false); uInfo("%s set to %s", name, tsSecond); matched = true; } else if (strcasecmp("smlChildTableName", name) == 0) { @@ -1732,7 +1735,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { SEp firstEp = {0}; taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp); snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); - cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype); + + cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype, false); uInfo("localEp set to '%s', tsFirst set to '%s'", tsLocalEp, tsFirst); matched = true; } else if (strcasecmp("slowLogScope", name) == 0) { @@ -1749,7 +1753,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) { if (strcasecmp("timezone", name) == 0) { osSetTimezone(pItem->str); uInfo("%s set from %s to %s", name, tsTimezoneStr, pItem->str); - cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype); + + cfgSetItem(pCfg, "timezone", tsTimezoneStr, pItem->stype, false); matched = true; } else if (strcasecmp("tempDir", name) == 0) { uInfo("%s set from %s to %s", name, tsTempDir, pItem->str); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 2d190edb58..cb44b7d433 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -914,7 +914,7 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { return terrno; } - if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) { + if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD, true)) { return terrno; } diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 0d99412d1b..d05d616233 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -80,7 +80,7 @@ int32_t cfgLoadFromArray(SConfig *pCfg, SArray *pArgs) { int32_t size = taosArrayGetSize(pArgs); for (int32_t i = 0; i < size; ++i) { SConfigPair *pPair = taosArrayGet(pArgs, i); - if (cfgSetItem(pCfg, pPair->name, pPair->value, CFG_STYPE_ARG_LIST) != 0) { + if (cfgSetItem(pCfg, pPair->name, pPair->value, CFG_STYPE_ARG_LIST, true) != 0) { return -1; } } @@ -325,10 +325,13 @@ static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool rese return 0; } -int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype) { +int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype, bool lock) { // GRANT_CFG_SET; int32_t code = 0; - taosThreadMutexLock(&pCfg->lock); + + if (lock) { + taosThreadMutexLock(&pCfg->lock); + } SConfigItem *pItem = cfgGetItem(pCfg, name); if (pItem == NULL) { @@ -381,7 +384,10 @@ int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcTy break; } - taosThreadMutexUnlock(&pCfg->lock); + if (lock) { + taosThreadMutexUnlock(&pCfg->lock); + } + return code; } @@ -923,7 +929,7 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) { if (vlen3 != 0) value3[vlen3] = 0; } - code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR); + code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR, true); if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (strcasecmp(name, "dataDir") == 0) { @@ -966,7 +972,7 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) { if (vlen3 != 0) value3[vlen3] = 0; } - code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD); + code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD, true); if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (strcasecmp(name, "dataDir") == 0) { @@ -1031,7 +1037,7 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) { if (vlen3 != 0) value3[vlen3] = 0; } - code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE); + code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE, true); if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (strcasecmp(name, "dataDir") == 0) { @@ -1101,7 +1107,7 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) { count++; } - code = cfgSetItem(pConfig, name, newValue, CFG_STYPE_CFG_FILE); + code = cfgSetItem(pConfig, name, newValue, CFG_STYPE_CFG_FILE, true); if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; } else { paGetToken(value + vlen + 1, &value2, &vlen2); @@ -1111,7 +1117,7 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) { if (vlen3 != 0) value3[vlen3] = 0; } - code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE); + code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE, true); if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; } @@ -1286,7 +1292,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { if (vlen3 != 0) value3[vlen3] = 0; } - code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL); + code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL, true); if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break; if (strcasecmp(name, "dataDir") == 0) { From 8e47adba09426d4f7bb16c1dd1ab198658dc0034 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Apr 2024 17:46:23 +0800 Subject: [PATCH 7/7] fix(util): fix syntax error. --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 56fdb463c4..355b47c10b 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -219,7 +219,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("start to config, option:%s, value:%s", cfgReq.config, cfgReq.value); SConfig *pCfg = taosGetCfg(); - cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD); + cfgSetItem(pCfg, cfgReq.config, cfgReq.value, CFG_STYPE_ALTER_CMD, true); taosCfgDynamicOptions(pCfg, cfgReq.config, true); return 0; } @@ -254,7 +254,6 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK; pStatus->details[0] = 0; - SServerStatusRsp statusRsp = {0}; SMonMloadInfo minfo = {0}; (*pMgmt->getMnodeLoadsFp)(&minfo); if (minfo.isMnode &&