From 22808ce1b70efe3e0eb4cf675d8b76b834235773 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 17:33:30 +0800 Subject: [PATCH 01/20] fix(stream): update the fill-time for quota limitation. --- source/libs/stream/src/streamQueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 63ee702ada..11e02e94b9 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -160,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); - taosMsleep(10); +// taosMsleep(10); return TSDB_CODE_SUCCESS; } From 0463c0d7557ddb76591da03781936a45ad8a28b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 17:49:49 +0800 Subject: [PATCH 02/20] refactor: wait for a while when no quota available. 
--- source/libs/stream/src/streamQueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 11e02e94b9..63ee702ada 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -160,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); -// taosMsleep(10); + taosMsleep(10); return TSDB_CODE_SUCCESS; } From 608c72e901c75202ef2ea5b8fa0118f22ceef31e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Nov 2023 11:04:16 +0800 Subject: [PATCH 03/20] refactor(stream): create sim env for stream processing. --- source/dnode/vnode/src/tq/tq.c | 38 ++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1c1a4a192c..9dc941eab1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1925,7 +1925,45 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaWUnLock(pMeta); } else { streamMetaWUnLock(pMeta); +#if 0 tqStartStreamTaskAsync(pTq, true); +#else + // For debug purpose. + // the following procedure consume many CPU resource, result in the re-election of leader + // with high probability. So we employ it as a test case for the stream processing framework, with + // checkpoint/restart/nodeUpdate etc. 
+ while (streamMetaTaskInTimer(pMeta)) { + tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + streamMetaWLock(pMeta); + + int32_t code = streamMetaReopen(pMeta); + if (code != 0) { + tqError("vgId:%d failed to reopen stream meta", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { + tqError("vgId:%d failed to load stream tasks", vgId); + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return -1; + } + + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + tqResetStreamTaskStatus(pTq); + tqStartStreamTaskAsync(pTq, false); + } else { + tqInfo("vgId:%d, follower node not start stream tasks", vgId); + } + + streamMetaWUnLock(pMeta); +#endif } } From 77d6fd5394427123aa06c6373319f8299559cdaf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Nov 2023 14:02:08 +0800 Subject: [PATCH 04/20] refactor: do some internal refactor. 
--- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/vnode/src/tq/tq.c | 6 ++++-- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fd0c349dd2..0cdb180645 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2571,7 +2571,7 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) { } if (transId == 0) { - mError("failed to find the checkpoint trans, reset not executed"); + mDebug("failed to find the checkpoint trans, reset not executed"); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9dc941eab1..10d42dcbea 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1924,6 +1924,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { + tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); + streamMetaWUnLock(pMeta); #if 0 tqStartStreamTaskAsync(pTq, true); @@ -1955,13 +1957,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + tqInfo("vgId:%d start all stream tasks after all being updated", vgId); tqResetStreamTaskStatus(pTq); tqStartStreamTaskAsync(pTq, false); } else { tqInfo("vgId:%d, follower node not start stream tasks", vgId); } - + taosArrayDestroy(req.pNodeList); streamMetaWUnLock(pMeta); #endif } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 26849f8578..0531557cc1 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -299,7 +299,7 @@ int32_t tqResetStreamTaskStatus(STQ* pTq) { int32_t vgId = 
TD_VID(pTq->pVnode); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks); + tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } From 801b211cd87c6068c30a4d4a283d83432f4aca56 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 Nov 2023 14:29:40 +0800 Subject: [PATCH 05/20] refactor:do some internal refactor. --- cmake/cmake.define | 10 +++++++++- include/os/osEnv.h | 3 ++- include/os/osSysinfo.h | 2 +- source/common/src/tglobal.c | 15 ++++++++------- source/libs/function/src/detail/tavgfunction.c | 2 +- source/libs/function/src/detail/tminmax.c | 10 +++++----- source/os/src/osEnv.c | 3 ++- source/os/src/osSysinfo.c | 11 +++-------- source/util/src/tcompression.c | 4 ++-- 9 files changed, 33 insertions(+), 27 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 3343798686..56b6b7e1de 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -149,6 +149,8 @@ ELSE () CHECK_C_COMPILER_FLAG("-mfma" COMPILER_SUPPORT_FMA) CHECK_C_COMPILER_FLAG("-mavx" COMPILER_SUPPORT_AVX) CHECK_C_COMPILER_FLAG("-mavx2" COMPILER_SUPPORT_AVX2) + CHECK_C_COMPILER_FLAG("-mavx512f" COMPILER_SUPPORT_AVX512F) + CHECK_C_COMPILER_FLAG("-mavx512vbmi" COMPILER_SUPPORT_AVX512BMI) IF (COMPILER_SUPPORT_SSE42) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -msse4.2") @@ -168,7 +170,13 @@ ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx2") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx2") ENDIF() - MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2) is ACTIVATED") + MESSAGE(STATUS "SIMD instructions (FMA/AVX/AVX2/AVX512) is ACTIVATED") + + IF (COMPILER_SUPPORT_AVX512F AND COMPILER_SUPPORT_AVX512BMI) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mavx512f -mavx512vbmi") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512f -mavx512vbmi") + MESSAGE(STATUS "avx512 supported by gcc") + ENDIF() ENDIF() # build mode diff --git a/include/os/osEnv.h 
b/include/os/osEnv.h index bc65da47a9..ac4ecd4212 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -36,11 +36,12 @@ extern int64_t tsStreamMax; extern float tsNumOfCores; extern int64_t tsTotalMemoryKB; extern char *tsProcPath; -extern char tsSIMDBuiltins; +extern char tsSIMDEnable; extern char tsSSE42Enable; extern char tsAVXEnable; extern char tsAVX2Enable; extern char tsFMAEnable; +extern char tsAVX512Enable; extern char tsTagFilterCache; extern char configDir[]; diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 29b6f07dca..7a1df2b81c 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -41,7 +41,7 @@ int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores); int32_t taosGetCpuCores(float *numOfCores, bool physical); void taosGetCpuUsage(double *cpu_system, double *cpu_engine); -int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma); +int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512); int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cb67fc1ba3..d12ebb13c2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -511,12 +511,13 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "enableCoreFile", 1, CFG_SCOPE_BOTH, CFG_DYN_CLIENT) != 0) return -1; if (cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "ssd42", tsSSE42Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "avx", tsAVXEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "avx2", tsAVX2Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "fma", 
tsFMAEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "simdEnable", tsSIMDBuiltins, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "ssd42", tsSSE42Enable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "avx", tsAVXEnable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "avx2", tsAVX2Enable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "fma", tsFMAEnable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "avx512", tsAVX512Enable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "simdEnable", tsSIMDEnable, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; #if !defined(_ALPINE) @@ -1080,7 +1081,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; - tsSIMDBuiltins = (bool)cfgGetItem(pCfg, "simdEnable")->bval; + tsSIMDEnable = (bool)cfgGetItem(pCfg, "simdEnable")->bval; tsTagFilterCache = (bool)cfgGetItem(pCfg, "tagFilterCache")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; diff --git a/source/libs/function/src/detail/tavgfunction.c b/source/libs/function/src/detail/tavgfunction.c index 50df1b5067..e626c937da 100644 --- a/source/libs/function/src/detail/tavgfunction.c +++ b/source/libs/function/src/detail/tavgfunction.c @@ -565,7 +565,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { numOfElem = pInput->numOfRows; pAvgRes->count += pInput->numOfRows; - bool simdAvailable = tsAVXEnable && tsSIMDBuiltins && (numOfRows > THRESHOLD_SIZE); + bool simdAvailable = tsAVXEnable && tsSIMDEnable && (numOfRows > THRESHOLD_SIZE); switch(type) { case 
TSDB_DATA_TYPE_UTINYINT: diff --git a/source/libs/function/src/detail/tminmax.c b/source/libs/function/src/detail/tminmax.c index 3ca1c06303..a6c91a57ce 100644 --- a/source/libs/function/src/detail/tminmax.c +++ b/source/libs/function/src/detail/tminmax.c @@ -370,7 +370,7 @@ static int32_t findFirstValPosition(const SColumnInfoData* pCol, int32_t start, static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -404,7 +404,7 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -438,7 +438,7 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc, bool signVal) { // AVX2 version to speedup the loop - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal); } else { if (!pBuf->assign) { @@ -502,7 +502,7 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo float* val = (float*)&pBuf->v; // AVX version to speedup the loop - if (tsAVXEnable && tsSIMDBuiltins) { + if (tsAVXEnable && tsSIMDEnable) { *val = floatVectorCmpAVX(pData, numOfRows, isMinFunc); } else { if (!pBuf->assign) { @@ -533,7 +533,7 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, 
int32_t numOfR double* val = (double*)&pBuf->v; // AVX version to speedup the loop - if (tsAVXEnable && tsSIMDBuiltins) { + if (tsAVXEnable && tsSIMDEnable) { *val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc); } else { if (!pBuf->assign) { diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 0fc136c693..54107db325 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -37,11 +37,12 @@ float tsNumOfCores = 0; int64_t tsTotalMemoryKB = 0; char *tsProcPath = NULL; -char tsSIMDBuiltins = 0; +char tsSIMDEnable = 0; char tsSSE42Enable = 0; char tsAVXEnable = 0; char tsAVX2Enable = 0; char tsFMAEnable = 0; +char tsAVX512Enable = 0; void osDefaultInit() { taosSeedRand(taosSafeRand()); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 4816ec8f8b..fea7a4f63d 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -250,7 +250,7 @@ void taosGetSystemInfo() { taosGetCpuCores(&tsNumOfCores, false); taosGetTotalMemory(&tsTotalMemoryKB); taosGetCpuUsage(NULL, NULL); - taosGetCpuInstructions(&tsSSE42Enable, &tsAVXEnable, &tsAVX2Enable, &tsFMAEnable); + taosGetCpuInstructions(&tsSSE42Enable, &tsAVXEnable, &tsAVX2Enable, &tsFMAEnable, &tsAVX512Enable); #endif } @@ -602,7 +602,7 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) { : "0"(level)) // todo add for windows and mac -int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) { +int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512) { #ifdef WINDOWS #elif defined(_TD_DARWIN_64) #else @@ -610,12 +610,6 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) { #ifdef _TD_X86_ // Since the compiler is not support avx/avx2 instructions, the global variables always need to be // set to be false -//#if __AVX__ || __AVX2__ -// tsSIMDBuiltins = true; -//#else -// tsSIMDBuiltins = false; -//#endif - uint32_t eax = 0, ebx = 0, ecx = 0, edx = 0; int32_t ret = 
__get_cpuid(1, &eax, &ebx, &ecx, &edx); @@ -631,6 +625,7 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma) { // Ref to https://gcc.gnu.org/bugzilla/show_bug.cgi?id=77756 __cpuid_fix(7u, eax, ebx, ecx, edx); *avx2 = (char) ((ebx & bit_AVX2) == bit_AVX2); + *avx512 = (char)((ebx & bit_AVX512F) == bit_AVX512F); #endif // _TD_X86_ #endif diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 3fc3ef6be6..dc89a24180 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -283,7 +283,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha int32_t batch = num >> 2; int32_t remain = num & 0x03; if (selector == 0 || selector == 1) { - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { for (int32_t i = 0; i < batch; ++i) { __m256i prev = _mm256_set1_epi64x(prev_value); _mm256_storeu_si256((__m256i *)&p[_pos], prev); @@ -300,7 +300,7 @@ int32_t tsDecompressINTImp(const char *const input, const int32_t nelements, cha } } } else { - if (tsAVX2Enable && tsSIMDBuiltins) { + if (tsAVX2Enable && tsSIMDEnable) { __m256i base = _mm256_set1_epi64x(w); __m256i maskVal = _mm256_set1_epi64x(mask); From 45ab92a02da7a355ae1227b9f7570e6d25b11dee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Nov 2023 10:10:37 +0800 Subject: [PATCH 06/20] fix(stream): remove invalid free. 
--- source/dnode/vnode/src/tq/tq.c | 1 - source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 10d42dcbea..b3a359a683 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1963,7 +1963,6 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } else { tqInfo("vgId:%d, follower node not start stream tasks", vgId); } - taosArrayDestroy(req.pNodeList); streamMetaWUnLock(pMeta); #endif } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 0531557cc1..e578638e9d 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -448,7 +448,7 @@ bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* num numOfNewItems += 1; int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.nextProcessVer = ver; - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", id, ver); + tqDebug("s-task:%s set ver:%" PRId64 " for reader after extract data from WAL", id, ver); bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver); if (itemInFillhistory) { From 97772e9aabdb06ead7d9c52efdaed330d4e208cf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Nov 2023 17:11:37 +0800 Subject: [PATCH 07/20] fix(stream): the checkpoint version can only be updated when generating checkpoint. 
--- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckpoint.c | 5 ++++- source/libs/stream/src/streamExec.c | 17 ++++++++++------- source/libs/stream/src/streamTask.c | 6 ++++-- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eab3ecf04e..9d32912ece 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -304,6 +304,7 @@ typedef struct SCheckpointInfo { int64_t startTs; int64_t checkpointId; int64_t checkpointVer; // latest checkpointId version + int64_t processedVer; // already processed ver, that has generated results version. int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id } SCheckpointInfo; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 81840aaeb7..48b6486e05 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -297,9 +297,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); + ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && + p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->checkpointingId; + p->chkInfo.checkpointVer = p->chkInfo.processedVer; + streamTaskClearCheckInfo(p); char* str = NULL; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a6101b0932..cae537a860 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -593,7 +593,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { const SStreamQueueItem* pItem = pInput; stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); - int64_t ver = 
pTask->chkInfo.checkpointVer; + int64_t ver = pTask->chkInfo.processedVer; doSetStreamInputBlock(pTask, pInput, &ver, id); int64_t resSize = 0; @@ -604,13 +604,16 @@ int32_t streamExecForAll(SStreamTask* pTask) { stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, SIZE_IN_MiB(resSize), totalBlocks); - // update the currentVer if processing the submit blocks. - ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer); + SCheckpointInfo* pInfo = &pTask->chkInfo; - if (ver != pTask->chkInfo.checkpointVer) { - stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64, - pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer); - pTask->chkInfo.checkpointVer = ver; + // update the currentVer if processing the submit blocks. + ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); + + if (ver != pInfo->processedVer) { + stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 + " ckpt:%" PRId64, + pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); + pInfo->processedVer = ver; } streamFreeQitem(pInput); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a7fb590d1b..24228c0307 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -431,8 +431,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; - pTask->chkInfo.checkpointVer = ver - 1; - pTask->chkInfo.nextProcessVer = ver; + pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint + pTask->chkInfo.processedVer = ver - 1; // already processed version + + pTask->chkInfo.nextProcessVer = ver; // next processed 
version pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; From e1de1de4214e6f28122a172ffa03f4be8a5f1d24 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Nov 2023 18:14:27 +0800 Subject: [PATCH 08/20] fix(stream): fix the invalid free. --- source/libs/stream/src/streamData.c | 9 ++++----- source/libs/stream/src/streamQueue.c | 10 ++++------ source/libs/stream/src/streamStart.c | 1 - 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 80927b36b9..f6ec6e9fdb 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -129,6 +129,7 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); taosMemoryFree(pDataSubmit->submit.msgStr); + taosFreeQitem(pDataSubmit); } SStreamMergedSubmit* streamMergedSubmitNew() { @@ -208,12 +209,10 @@ void streamFreeQitem(SStreamQueueItem* data) { if (type == STREAM_INPUT__GET_RES) { blockDataDestroy(((SStreamTrigger*)data)->pBlock); taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) { - taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(data); + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) { + destroyStreamDataBlock((SStreamDataBlock*)data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { streamDataSubmitDestroy((SStreamDataSubmit*)data); - taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; @@ -228,7 +227,7 @@ void streamFreeQitem(SStreamQueueItem* data) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; blockDataDestroy(pRefBlock->pBlock); 
taosFreeQitem(pRefBlock); - } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { SStreamDataBlock* pBlock = (SStreamDataBlock*) data; taosArrayDestroyEx(pBlock->blocks, freeItems); taosFreeQitem(pBlock); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 63ee702ada..556de169b4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -270,7 +270,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); - taosFreeQitem(pItem); return -1; } @@ -280,7 +279,6 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { streamDataSubmitDestroy(px); - taosFreeQitem(pItem); return code; } @@ -296,13 +294,13 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); - destroyStreamDataBlock((SStreamDataBlock*)pItem); + streamFreeQitem(pItem); return -1; } int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - destroyStreamDataBlock((SStreamDataBlock*)pItem); + streamFreeQitem(pItem); return code; } @@ -312,7 +310,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) type == STREAM_INPUT__TRANS_STATE) { int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - 
taosFreeQitem(pItem); + streamFreeQitem(pItem); return code; } @@ -323,7 +321,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) // use the default memory limit, refactor later. int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { - taosFreeQitem(pItem); + streamFreeQitem(pItem); return code; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index e672b256da..da4aa02e9c 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -562,7 +562,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { taosMemoryFree(pBlock); if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTranstate) < 0) { - taosFreeQitem(pTranstate); return TSDB_CODE_OUT_OF_MEMORY; } From 4d9b4228742f008adb18f1f67e2ea93de8198e8a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 9 Nov 2023 23:26:09 +0800 Subject: [PATCH 09/20] fix(stream): check for the repeatedly sent checkpoint-source msg. 
--- source/dnode/vnode/src/tq/tq.c | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b3a359a683..438734e191 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1668,7 +1668,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) SStreamCheckpointSourceReq req = {0}; if (!vnodeIsRoleLeader(pTq->pVnode)) { - tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId); + tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs @@ -1676,7 +1676,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } if (!pTq->pVnode->restored) { - tqDebug("vgId:%d checkpoint-source msg received during restoring, ignore it", vgId); + tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs @@ -1696,7 +1696,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } tDecoderClear(&decoder); - // todo handle failure to reset from checkpoint procedure SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, @@ -1707,7 +1706,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } - // todo handle failure to reset from checkpoint procedure // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req. 
if (pTask->status.downstreamReady != 1) { pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id @@ -1728,7 +1726,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) ETaskStatus status = streamTaskGetStatus(pTask, NULL); if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { - qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", + tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); taosThreadMutexUnlock(&pTask->lock); @@ -1739,6 +1737,18 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) tmsgSendRsp(&rsp); // error occurs return TSDB_CODE_SUCCESS; } + + // check if the checkpoint msg already sent or not. + if (status == TASK_STATUS__CK) { + ASSERT(pTask->checkpointingId == req.checkpointId); + + tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 + " already received, ignore this msg and continue process checkpoint", + pTask->id.idStr, pTask->checkpointingId); + streamMetaReleaseTask(pMeta, pTask); + return code; + } + streamProcessCheckpointSourceReq(pTask, &req); taosThreadMutexUnlock(&pTask->lock); From af08a189c140bcc8c03d89ead1087aecee2c47ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 09:24:08 +0800 Subject: [PATCH 10/20] fix(stream): disable concurrently restart stream tasks. 
--- source/dnode/vnode/src/tq/tq.c | 10 ++++++++++ source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 6 +++--- source/libs/stream/src/streamMeta.c | 7 ++++--- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 438734e191..dafd3aaa4a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1944,6 +1944,16 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // the following procedure consume many CPU resource, result in the re-election of leader // with high probability. So we employ it as a test case for the stream processing framework, with // checkpoint/restart/nodeUpdate etc. + while(1) { + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); + if (startVal == 0) { + break; + } + + tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId); + taosMsleep(500); + } + while (streamMetaTaskInTimer(pMeta)) { tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); taosMsleep(100); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index b34b3420fe..441c71662e 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -44,7 +44,7 @@ typedef struct { int64_t defaultCfInit; } SBackendWrapper; -void* streamBackendInit(const char* path, int64_t chkpId); +void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b22c6c9b0f..63dc497c6f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -469,11 
+469,11 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } -void* streamBackendInit(const char* streamPath, int64_t chkpId) { +void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); - stDebug("start to init stream backend at %s, checkpointid: %" PRId64 "", backendPath, chkpId); + stDebug("start to init stream backend at %s, checkpointid: %" PRId64 " vgId:%d", backendPath, chkpId, vgId); uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); @@ -534,7 +534,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } - stDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle); + stDebug("succ to init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId); taosMemoryFreeClear(backendPath); return (void*)pHandle; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 17cd9fac57..e6bbd89f02 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -195,10 +195,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->chkpDirLock); pMeta->chkpId = streamGetLatestCheckpointId(pMeta); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId); while (pMeta->streamBackend == NULL) { taosMsleep(100); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId); if (pMeta->streamBackend == NULL) { stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); } @@ -263,7 +263,8 @@ int32_t 
streamMetaReopen(SStreamMeta* pMeta) { } } - while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId)) == NULL) { + // todo: not wait in a critical region + while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) { stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); taosMsleep(100); } From cb0d244d5a6e965c96083f8fec41dbc92ef40919 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 09:35:41 +0800 Subject: [PATCH 11/20] fix(stream): release lock. --- source/dnode/vnode/src/tq/tq.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index dafd3aaa4a..ba79844e40 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1728,25 +1728,28 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); - taosThreadMutexUnlock(&pTask->lock); + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs + return TSDB_CODE_SUCCESS; } // check if the checkpoint msg already sent or not. 
if (status == TASK_STATUS__CK) { ASSERT(pTask->checkpointingId == req.checkpointId); - tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 " already received, ignore this msg and continue process checkpoint", pTask->id.idStr, pTask->checkpointingId); + + taosThreadMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - return code; + + return TSDB_CODE_SUCCESS; } streamProcessCheckpointSourceReq(pTask, &req); From 5937bdddf58249ee65166d5caacc6da6beffcfaf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 09:46:20 +0800 Subject: [PATCH 12/20] refactor:disable test. --- source/dnode/vnode/src/tq/tq.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ba79844e40..065690dbfe 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1940,7 +1940,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); streamMetaWUnLock(pMeta); -#if 0 +#if 1 tqStartStreamTaskAsync(pTq, true); #else // For debug purpose. From 8e5db21c3f76c04c1b71fec5fb282a3bfc932149 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 10:10:38 +0800 Subject: [PATCH 13/20] fix(test): fix syntax error. 
--- source/libs/stream/test/tstreamUpdateTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index f63939ac9e..1b999e5fb0 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -12,7 +12,7 @@ class StreamStateEnv : public ::testing::Test { protected: virtual void SetUp() { streamMetaInit(); - backend = streamBackendInit(path, 0); + backend = streamBackendInit(path, 0, 0); } virtual void TearDown() { streamMetaCleanup(); From 8925c721e54adedeb446f3a1b5328f822a54630c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 10:26:45 +0800 Subject: [PATCH 14/20] fix(stream): adjust critical section. --- source/dnode/vnode/src/tq/tq.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 065690dbfe..3ae0eb1ddf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1939,10 +1939,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } else { tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); - streamMetaWUnLock(pMeta); #if 1 tqStartStreamTaskAsync(pTq, true); + streamMetaWUnLock(pMeta); #else + streamMetaWUnLock(pMeta); + // For debug purpose. // the following procedure consume many CPU resource, result in the re-election of leader // with high probability. So we employ it as a test case for the stream processing framework, with From 7a23df4b1aee3fb5595d91f890dd79fb71f9b702 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 13:36:35 +0800 Subject: [PATCH 15/20] fix(stream): set the correct updated nodeId. 
--- source/libs/stream/src/streamMeta.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e6bbd89f02..042ff1d1d8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -952,19 +952,20 @@ void metaHbToMnode(void* param, void* tmrId) { taosThreadMutexLock(&(*pTask)->lock); int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList); for (int j = 0; j < num; ++j) { - int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); + SDownstreamTaskEpset* pTaskEpset = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); bool exist = false; int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes); for (int k = 0; k < numOfExisted; ++k) { - if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { + if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { exist = true; break; } } if (!exist) { - taosArrayPush(hbMsg.pUpdateNodes, pNodeId); + taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId); + stDebug("vgId:%d nodeId:%d added into the update list", pMeta->vgId, pTaskEpset->nodeId); } } From 5fcef5bd895ba0b570b28c2c5994bb6891c5d5b8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 14:36:18 +0800 Subject: [PATCH 16/20] refactor:add some logs. --- include/libs/stream/tstream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 1 + source/libs/stream/src/streamMeta.c | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d32912ece..4b760f3f4e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -461,7 +461,7 @@ typedef struct STaskStartInfo { int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. 
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed - int32_t elapsedTime; + int64_t elapsedTime; } STaskStartInfo; typedef struct STaskUpdateInfo { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0cdb180645..e589088c35 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2670,6 +2670,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } + mDebug("%d stream nodes needs updated", (int32_t) taosArrayGetSize(req.pUpdateNodes)); setNodeEpsetExpiredFlag(req.pUpdateNodes); for (int32_t i = 0; i < req.numOfTasks; ++i) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 042ff1d1d8..fe157aaa24 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -965,7 +965,8 @@ void metaHbToMnode(void* param, void* tmrId) { if (!exist) { taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId); - stDebug("vgId:%d nodeId:%d added into the update list", pMeta->vgId, pTaskEpset->nodeId); + stDebug("vgId:%d nodeId:%d added into the update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, + (int32_t)taosArrayGetSize(hbMsg.pUpdateNodes)); } } From 15430f4d50703d90611ec84e016feba4f8fc92ff Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 15:30:21 +0800 Subject: [PATCH 17/20] refactor: --- source/dnode/mnode/impl/src/mndStream.c | 11 +++-- source/libs/stream/src/streamMeta.c | 56 ++++++++++++++----------- source/libs/stream/src/streamStart.c | 2 +- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e589088c35..a537b4e501 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2610,16 
+2610,18 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { int32_t num = taosArrayGetSize(pNodeList); + mInfo("set node expired for %d nodes", num); for (int k = 0; k < num; ++k) { int32_t* pVgId = taosArrayGet(pNodeList, k); + mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); for (int i = 0; i < numOfNodes; ++i) { SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); if (pNodeEntry->nodeId == *pVgId) { - mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId); + mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); pNodeEntry->stageUpdated = true; break; } @@ -2670,8 +2672,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } - mDebug("%d stream nodes needs updated", (int32_t) taosArrayGetSize(req.pUpdateNodes)); - setNodeEpsetExpiredFlag(req.pUpdateNodes); + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); + if (numOfUpdated > 0) { + mDebug("%d stream nodes needs updated from tasks' report", (int32_t)taosArrayGetSize(req.pUpdateNodes)); + setNodeEpsetExpiredFlag(req.pUpdateNodes); + } for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fe157aaa24..dfe5729b29 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -854,6 +854,37 @@ static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) { taosArrayDestroy(pIdList); } +static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { + int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); + for (int k = 0; k < numOfExisted; ++k) { + if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) { + return true; + } + } + return false; +} + +static void 
addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { + SStreamMeta* pMeta = pTask->pMeta; + + taosThreadMutexLock(&pTask->lock); + + int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + for (int j = 0; j < num; ++j) { + SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j); + + bool exist = existInHbMsg(pMsg, pTaskEpset); + if (!exist) { + taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); + stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, + (int32_t)taosArrayGetSize(pMsg->pUpdateNodes)); + } + } + + taosArrayClear(pTask->outputInfo.pDownstreamUpdateList); + taosThreadMutexUnlock(&pTask->lock); +} + void metaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; @@ -949,30 +980,7 @@ void metaHbToMnode(void* param, void* tmrId) { walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); } - taosThreadMutexLock(&(*pTask)->lock); - int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList); - for (int j = 0; j < num; ++j) { - SDownstreamTaskEpset* pTaskEpset = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); - - bool exist = false; - int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes); - for (int k = 0; k < numOfExisted; ++k) { - if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { - exist = true; - break; - } - } - - if (!exist) { - taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId); - stDebug("vgId:%d nodeId:%d added into the update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, - (int32_t)taosArrayGetSize(hbMsg.pUpdateNodes)); - } - } - - taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList); - taosThreadMutexUnlock(&(*pTask)->lock); - + addUpdateNodeIntoHbMsg(*pTask, &hbMsg); taosArrayPush(hbMsg.pTaskStatus, &entry); if (!hasMnodeEpset) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); diff --git 
a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index da4aa02e9c..0b2bf6b4ba 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1083,7 +1083,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { - pStartInfo->readyTs = pTask->execInfo.start; + pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 From e957e4ad5f5c059ea55f1eec0b4db13cf8ab9aec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 15:39:55 +0800 Subject: [PATCH 18/20] refactor: update the node change check duration. --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d12ebb13c2..142f7f8078 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -240,7 +240,7 @@ int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; -int32_t tsStreamNodeCheckInterval = 30; +int32_t tsStreamNodeCheckInterval = 15; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups From 3bacd7516e09d32e5df94e3054a706fc6a0884fe Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 16:24:20 +0800 Subject: [PATCH 19/20] fix(stream): extract stream nodes list if not initialized.
--- source/dnode/mnode/impl/src/mndStream.c | 85 ++++++++++++++----------- source/libs/stream/src/streamMeta.c | 2 +- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a537b4e501..0362b328ae 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -43,7 +43,7 @@ typedef struct SNodeEntry { } SNodeEntry; typedef struct SStreamExecInfo { - SArray *pNodeEntryList; + SArray *pNodeList; int64_t ts; // snapshot ts int64_t activeCheckpoint; // active check point id SHashObj *pTaskMap; @@ -850,7 +850,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); taosThreadMutexLock(&execInfo.lock); - mDebug("register to stream task node list"); + mDebug("stream tasks register into node list"); keepStreamTasksInBuf(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); @@ -1125,6 +1125,15 @@ static const char *mndGetStreamDB(SMnode *pMnode) { return p; } +static int32_t initStreamNodeList(SMnode* pMnode) { + if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) { + execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); + execInfo.pNodeList = extractNodeListFromStream(pMnode); + } + + return taosArrayGetSize(execInfo.pNodeList); +} + static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1135,22 +1144,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { { // check if the node update happens or not int64_t ts = taosGetTimestampSec(); - if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) { - if (execInfo.pNodeEntryList != NULL) { - execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); - } + taosThreadMutexLock(&execInfo.lock); + int32_t numOfNodes = initStreamNodeList(pMnode); + taosThreadMutexUnlock(&execInfo.lock); - 
execInfo.pNodeEntryList = extractNodeListFromStream(pMnode); - } - - if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) { + if (numOfNodes == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execInfo.ts = ts; return 0; } - for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); + for(int32_t i = 0; i < numOfNodes; ++i) { + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i); if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); return 0; @@ -1165,7 +1170,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { return 0; } - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); @@ -2080,20 +2085,21 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) { break; } - SNodeEntry entry = {0}; + SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; entry.epset = mndGetVgroupEpset(pMnode, pVgroup); - entry.nodeId = pVgroup->vgId; - entry.hbTimestamp = pVgroup->updateTime; + // if not all ready till now, no need to check the remaining vgroups. 
if (*allReady) { for (int32_t i = 0; i < pVgroup->replica; ++i) { if (!pVgroup->vnodeGid[i].syncRestore) { + mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId); *allReady = false; break; } ESyncState state = pVgroup->vnodeGid[i].syncState; if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { + mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId); *allReady = false; break; } @@ -2300,8 +2306,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { int32_t size = taosArrayGetSize(pNodeSnapshot); SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); - for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { - SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i); + for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) { + SNodeEntry* p = taosArrayGet(execInfo.pNodeList, i); for(int32_t j = 0; j < size; ++j) { SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j); @@ -2312,8 +2318,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { } } - execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); - execInfo.pNodeEntryList = pValidNodeEntryList; + taosArrayDestroy(execInfo.pNodeList); + execInfo.pNodeList = pValidNodeEntryList; mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList)); taosArrayDestroy(pRemovedTasks); @@ -2323,6 +2329,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; + int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { mDebug("still in checking node change"); @@ -2333,23 +2340,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int64_t ts = taosGetTimestampSec(); SMnode *pMnode = pMsg->info.node; - if (execInfo.pNodeEntryList == NULL || 
(taosArrayGetSize(execInfo.pNodeEntryList) == 0)) { - if (execInfo.pNodeEntryList != NULL) { - execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); - } - execInfo.pNodeEntryList = extractNodeListFromStream(pMnode); - } - if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) { + taosThreadMutexLock(&execInfo.lock); + int32_t numOfNodes = initStreamNodeList(pMnode); + taosThreadMutexUnlock(&execInfo.lock); + + if (numOfNodes == 0) { mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); execInfo.ts = ts; atomic_store_32(&mndNodeCheckSentinel, 0); return 0; } - bool allVnodeReady = true; - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady); - if (!allVnodeReady) { + bool allVgroupsReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVgroupsReady); + if (!allVgroupsReady) { taosArrayDestroy(pNodeSnapshot); atomic_store_32(&mndNodeCheckSentinel, 0); mWarn("not all vnodes are ready, ignore the exec nodeUpdate check"); @@ -2359,7 +2364,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { taosThreadMutexLock(&execInfo.lock); removeExpirednodeEntryAndTask(pNodeSnapshot); - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { // kill current active checkpoint transaction, since the transaction is vnode wide. 
@@ -2369,8 +2374,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { // keep the new vnode snapshot if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("create trans successfully, update cached node list"); - taosArrayDestroy(execInfo.pNodeEntryList); - execInfo.pNodeEntryList = pNodeSnapshot; + taosArrayDestroy(execInfo.pNodeList); + execInfo.pNodeList = pNodeSnapshot; execInfo.ts = ts; } else { mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code)); @@ -2616,9 +2621,9 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { int32_t* pVgId = taosArrayGet(pNodeList, k); mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); - int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int i = 0; i < numOfNodes; ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i); if (pNodeEntry->nodeId == *pVgId) { mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); @@ -2632,9 +2637,9 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { } static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { - int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for(int32_t j = 0; j < numOfNodes; ++j) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j); if (pNodeEntry->nodeId == pTaskEntry->nodeId) { mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, @@ -2667,14 +2672,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execInfo.lock); + + // extract stream task list int32_t numOfExisted = 
taosHashGetSize(execInfo.pTaskMap); if (numOfExisted == 0) { doExtractTasksFromStream(pMnode); } + initStreamNodeList(pMnode); + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { - mDebug("%d stream nodes needs updated from tasks' report", (int32_t)taosArrayGetSize(req.pUpdateNodes)); + mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId); setNodeEpsetExpiredFlag(req.pUpdateNodes); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index dfe5729b29..f364ed889d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1020,7 +1020,7 @@ void metaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbCount += 1; - stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, + stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, pMeta->pHbInfo->hbCount); tmsgSendReq(&epset, &msg); } else { From 4ff8907b5817af954a871e0b86b07c75f284d886 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 Nov 2023 23:05:20 +0800 Subject: [PATCH 20/20] fix: fix syntax error. 
--- source/common/src/tglobal.c | 14 +++++++------- source/libs/stream/src/streamMeta.c | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 76d8a0476e..d2e4e7b845 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -525,13 +525,13 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "enableCoreFile", 1, CFG_SCOPE_BOTH, CFG_DYN_CLIENT) != 0) return -1; if (cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; - if (cfgAddBool(pCfg, "ssd42", tsSSE42Enable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "avx", tsAVXEnable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "avx2", tsAVX2Enable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "fma", tsFMAEnable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "avx512", tsAVX512Enable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "simdEnable", tsSIMDEnable, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "ssd42", tsSSE42Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "avx", tsAVXEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "avx2", tsAVX2Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "fma", tsFMAEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "avx512", tsAVX512Enable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "simdEnable", tsSIMDEnable, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; #if !defined(_ALPINE) diff --git a/source/libs/stream/src/streamMeta.c 
b/source/libs/stream/src/streamMeta.c index 6b82e6683c..7013b43a6f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -194,7 +194,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->chkpDirLock); pMeta->chkpId = streamMetaGetLatestCheckpointId(pMeta); - pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); while (pMeta->streamBackend == NULL) { taosMsleep(100); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, vgId);