From d50c72d2f4236f2005b6234d6f7746c9dd5f6cf4 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Sat, 14 Sep 2024 10:03:23 +0800 Subject: [PATCH 01/16] fix(stmt2/settbtags): fix memleak & can not set tags issue --- source/client/src/clientStmt2.c | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientStmt2.c b/source/client/src/clientStmt2.c index ad8fc52b95..a1294b847b 100644 --- a/source/client/src/clientStmt2.c +++ b/source/client/src/clientStmt2.c @@ -194,6 +194,12 @@ static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void pStmt->bInfo.tbSuid = pTableMeta->suid; pStmt->bInfo.tbVgId = pTableMeta->vgId; pStmt->bInfo.tbType = pTableMeta->tableType; + + if (!pStmt->bInfo.tagsCached) { + qDestroyBoundColInfo(pStmt->bInfo.boundTags); + taosMemoryFreeClear(pStmt->bInfo.boundTags); + } + pStmt->bInfo.boundTags = tags; pStmt->bInfo.tagsCached = false; tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName)); @@ -984,10 +990,6 @@ int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) { return TSDB_CODE_SUCCESS; } - if (pStmt->bInfo.inExecCache) { - return TSDB_CODE_SUCCESS; - } - STableDataCxt** pDataBlock = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); if (NULL == pDataBlock) { @@ -995,6 +997,10 @@ int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) { STMT_ERR_RET(TSDB_CODE_APP_ERROR); } + if (pStmt->bInfo.inExecCache && (!pStmt->sql.autoCreateTbl || (*pDataBlock)->pData->pCreateTbReq)) { + return TSDB_CODE_SUCCESS; + } + tscDebug("start to bind stmt tag values"); STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName, pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf, From 1b48080a94c470548645fad9e2a05f6fc5d83ccc Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sat, 14 Sep 2024 11:47:20 +0800 Subject: [PATCH 02/16] fix: os return --- include/os/osSocket.h | 1 - source/client/src/clientImpl.c | 2 +- source/dnode/mnode/impl/src/mndUser.c | 3 +- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 2 +- source/libs/function/src/udfd.c | 5 ++- source/libs/transport/src/thttp.c | 2 +- source/libs/transport/src/transCli.c | 6 ++-- source/os/src/osSignal.c | 20 ++++++++--- source/os/src/osSocket.c | 38 ++++++++------------- source/os/src/osString.c | 12 +++---- 10 files changed, 48 insertions(+), 43 deletions(-) diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 48478c8f49..5ee3f30ddf 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -165,7 +165,6 @@ int32_t taosGetFqdn(char *); void tinet_ntoa(char *ipstr, uint32_t ip); uint32_t ip2uint(const char *const ip_addr); int32_t taosIgnSIGPIPE(); -uint32_t taosInetAddr(const char *ipAddr); const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len); uint64_t taosHton64(uint64_t val); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5fba447c8a..6c98714233 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2043,7 +2043,7 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) { int32_t idx = -1; iconv_t conv = taosAcquireConv(&idx, C2M); - if (!conv) return TSDB_CODE_TSC_INTERNAL_ERROR; + if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR; for (int32_t i = 0; i < numOfCols; ++i) { int32_t type = pResultInfo->fields[i].type; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 7035a9c9c8..8ca3d59868 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -342,7 +342,8 @@ int32_t mndUpdateIpWhiteImpl(SHashObj *pIpWhiteTab, char *user, char *fqdn, int8 SIpV4Range range = {.ip = 0, .mask = 32}; int32_t code = taosGetIpv4FromFqdn(fqdn, &range.ip); if (code) { - //TODO + mError("failed to get ip from fqdn: %s at line %d since %s", fqdn, lino, tstrerror(code)); + TAOS_RETURN(TSDB_CODE_TSC_INVALID_FQDN); } mDebug("ip-white-list may update for user: %s, fqdn: %s", user, fqdn); SIpWhiteList **ppList = taosHashGet(pIpWhiteTab, user, strlen(user)); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 72f2052867..51109a616b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -243,7 +243,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id code = metaGetTbTSchemaNotNull(p->pVnode->pMeta, suid, -1, 1, &p->pSchema); if (TSDB_CODE_SUCCESS != code) { tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr); - if(code != TSDB_CODE_OUT_OF_MEMORY) { + if(code == TSDB_CODE_NOT_FOUND) { return TSDB_CODE_PAR_TABLE_NOT_EXIST; } else { return code; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 6ae718c101..6554c5bdf5 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -85,7 +85,10 @@ int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfNa char mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char *mergeSuffix = "_merge"; snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix); - (void)(uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc))); + int ret = uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); + if (ret != 0) { + fnInfo("uv_dlsym function %s. error: %s", mergeFuncName, uv_strerror(ret)); + } return 0; } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index e0e2a0c323..f62d728511 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -616,7 +616,7 @@ static void httpHandleReq(SHttpMsg* msg) { int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr, - cli->port, chanId, cli->seq, tstrerror(TAOS_SYSTEM_ERROR(errno))); + cli->port, chanId, cli->seq, tstrerror(terrno)); destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a8f1d26f15..087e82d0ec 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1403,7 +1403,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, - tstrerror(TAOS_SYSTEM_ERROR(errno))); + tstrerror(terrno)); cliHandleFastFail(conn, -1); return; } @@ -1883,9 +1883,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, - tstrerror(TAOS_SYSTEM_ERROR(errno))); + tstrerror(terrno)); cliHandleExcept(conn, -1); - errno = 0; + terrno = 0; return; } diff --git a/source/os/src/osSignal.c b/source/os/src/osSignal.c index 7060b408ca..f341f5422d 100644 --- a/source/os/src/osSignal.c +++ b/source/os/src/osSignal.c @@ -31,23 +31,35 @@ int32_t taosSetSignal(int32_t signum, FSignalHandler sigfp) { // SIGHUP doesn't exist in windows, we handle it in the way of ctrlhandler if (signum == SIGHUP) { - SetConsoleCtrlHandler((PHANDLER_ROUTINE)sigfp, TRUE); + if(SetConsoleCtrlHandler((PHANDLER_ROUTINE)sigfp, TRUE) == 0) { + terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); + return terrno; + } } else { - signal(signum, (FWinSignalHandler)sigfp); + if(signal(signum, (FWinSignalHandler)sigfp) == SIG_ERR) { + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; + } } return 0; } int32_t taosIgnSignal(int32_t signum) { if (signum == SIGUSR1 || signum == SIGHUP) return 0; - signal(signum, SIG_IGN); + if(signal(signum, SIG_IGN) == SIG_ERR) { + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; + } return 0; } int32_t taosDflSignal(int32_t signum) { if (signum == SIGUSR1 || signum == SIGHUP) return 0; - signal(signum, SIG_DFL); + if(signal(signum, SIG_DFL) == SIG_ERR) { + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; + } return 0; } diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 851615fb7f..2065ba68fe 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -300,14 +300,18 @@ int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void } #endif - return setsockopt(pSocket->fd, level, optname, optval, optlen); +int ret = setsockopt(pSocket->fd, level, optname, optval, optlen); +if (ret == SOCKET_ERROR) { + int errorCode = WSAGetLastError(); + return terrno = TAOS_SYSTEM_WINSOCKET_ERROR(errorCode); +} #else int32_t code = setsockopt(pSocket->fd, level, optname, optval, (int)optlen); if (-1 == code) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; } - return code; + return 0; #endif } @@ -325,19 +329,6 @@ int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void #endif -uint32_t taosInetAddr(const char *ipAddr) { -#ifdef WINDOWS - uint32_t value; - int32_t ret = inet_pton(AF_INET, ipAddr, &value); - if (ret <= 0) { - return INADDR_NONE; - } else { - return value; - } -#else - return inet_addr(ipAddr); -#endif -} const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len) { const char* r = inet_ntop(AF_INET, &ipInt, dstStr, len); if (NULL == r) { @@ -943,8 +934,7 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) { int iResult; iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); if (iResult != 0) { - // printf("WSAStartup failed: %d\n", iResult); - return 0xFFFFFFFF; + return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError()); } #endif @@ -1008,7 +998,7 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) { #endif *ip = 0xFFFFFFFF; - return 0xFFFFFFFF; + return TSDB_CODE_RPC_FQDN_ERROR; } #endif } @@ -1021,7 +1011,7 @@ int32_t taosGetFqdn(char *fqdn) { iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); if (iResult != 0) { // printf("WSAStartup failed: %d\n", iResult); - return 1; + return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError()); } #endif char hostname[1024]; @@ -1077,8 +1067,8 @@ int32_t taosGetFqdn(char *fqdn) { int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); if (!result) { - fprintf(stderr, "failed to get fqdn, code:%d, hostname:%s, reason:%s\n", ret, hostname, gai_strerror(ret)); - return -1; + //fprintf(stderr, "failed to get fqdn, code:%d, hostname:%s, reason:%s\n", ret, hostname, gai_strerror(ret)); + return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError()); } strcpy(fqdn, result->ai_canonname); freeaddrinfo(result); @@ -1140,13 +1130,13 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) { terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + return terrno; } #if defined(WINDOWS) if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) { taosCloseSocketNoCheck1(fd); - return -1; + return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError()); } #elif defined(_TD_DARWIN_64) // invalid config @@ -1160,7 +1150,7 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { if (-1 == setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { terrno = TAOS_SYSTEM_ERROR(errno); TAOS_SKIP_ERROR(taosCloseSocketNoCheck1(fd)); - return -1; + return terrno; } #endif diff --git a/source/os/src/osString.c b/source/os/src/osString.c index ffc64f3493..73ddeece3d 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -216,14 +216,14 @@ int32_t taosConvInit(void) { for (int32_t i = 0; i < gConvMaxNum[M2C]; ++i) { gConv[M2C][i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); - if ((iconv_t)-1 == gConv[M2C][i].conv || (iconv_t)0 == gConv[M2C][i].conv) { + if ((iconv_t)-1 == gConv[M2C][i].conv) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; } } for (int32_t i = 0; i < gConvMaxNum[1 - M2C]; ++i) { gConv[1 - M2C][i].conv = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); - if ((iconv_t)-1 == gConv[1 - M2C][i].conv || (iconv_t)0 == gConv[1 - M2C][i].conv) { + if ((iconv_t)-1 == gConv[1 - M2C][i].conv) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; } @@ -251,13 +251,13 @@ iconv_t taosAcquireConv(int32_t *idx, ConvType type) { *idx = -1; if (type == M2C) { iconv_t c = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); - if ((iconv_t)-1 == c || (iconv_t)0 == c) { + if ((iconv_t)-1 == c) { terrno = TAOS_SYSTEM_ERROR(errno); } return c; } else { iconv_t c = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC); - if ((iconv_t)-1 == c || (iconv_t)0 == c) { + if ((iconv_t)-1 == c) { terrno = TAOS_SYSTEM_ERROR(errno); } return c; @@ -312,7 +312,7 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4 int32_t idx = -1; iconv_t conv = taosAcquireConv(&idx, M2C); - if ((iconv_t)-1 == conv || (iconv_t)0 == conv) { + if ((iconv_t)-1 == conv) { return false; } @@ -350,7 +350,7 @@ int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) { int32_t idx = -1; int32_t code = 0; iconv_t conv = taosAcquireConv(&idx, C2M); - if ((iconv_t)-1 == conv || (iconv_t)0 == conv) { + if ((iconv_t)-1 == conv) { return TSDB_CODE_APP_ERROR; } From 6e167a347bc186b1f8cab988a73dba194c0eb400 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sat, 14 Sep 2024 13:53:26 +0800 Subject: [PATCH 03/16] fix: check conv --- source/os/src/osString.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/os/src/osString.c b/source/os/src/osString.c index d3393f930f..68392b5050 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -289,7 +289,11 @@ iconv_t taosAcquireConv(int32_t *idx, ConvType type) { } *idx = startId; - return gConv[type][startId].conv; + if ((iconv_t)0 == gConv[type][startId].conv) { + return (iconv_t)-1; + } else { + return gConv[type][startId].conv; + } } void taosReleaseConv(int32_t idx, iconv_t conv, ConvType type) { From c789909579695596ca981df61f828ae136eff013 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Sat, 14 Sep 2024 15:52:30 +0800 Subject: [PATCH 04/16] fix(stmt2/gettags): return zero tags for normal tables --- source/libs/parser/src/parInsertStmt.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index b67b7fc583..57f939d6d6 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -892,11 +892,11 @@ int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TA if (NULL == tags) { return TSDB_CODE_APP_ERROR; } - + /* if (pDataBlock->pMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pMeta->tableType != TSDB_CHILD_TABLE) { return TSDB_CODE_TSC_STMT_API_ERROR; } - + */ SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta); if (tags->numOfBound <= 0) { *fieldNum = 0; From ae9eaf9d1a9b11f6dc16c3e75d78433d26ebd4fb Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sat, 14 Sep 2024 16:14:56 +0800 Subject: [PATCH 05/16] osSys --- include/os/osSysinfo.h | 2 +- source/os/src/osSysinfo.c | 48 +++++++++++++++++++++++++++------------ 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index a6d8a6502f..fb6acf2a0e 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -80,7 +80,7 @@ typedef struct { SysNameInfo taosGetSysNameInfo(); bool taosCheckCurrentInDll(); -int taosGetlocalhostname(char *hostname, size_t maxLen); +int32_t taosGetlocalhostname(char *hostname, size_t maxLen); #ifdef __cplusplus } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index fc6296bc04..4d97232806 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -93,6 +93,10 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) { path[len - 1] = 'p'; HANDLE file = CreateFile(path, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); + if (file == INVALID_HANDLE_VALUE) { + FreeLibrary(dll); + return EXCEPTION_CONTINUE_SEARCH; + } MINIDUMP_EXCEPTION_INFORMATION mei; mei.ThreadId = GetCurrentThreadId(); @@ -333,7 +337,10 @@ bool getWinVersionReleaseName(char *releaseName, int32_t maxLen) { UINT uLen; VS_FIXEDFILEINFO *pFileInfo; - GetWindowsDirectory(szFileName, MAX_PATH); + int ret = GetWindowsDirectory(szFileName, MAX_PATH); + if (ret == 0) { + return false; + } wsprintf(szFileName, L"%s%s", szFileName, L"\\explorer.exe"); dwLen = GetFileVersionInfoSize(szFileName, &dwHandle); if (dwLen == 0) { @@ -373,12 +380,12 @@ int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t if(sName) snprintf(sName, maxLen, "macOS"); if (sysctl(osversion_name, 2, osversion, &osversion_len, NULL, 0) == -1) { - return -1; + return TAOS_SYSTEM_ERROR(errno); } uint32_t major, minor; - if (sscanf(osversion, "%u.%u", &major, &minor) != 2) { - return -1; + if (sscanf(osversion, "%u.%u", &major, &minor) == EOF) { + return TAOS_SYSTEM_ERROR(errno); } if (major >= 20) { major -= 9; // macOS 11 and newer @@ -433,8 +440,11 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) { #ifdef WINDOWS char value[100]; DWORD bufferSize = sizeof(value); - RegGetValue(HKEY_LOCAL_MACHINE, "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0", "ProcessorNameString", + LSTATUS ret = RegGetValue(HKEY_LOCAL_MACHINE, "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0", "ProcessorNameString", RRF_RT_ANY, NULL, (PVOID)&value, &bufferSize); + if (ret != ERROR_SUCCESS) { + return TAOS_SYSTEM_ERROR(ret); + } tstrncpy(cpuModel, value, maxLen); SYSTEM_INFO si; memset(&si, 0, sizeof(SYSTEM_INFO)); @@ -696,7 +706,7 @@ int32_t taosGetTotalMemory(int64_t *totalKB) { MEMORYSTATUSEX memsStat; memsStat.dwLength = sizeof(memsStat); if (!GlobalMemoryStatusEx(&memsStat)) { - return -1; + return TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); } *totalKB = memsStat.ullTotalPhys / 1024; @@ -705,6 +715,9 @@ int32_t taosGetTotalMemory(int64_t *totalKB) { return 0; #else *totalKB = (int64_t)(sysconf(_SC_PHYS_PAGES) * tsPageSizeKB); + if(*totalKB <= 0) { + return TAOS_SYSTEM_ERROR(errno); + } return 0; #endif } @@ -760,7 +773,7 @@ int32_t taosGetSysMemory(int64_t *usedKB) { MEMORYSTATUSEX memsStat; memsStat.dwLength = sizeof(memsStat); if (!GlobalMemoryStatusEx(&memsStat)) { - return -1; + return TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); } int64_t nMemFree = memsStat.ullAvailPhys / 1024; @@ -773,6 +786,9 @@ int32_t taosGetSysMemory(int64_t *usedKB) { return 0; #else *usedKB = sysconf(_SC_AVPHYS_PAGES) * tsPageSizeKB; + if(*usedKB <= 0) { + return TAOS_SYSTEM_ERROR(errno); + } return 0; #endif } @@ -832,7 +848,7 @@ int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int if (write_bytes) *write_bytes = 0; return 0; } - return -1; + return TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); #elif defined(_TD_DARWIN_64) if (rchars) *rchars = 0; if (wchars) *wchars = 0; @@ -1022,7 +1038,10 @@ void taosKillSystem() { int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { #ifdef WINDOWS GUID guid; - CoCreateGuid(&guid); + HRESULT h = CoCreateGuid(&guid); + if (h != S_OK) { + return TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); + } snprintf(uid, uidlen, "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X", guid.Data1, guid.Data2, guid.Data3, guid.Data4[0], guid.Data4[1], guid.Data4[2], guid.Data4[3], guid.Data4[4], guid.Data4[5], guid.Data4[6], guid.Data4[7]); @@ -1226,14 +1245,13 @@ SysNameInfo taosGetSysNameInfo() { } char localHostName[512]; - taosGetlocalhostname(localHostName, 512); + TAOS_SKIP_ERROR(taosGetlocalhostname(localHostName, 512)); TdCmdPtr pCmd = taosOpenCmd("scutil --get LocalHostName"); tstrncpy(info.nodename, localHostName, sizeof(info.nodename)); return info; #else SysNameInfo info = {0}; - struct utsname uts; if (!uname(&uts)) { tstrncpy(info.sysname, uts.sysname, sizeof(info.sysname)); @@ -1269,7 +1287,7 @@ bool taosCheckCurrentInDll() { } #ifdef _TD_DARWIN_64 -int taosGetMaclocalhostnameByCommand(char *hostname, size_t maxLen) { +int32_t taosGetMaclocalhostnameByCommand(char *hostname, size_t maxLen) { TdCmdPtr pCmd = taosOpenCmd("scutil --get LocalHostName"); if (pCmd != NULL) { if (taosGetsCmd(pCmd, maxLen - 1, hostname) > 0) { @@ -1281,10 +1299,10 @@ int taosGetMaclocalhostnameByCommand(char *hostname, size_t maxLen) { } taosCloseCmd(&pCmd); } - return -1; + return TAOS_SYSTEM_ERROR(errno); } -int getMacLocalHostNameBySCD(char *hostname, size_t maxLen) { +int32_t getMacLocalHostNameBySCD(char *hostname, size_t maxLen) { SCDynamicStoreRef store = SCDynamicStoreCreate(NULL, CFSTR(""), NULL, NULL); CFStringRef hostname_cfstr = SCDynamicStoreCopyLocalHostName(store); if (hostname_cfstr != NULL) { @@ -1298,7 +1316,7 @@ int getMacLocalHostNameBySCD(char *hostname, size_t maxLen) { } #endif -int taosGetlocalhostname(char *hostname, size_t maxLen) { +int32_t taosGetlocalhostname(char *hostname, size_t maxLen) { #ifdef _TD_DARWIN_64 int res = getMacLocalHostNameBySCD(hostname, maxLen); if (res != 0) { From 045a04184406519537442bdc3f80c71b7c8cbf85 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sat, 14 Sep 2024 16:25:07 +0800 Subject: [PATCH 06/16] fix: osSys --- source/os/src/osSystem.c | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/source/os/src/osSystem.c b/source/os/src/osSystem.c index a8a9ff681b..fe52369a53 100644 --- a/source/os/src/osSystem.c +++ b/source/os/src/osSystem.c @@ -31,7 +31,7 @@ void WINAPI windowsServiceCtrlHandle(DWORD request) { ServiceStatus.dwCurrentState = SERVICE_STOP_PENDING; if (!SetServiceStatus(hServiceStatusHandle, &ServiceStatus)) { DWORD nError = GetLastError(); - printf("failed to send stopped status to windows service: %d", nError); + fprintf(stderr, "failed to send stopped status to windows service: %d", nError); } break; default: @@ -50,19 +50,19 @@ void WINAPI mainWindowsService(int argc, char** argv) { hServiceStatusHandle = RegisterServiceCtrlHandler("taosd", &windowsServiceCtrlHandle); if (hServiceStatusHandle == 0) { DWORD nError = GetLastError(); - printf("failed to register windows service ctrl handler: %d", nError); + fprintf(stderr, "failed to register windows service ctrl handler: %d", nError); } ServiceStatus.dwCurrentState = SERVICE_RUNNING; if (SetServiceStatus(hServiceStatusHandle, &ServiceStatus)) { DWORD nError = GetLastError(); - printf("failed to send running status to windows service: %d", nError); + fprintf(stderr, "failed to send running status to windows service: %d", nError); } if (mainWindowsFunc != NULL) mainWindowsFunc(argc, argv); ServiceStatus.dwCurrentState = SERVICE_STOPPED; if (!SetServiceStatus(hServiceStatusHandle, &ServiceStatus)) { DWORD nError = GetLastError(); - printf("failed to send stopped status to windows service: %d", nError); + fprintf(stderr, "failed to send stopped status to windows service: %d", nError); } } void stratWindowsService(MainWindows mainWindows) { @@ -140,17 +140,27 @@ void taosCloseDll(void* handle) { } #endif -int taosSetConsoleEcho(bool on) { +int32_t taosSetConsoleEcho(bool on) { #if defined(WINDOWS) HANDLE hStdin = GetStdHandle(STD_INPUT_HANDLE); + if (hStdin == INVALID_HANDLE_VALUE) { + terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); + return terrno; + } DWORD mode = 0; - GetConsoleMode(hStdin, &mode); + if(!GetConsoleMode(hStdin, &mode)){ + terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); + return terrno; + } if (on) { mode |= ENABLE_ECHO_INPUT; } else { mode &= ~ENABLE_ECHO_INPUT; } - SetConsoleMode(hStdin, mode); + if(!SetConsoleMode(hStdin, mode)) { + terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError()); + return terrno; + } return 0; #else From 1ccc74d768ceb74ffd1b0299fc1df4cfa307d07e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 14 Sep 2024 17:28:24 +0800 Subject: [PATCH 07/16] fix: add memory alloc validation --- source/libs/catalog/src/ctgCache.c | 3 +++ source/libs/catalog/src/ctgDbg.c | 4 ++++ source/libs/executor/src/mergejoinoperator.c | 3 +++ source/libs/scheduler/src/schRemote.c | 6 ++++++ 4 files changed, 16 insertions(+) diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 0e54ac77a2..9a72a6d89f 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -3710,6 +3710,9 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg } *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo)); + if (NULL == *pVgroup) { + CTG_ERR_JRET(terrno); + } CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, NULL, dbCache->vgCache.vgInfo, pTableName, *pVgroup)); _return: diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index a95df06307..0ac606d4d6 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -291,6 +291,10 @@ int32_t ctgdHandleDbgCommand(char *command) { } char *dup = taosStrdup(command); + if (NULL == dup) { + CTG_RET(terrno); + } + char *option = NULL; char *param = NULL; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 4715aa8b96..af5e4ed235 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -149,6 +149,9 @@ int32_t mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* len = varDataTLen(p1); } char* p2 = taosMemoryMalloc(len); + if (NULL == p2) { + MJ_ERR_RET(terrno); + } TAOS_MEMCPY(p2, p1, len); code = colDataSetVal(pDst, numOfRows, p2, false); if (code) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 40391cea7e..7ff3d1386c 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -267,6 +267,9 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD if (pMsg->pData) { SDecoder coder = {0}; SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp)); + if (NULL == rsp) { + SCH_ERR_JRET(terrno); + } tDecoderInit(&coder, pMsg->pData, msgSize); code = tDecodeSSubmitRsp2(&coder, rsp); tDecoderClear(&coder); @@ -957,6 +960,9 @@ int32_t schUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, SQueryNodeAddr *addr pMsgSendInfo->target.type = TARGET_TYPE_VNODE; pMsgSendInfo->target.vgId = addr->nodeId; pMsgSendInfo->target.dbFName = taosStrdup(pTask->plan->dbFName); + if (NULL == pMsgSendInfo->target.dbFName) { + return terrno; + } } return TSDB_CODE_SUCCESS; From f684e12c27b46d750884da2c411b9b1e3638a92b Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Sat, 14 Sep 2024 18:11:36 +0800 Subject: [PATCH 08/16] pointer check --- source/client/src/clientImpl.c | 4 ++ source/client/src/clientRawBlockWrite.c | 8 ++++ source/libs/catalog/src/catalog.c | 4 ++ source/libs/catalog/src/ctgAsync.c | 4 ++ source/libs/command/src/command.c | 4 ++ source/libs/executor/src/sysscanoperator.c | 4 ++ source/libs/function/src/tudf.c | 3 ++ source/libs/function/src/udfd.c | 9 ++++ source/libs/qcom/src/queryUtil.c | 14 ++++++ source/libs/qcom/src/querymsg.c | 51 ++++++++++++++++++++++ 10 files changed, 105 insertions(+) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5fba447c8a..5ba3a9f786 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2240,6 +2240,10 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } else if (tTagIsJson(data)) { char* jsonString = NULL; parseTagDatatoJson(data, &jsonString); + if(jsonString == NULL) { + tscError("doConvertJson error: parseTagDatatoJson failed"); + return terrno; + } STR_TO_VARSTR(dst, jsonString); taosMemoryFree(jsonString); } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value" diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 43e17b381b..8a888a2a47 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -417,6 +417,10 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { } char* pJson = NULL; parseTagDatatoJson(pTag, &pJson); + if(pJson == NULL) { + uError("parseTagDatatoJson failed, pJson == NULL"); + goto end; + } cJSON* tag = cJSON_CreateObject(); RAW_NULL_CHECK(tag); STagVal* pTagVal = taosArrayGet(pTagVals, 0); @@ -727,6 +731,10 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { goto end; } parseTagDatatoJson(vAlterTbReq.pTagVal, &buf); + if(buf == NULL) { + uError("parseTagDatatoJson failed, buf == NULL"); + goto end; + } } else { if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) { buf = taosMemoryCalloc(vAlterTbReq.nTagVal * 2 + 2 + 3, 1); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index dff607f464..46094fd713 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -448,6 +448,10 @@ int32_t ctgGetTbTag(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, char* pJson = NULL; parseTagDatatoJson(pTag, &pJson); + if(NULL == pJson) { + taosArrayDestroy(pTagVals); + CTG_ERR_JRET(terrno); + } STagVal tagVal; tagVal.cid = 0; tagVal.type = TSDB_DATA_TYPE_JSON; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 9940474891..520c715b01 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2093,6 +2093,10 @@ int32_t ctgHandleGetTbTagRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* char* pJson = NULL; parseTagDatatoJson(pTag, &pJson); + if (NULL == pJson) { + taosArrayDestroy(pTagVals); + CTG_ERR_JRET(terrno); + } STagVal tagVal; tagVal.cid = 0; tagVal.type = TSDB_DATA_TYPE_JSON; diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 2f756e8121..eb74b81141 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -565,6 +565,10 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) { if (tTagIsJson(pTag)) { char* pJson = NULL; parseTagDatatoJson(pTag, &pJson); + if(NULL == pJson) { + qError("failed to parse tag to json, pJson is NULL"); + return terrno; + } *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson); taosMemoryFree(pJson); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index bf1153f412..18344d9e79 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1114,6 +1114,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, if (tagType == TSDB_DATA_TYPE_JSON) { char* tagJson = NULL; parseTagDatatoJson(tagData, &tagJson); + if (tagJson == NULL) { + code = terrno; + goto _end; + } tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE); QUERY_CHECK_NULL(tagVarChar, code, lino, _end, terrno); memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson)); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 0510b6cff5..c2cd8b8c3d 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -900,6 +900,9 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows); int32_t dataLen = udfCol->colData.fixLenCol.dataLen; udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen); + if (NULL == udfCol->colData.fixLenCol.data) { + return terrno; + } char *data = udfCol->colData.fixLenCol.data; memcpy(data, col->pData, dataLen); } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 6ae718c101..bef0d06501 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -92,6 +92,9 @@ int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfNa int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { int32_t err = 0; SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); + if (NULL == udfCtx) { + return terrno; + } err = uv_dlopen(udf->path, &udfCtx->lib); if (err != 0) { fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); @@ -606,6 +609,9 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); + if (NULL == udfNew) { + return terrno; + } udfNew->refCount = 1; udfNew->lastFetchTime = taosGetTimestampMs(); strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); @@ -1105,6 +1111,9 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { taosArrayDestroy(retrieveReq.pFuncNames); SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + if(NULL == msgInfo) { + return terrno; + } msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; msgInfo->param = udf; if(uv_sem_init(&msgInfo->resultSem, 0) != 0) { diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 30902269c3..e08d4cef87 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -514,6 +514,9 @@ end: taosArrayDestroy(pTagVals); if (string == NULL) { string = taosStrdup(TSDB_DATA_NULL_STR_L); + if(string == NULL) { + qError("failed to strdup null string"); + } } *jsonStr = string; } @@ -629,6 +632,7 @@ int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) { (*pDst)->flags = pSrc->flags; if (pSrc->name) { (*pDst)->name = taosStrdup(pSrc->name); + if (NULL == (*pDst)->name) goto _exit; } (*pDst)->uid = pSrc->uid; (*pDst)->btime = pSrc->btime; @@ -636,21 +640,25 @@ int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) { (*pDst)->commentLen = pSrc->commentLen; if (pSrc->comment) { (*pDst)->comment = taosStrdup(pSrc->comment); + if (NULL == (*pDst)->comment) goto _exit; } (*pDst)->type = pSrc->type; if (pSrc->type == TSDB_CHILD_TABLE) { if (pSrc->ctb.stbName) { (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName); + if (NULL == (*pDst)->ctb.stbName) goto _exit; } (*pDst)->ctb.tagNum = pSrc->ctb.tagNum; (*pDst)->ctb.suid = pSrc->ctb.suid; if (pSrc->ctb.tagName) { (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL); + if (NULL == (*pDst)->ctb.tagName) goto _exit; } STag* pTag = (STag*)pSrc->ctb.pTag; if (pTag) { (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len); + if(NULL == (*pDst)->ctb.pTag) goto _exit; memcpy((*pDst)->ctb.pTag, pTag, pTag->len); } } else { @@ -658,11 +666,17 @@ int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) { (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.nCols; if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) { (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema)); + if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit; memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema)); } } return TSDB_CODE_SUCCESS; + +_exit: + tdDestroySVCreateTbReq(*pDst); + taosMemoryFree(*pDst); + return terrno; } void freeDbCfgInfo(SDbCfgInfo* pInfo) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index b5b660a51b..8725496ed2 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -85,6 +85,9 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3 int32_t bufLen = tSerializeSTableInfoReq(NULL, 0, &infoReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSTableInfoReq(pBuf, bufLen, &infoReq) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -112,6 +115,9 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSUseDbReq(pBuf, bufLen, &usedbReq) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -133,6 +139,9 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t int32_t bufLen = tSerializeSQnodeListReq(NULL, 0, &qnodeListReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -154,6 +163,9 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t int32_t bufLen = tSerializeSDnodeListReq(NULL, 0, &dnodeListReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSDnodeListReq(pBuf, bufLen, &dnodeListReq) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -174,6 +186,9 @@ int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t int32_t bufLen = tSerializeSServerVerReq(NULL, 0, &req); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSServerVerReq(pBuf, bufLen, &req) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -195,6 +210,9 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -216,6 +234,9 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSUserIndexReq(pBuf, bufLen, &indexReq) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -247,8 +268,13 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3 int32_t bufLen = tSerializeSRetrieveFuncReq(NULL, 0, &funcReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + taosArrayDestroy(funcReq.pFuncNames); + return terrno; + } if(tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq) < 0) { + taosArrayDestroy(funcReq.pFuncNames); return TSDB_CODE_TSC_INVALID_INPUT; } @@ -270,6 +296,9 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32 int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if (tSerializeSGetUserAuthReq(pBuf, bufLen, &req) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -290,6 +319,9 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_ int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSTableIndexReq(pBuf, bufLen, &indexReq) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -314,6 +346,9 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSTableCfgReq(pBuf, bufLen, &cfgReq) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -335,6 +370,9 @@ int32_t queryBuildGetViewMetaMsg(void *input, char **msg, int32_t msgSize, int32 int32_t bufLen = tSerializeSViewMetaReq(NULL, 0, &req); void *pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeSViewMetaReq(pBuf, bufLen, &req) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -357,6 +395,9 @@ int32_t queryBuildGetTableTSMAMsg(void *input, char **msg, int32_t msgSize, int3 int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req); void * pBuf = (*mallcFp)(bufLen); + if (NULL == pBuf) { + return terrno; + } if(tSerializeTableTSMAInfoReq(pBuf, bufLen, &req) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -379,6 +420,10 @@ int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t * int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req); void * pBuf = (*mallcFp)(bufLen); + if(pBuf == NULL) + { + return terrno; + } if(tSerializeTableTSMAInfoReq(pBuf, bufLen, &req) < 0) { return TSDB_CODE_TSC_INVALID_INPUT; @@ -396,6 +441,9 @@ int32_t queryBuildGetStreamProgressMsg(void* input, char** msg, int32_t msgSize, int32_t len = tSerializeStreamProgressReq(NULL, 0, input); void* pBuf = (*mallcFp)(len); + if (NULL == pBuf) { + return terrno; + } if(tSerializeStreamProgressReq(pBuf, len, input) < 0) { @@ -666,6 +714,9 @@ int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) { } *(char **)output = taosStrdup(out.ver); + if (NULL == *(char **)output) { + return terrno; + } return code; } From c4a49a7bd9098dc0a0f1127830c3f807dbacc58a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 00:45:45 +0800 Subject: [PATCH 09/16] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckpoint.c | 80 +++++++++---------- source/libs/stream/src/streamMeta.c | 94 ----------------------- source/libs/stream/src/streamStartTask.c | 58 +++++++++++++- source/libs/stream/src/streamTask.c | 2 +- source/libs/stream/src/streamUtil.c | 51 ++++++++++++ 5 files changed, 148 insertions(+), 137 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 35d5ba4e08..1a8b5c1028 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -209,29 +209,18 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream return code; } -int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { - SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); - if (pDataBlock == NULL) { - return TSDB_CODE_INVALID_PARA; - } - - int64_t checkpointId = pDataBlock->info.version; - int32_t transId = pDataBlock->info.window.skey; - const char* id = pTask->id.idStr; - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = pTask->pMeta->vgId; - int32_t taskLevel = pTask->info.taskLevel; +static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock, + int32_t transId) { + int32_t code = 0; + int32_t vgId = pTask->pMeta->vgId; + int32_t taskLevel = pTask->info.taskLevel; + const char* id = pTask->id.idStr; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - - streamMutexLock(&pTask->lock); if (pTask->chkInfo.checkpointId > checkpointId) { stError("s-task:%s vgId:%d current checkpointId:%" PRId64 " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard", id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId); - streamMutexUnlock(&pTask->lock); - - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -239,37 +228,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64 " discard the checkpoint-trigger block", id, vgId, checkpointId, transId, pActiveInfo->failedId); - streamMutexUnlock(&pTask->lock); - - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } if (pTask->chkInfo.checkpointId == checkpointId) { { // send checkpoint-ready msg to upstream - SRpcMsg msg = {0}; + SRpcMsg msg = {0}; SStreamUpstreamEpInfo* pInfo = NULL; streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo); if (pInfo == NULL) { - streamMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg); if (code == TSDB_CODE_SUCCESS) { code = tmsgSendReq(&pInfo->epSet, &msg); + if (code) { + stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(code)); + } } } stWarn( - "s-task:%s vgId:%d recv already finished checkpoint msg, send checkpoint-ready to upstream:0x%x to resume the " - "interrupted checkpoint", + "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume " + "the interrupted checkpoint", id, vgId, pBlock->srcTaskId); streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); - streamMutexUnlock(&pTask->lock); - - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -278,9 +263,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 " discard", id, vgId, pActiveInfo->activeId, checkpointId); - streamMutexUnlock(&pTask->lock); - - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } else { // checkpointId == pActiveInfo->activeId if (pActiveInfo->allUpstreamTriggerRecv == 1) { @@ -288,8 +270,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, " "checkpointId:%" PRId64 " transId:%d", id, vgId, checkpointId, transId); - streamMutexUnlock(&pTask->lock); - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -298,7 +278,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) { STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i); if (p == NULL) { - streamMutexUnlock(&pTask->lock); return TSDB_CODE_INVALID_PARA; } @@ -306,9 +285,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 ", prev recvTs:%" PRId64 " discard", pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); - - streamMutexUnlock(&pTask->lock); - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } } @@ -316,7 +292,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock } } + return 0; +} + +int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { + int64_t checkpointId = 0; + int32_t transId = 0; + const char* id = pTask->id.idStr; + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pTask->pMeta->vgId; + int32_t taskLevel = pTask->info.taskLevel; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); + if (pDataBlock == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + checkpointId = pDataBlock->info.version; + transId = pDataBlock->info.window.skey; + + streamMutexLock(&pTask->lock); + code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId); streamMutexUnlock(&pTask->lock); + if (code) { + streamFreeQitem((SStreamQueueItem*)pBlock); + return code; + } stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64 ", transId:%d current active checkpointId:%" PRId64, @@ -367,6 +369,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // before the next checkpoint. code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); if (code) { + streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -675,10 +678,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } streamMetaWLock(pMeta); - - if (streamMetaCommit(pMeta) < 0) { - // persist to disk - } + code = streamMetaCommit(pMeta); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 514e25c689..d8249666c3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -458,9 +458,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, code = createMetaHbInfo(pRid, &pMeta->pHbInfo); TSDB_CHECK_CODE(code, lino, _err); - pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); - TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno); - code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); TSDB_CHECK_CODE(code, lino, _err); @@ -629,9 +626,6 @@ void streamMetaCloseImpl(void* arg) { taosMemoryFree(pMeta->path); streamMutexDestroy(&pMeta->backendMutex); - taosCleanUpScheduler(pMeta->qHandle); - taosMemoryFree(pMeta->qHandle); - bkdMgtDestroy(pMeta->bkdChkptMgt); pMeta->role = NODE_ROLE_UNINIT; @@ -1261,40 +1255,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) { streamMetaHbToMnode(pRid, NULL); } -void streamMetaRLock(SStreamMeta* pMeta) { - // stTrace("vgId:%d meta-rlock", pMeta->vgId); - int32_t code = taosThreadRwlockRdlock(&pMeta->lock); - if (code) { - stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code)); - } -} - -void streamMetaRUnLock(SStreamMeta* pMeta) { - // stTrace("vgId:%d meta-runlock", pMeta->vgId); - int32_t code = taosThreadRwlockUnlock(&pMeta->lock); - if (code != TSDB_CODE_SUCCESS) { - stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code)); - } else { - // stTrace("vgId:%d meta-runlock completed", pMeta->vgId); - } -} - -void streamMetaWLock(SStreamMeta* pMeta) { - // stTrace("vgId:%d meta-wlock", pMeta->vgId); - int32_t code = taosThreadRwlockWrlock(&pMeta->lock); - if (code) { - stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code)); - } -} - -void streamMetaWUnLock(SStreamMeta* pMeta) { - // stTrace("vgId:%d meta-wunlock", pMeta->vgId); - int32_t code = taosThreadRwlockUnlock(&pMeta->lock); - if (code) { - stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code)); - } -} - int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { QRY_PARAM_CHECK(pList); @@ -1398,60 +1358,6 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { return 0; } -int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t now = taosGetTimestampMs(); - int64_t startTs = 0; - bool hasFillhistoryTask = false; - STaskId hId = {0}; - - stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId); - - streamMetaRLock(pMeta); - - STaskId id = {.streamId = streamId, .taskId = taskId}; - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - - if (ppTask != NULL) { - startTs = (*ppTask)->taskCheckInfo.startTs; - hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); - hId = (*ppTask)->hTaskInfo.id; - - streamMetaRUnLock(pMeta); - - // add the failed task info, along with the related fill-history task info into tasks list. - code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); - if (hasFillhistoryTask) { - code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); - } - } else { - streamMetaRUnLock(pMeta); - - stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - streamId, taskId, pMeta->vgId); - code = TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - return code; -} - -void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { - int32_t startTs = pTask->execInfo.checkTs; - int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); - if (code) { - stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code)); - } - - // automatically set the related fill-history task to be failed. - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pId = &pTask->hTaskInfo.id; - code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); - if (code) { - stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code)); - } - } -} - void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs) { const char* id = pTask->id.idStr; diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 98c6534b46..0858f57414 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -30,8 +30,8 @@ typedef struct STaskInitTs { } STaskInitTs; static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now); -static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal); -static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ); +static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal); +static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ); // restore the checkpoint id by negotiating the latest consensus checkpoint id int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { @@ -505,3 +505,57 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) { stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts); } + +int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t now = taosGetTimestampMs(); + int64_t startTs = 0; + bool hasFillhistoryTask = false; + STaskId hId = {0}; + + stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId); + + streamMetaRLock(pMeta); + + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + + if (ppTask != NULL) { + startTs = (*ppTask)->taskCheckInfo.startTs; + hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); + hId = (*ppTask)->hTaskInfo.id; + + streamMetaRUnLock(pMeta); + + // add the failed task info, along with the related fill-history task info into tasks list. + code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false); + if (hasFillhistoryTask) { + code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); + } + } else { + streamMetaRUnLock(pMeta); + + stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", + streamId, taskId, pMeta->vgId); + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + return code; +} + +void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) { + int32_t startTs = pTask->execInfo.checkTs; + int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false); + if (code) { + stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code)); + } + + // automatically set the related fill-history task to be failed. + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + STaskId* pId = &pTask->hTaskInfo.id; + code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false); + if (code) { + stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code)); + } + } +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9a324084ff..365710f8a7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -801,8 +801,8 @@ bool streamTaskSetSchedStatusWait(SStreamTask* pTask) { pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING; ret = true; } - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pTask->lock); return ret; } diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index b79ca32ff3..cef2ba35e7 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -35,3 +35,54 @@ void streamMutexDestroy(TdThreadMutex *pMutex) { stError("%p mutex destroy, code:%s", pMutex, tstrerror(code)); } } + +void streamMetaRLock(SStreamMeta* pMeta) { + // stTrace("vgId:%d meta-rlock", pMeta->vgId); + int32_t code = taosThreadRwlockRdlock(&pMeta->lock); + if (code) { + stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code)); + } +} + +void streamMetaRUnLock(SStreamMeta* pMeta) { + // stTrace("vgId:%d meta-runlock", pMeta->vgId); + int32_t code = taosThreadRwlockUnlock(&pMeta->lock); + if (code != TSDB_CODE_SUCCESS) { + stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code)); + } else { + // stTrace("vgId:%d meta-runlock completed", pMeta->vgId); + } +} + +void streamMetaWLock(SStreamMeta* pMeta) { + // stTrace("vgId:%d meta-wlock", pMeta->vgId); + int32_t code = taosThreadRwlockWrlock(&pMeta->lock); + if (code) { + stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code)); + } +} + +void streamMetaWUnLock(SStreamMeta* pMeta) { + // stTrace("vgId:%d meta-wunlock", pMeta->vgId); + int32_t code = taosThreadRwlockUnlock(&pMeta->lock); + if (code) { + stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code)); + } +} + +void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino) { + int32_t oldCode = atomic_val_compare_exchange_32(&pMeta->fatalInfo.code, 0, code); + if (oldCode == 0) { + pMeta->fatalInfo.ts = taosGetTimestampMs(); + pMeta->fatalInfo.threadId = taosGetSelfPthreadId(); + tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func)); + pMeta->fatalInfo.line = lino; + stInfo("vgId:%d set global fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); + } else { + stFatal("vgId:%d existed global fatal eror:%s, failed to set new fatal error code:%s", pMeta->vgId, code); + } +} + +int32_t streamGetFatalError(const SStreamMeta* pMeta) { + return atomic_load_32((volatile int32_t*) &pMeta->fatalInfo.code); +} From 3332a0b822c41fd593e16444fa95e93d4da3fde5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 01:19:30 +0800 Subject: [PATCH 10/16] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 26 +++++++---- source/dnode/mnode/impl/src/mndStream.c | 52 +++++++++++----------- source/dnode/vnode/src/tqCommon/tqCommon.c | 40 ++++++++++------- source/libs/stream/src/streamMeta.c | 22 +++++---- source/libs/stream/src/streamUtil.c | 5 ++- 5 files changed, 86 insertions(+), 59 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b77c8535f1..29759bc561 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -494,6 +494,14 @@ typedef struct SScanWalInfo { tmr_h scanTimer; } SScanWalInfo; +typedef struct SFatalErrInfo { + int32_t code; + int64_t ts; + int32_t threadId; + int32_t line; + char func[128]; +} SFatalErrInfo; + // meta typedef struct SStreamMeta { char* path; @@ -523,14 +531,13 @@ typedef struct SStreamMeta { int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfPausedTasks; int64_t rid; - - int64_t chkpId; - int32_t chkpCap; - SArray* chkpSaved; - SArray* chkpInUse; - SRWLatch chkpDirLock; - void* qHandle; // todo remove it - void* bkdChkptMgt; + SFatalErrInfo fatalInfo; // fatal error occurs, stream stop to execute + int64_t chkpId; + int32_t chkpCap; + SArray* chkpSaved; + SArray* chkpInUse; + SRWLatch chkpDirLock; + void* bkdChkptMgt; } SStreamMeta; typedef struct STaskUpdateEntry { @@ -776,6 +783,9 @@ void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); +void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino); +int32_t streamGetFatalError(const SStreamMeta* pMeta); + void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId); int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pTaskList); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e0b8caa938..1fb398d070 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1143,18 +1143,16 @@ int32_t extractStreamNodeList(SMnode *pMnode) { return taosArrayGetSize(execInfo.pNodeList); } -static bool taskNodeIsUpdated(SMnode *pMnode) { - bool allReady = true; - SArray *pNodeSnapshot = NULL; - - // check if the node update happens or not - streamMutexLock(&execInfo.lock); +static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) { + bool allReady = false; + bool nodeUpdated = false; + SVgroupChangeInfo changeInfo = {0}; int32_t numOfNodes = extractStreamNodeList(pMnode); + if (numOfNodes == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execInfo.ts = taosGetTimestampSec(); - streamMutexUnlock(&execInfo.lock); return false; } @@ -1166,43 +1164,46 @@ static bool taskNodeIsUpdated(SMnode *pMnode) { if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); - streamMutexUnlock(&execInfo.lock); return true; } } - int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot); + int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot); if (code) { mError("failed to get the vgroup snapshot, ignore it and continue"); } if (!allReady) { mWarn("not all vnodes ready, quit from vnodes status check"); - taosArrayDestroy(pNodeSnapshot); - streamMutexUnlock(&execInfo.lock); return true; } - SVgroupChangeInfo changeInfo = {0}; - code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo); + code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo); if (code) { - streamMutexUnlock(&execInfo.lock); - return false; + nodeUpdated = false; + } else { + nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + if (nodeUpdated) { + mDebug("stream tasks not ready due to node update"); + } } - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); - mndDestroyVgroupChangeInfo(&changeInfo); - taosArrayDestroy(pNodeSnapshot); - - if (nodeUpdated) { - mDebug("stream tasks not ready due to node update"); - } - - streamMutexUnlock(&execInfo.lock); return nodeUpdated; } +// check if the node update happens or not +static bool taskNodeIsUpdated(SMnode *pMnode) { + SArray *pNodeSnapshot = NULL; + + streamMutexLock(&execInfo.lock); + bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot); + streamMutexUnlock(&execInfo.lock); + + taosArrayDestroy(pNodeSnapshot); + return updated; +} + static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { bool ready = true; if (taskNodeIsUpdated(pMnode)) { @@ -1993,7 +1994,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { mndDestroyVgroupChangeInfo(pInfo); - return terrno; + TSDB_CHECK_NULL(NULL, code, lino, _err, terrno); } int32_t numOfNodes = taosArrayGetSize(pPrevNodeList); @@ -2048,6 +2049,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis return code; _err: + mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino); mndDestroyVgroupChangeInfo(pInfo); return code; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3f4329f22b..116acd4636 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -160,6 +160,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tDecoderClear(&decoder); + int32_t gError = streamGetFatalError(pMeta); + if (gError != 0) { + tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError), + pMeta->fatalInfo.ts, pMeta->fatalInfo.func); + return 0; + } + // update the nodeEpset when it exists streamMetaWLock(pMeta); @@ -290,8 +297,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); } else { - if (streamMetaCommit(pMeta) < 0) { - // persist to disk + if ((code = streamMetaCommit(pMeta)) < 0) { + // always return true + streamMetaWUnLock(pMeta); + taosArrayDestroy(req.pNodeList); + return TSDB_CODE_SUCCESS; } streamMetaClearSetUpdateTaskListComplete(pMeta); @@ -754,8 +764,9 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored } streamMetaWUnLock(pMeta); + // always return success when handling the requirement issued by mnode during transaction. - return code; + return TSDB_CODE_SUCCESS; } static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { @@ -1197,10 +1208,6 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } -int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) { - return doProcessDummyRspMsg(pMeta, pMsg); -} - int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont; @@ -1221,14 +1228,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; int32_t code = 0; SStreamTask* pTask = NULL; - SRestoreCheckpointInfo req = {0}; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int64_t now = taosGetTimestampMs(); + SDecoder decoder; + SRestoreCheckpointInfo req = {0}; - SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); - if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) { tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code)); tDecoderClear(&decoder); @@ -1239,16 +1245,15 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask == NULL || (code != 0)) { - tqError( - "vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", - pMeta->vgId, req.taskId); + tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already", + pMeta->vgId, req.taskId); // ignore this code to avoid error code over write int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); if (ret) { tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret)); } - return code; + return 0; } // discard the rsp, since it is expired. @@ -1272,7 +1277,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_STREAM_INTERNAL_ERROR; + return 0; } SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo; @@ -1299,10 +1304,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { if (pMeta->role == NODE_ROLE_LEADER) { code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId); + if (code) { + tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code)); + } } else { tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr); } streamMetaReleaseTask(pMeta, pTask); - return code; + return 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d8249666c3..f4202667ff 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -923,32 +923,38 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { streamMetaWLock(pMeta); int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (code) { + streamSetFatalError(pMeta, code, __func__, __LINE__); + } streamMetaWUnLock(pMeta); return code; } int32_t streamMetaCommit(SStreamMeta* pMeta) { - int32_t code = 0; - code = tdbCommit(pMeta->db, pMeta->txn); + int32_t code = tdbCommit(pMeta->db, pMeta->txn); if (code != 0) { - stError("vgId:%d failed to commit stream meta", pMeta->vgId); - return code; + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code), + pMeta->fatalInfo.line); } code = tdbPostCommit(pMeta->db, pMeta->txn); if (code != 0) { - stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId); + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code), + pMeta->fatalInfo.line); return code; } code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); if (code != 0) { - stError("vgId:%d failed to begin trans", pMeta->vgId); - return code; + streamSetFatalError(pMeta, code, __func__, __LINE__); + stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line); + } else { + stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); } - stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); return code; } diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index cef2ba35e7..5bf9370cb7 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -77,9 +77,10 @@ void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, pMeta->fatalInfo.threadId = taosGetSelfPthreadId(); tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func)); pMeta->fatalInfo.line = lino; - stInfo("vgId:%d set global fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); + stInfo("vgId:%d set fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); } else { - stFatal("vgId:%d existed global fatal eror:%s, failed to set new fatal error code:%s", pMeta->vgId, code); + stFatal("vgId:%d existed fatal error:%s, ts:%" PRId64 " failed to set new fatal error code:%s", pMeta->vgId, + pMeta->fatalInfo.ts, tstrerror(code)); } } From 1d2c00a4fff9fe1d788a09f14bc73bba0f5389f2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 01:25:16 +0800 Subject: [PATCH 11/16] refactor: do some internal refactor. --- source/libs/stream/src/streamUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index 5bf9370cb7..4c481e6041 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -80,7 +80,7 @@ void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, stInfo("vgId:%d set fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino); } else { stFatal("vgId:%d existed fatal error:%s, ts:%" PRId64 " failed to set new fatal error code:%s", pMeta->vgId, - pMeta->fatalInfo.ts, tstrerror(code)); + tstrerror(pMeta->fatalInfo.code), pMeta->fatalInfo.ts, tstrerror(code)); } } From b4277e0e6532023e9b9eb2bc5b1d07f01cbc67f2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 15 Sep 2024 16:15:11 +0800 Subject: [PATCH 12/16] refactor:do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamCheckStatus.c | 11 +- source/libs/stream/src/streamCheckpoint.c | 179 +++++++++------- source/libs/stream/src/streamDispatch.c | 214 +++++++++++--------- source/libs/stream/src/streamHb.c | 6 +- source/libs/stream/src/streamSched.c | 34 ++-- source/libs/stream/src/streamStartHistory.c | 21 +- source/libs/stream/src/streamTaskSm.c | 4 +- source/libs/stream/src/streamTimer.c | 18 +- 9 files changed, 274 insertions(+), 215 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 29759bc561..cb10aeb6a0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -801,7 +801,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); -void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, +void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId, const char* pMsg); void streamTmrStop(tmr_h tmrId); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 76e74db33f..2688617823 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -303,13 +303,8 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref); - - if (pInfo->checkRspTmr == NULL) { - pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); - } else { - streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, - "check-status-monitor"); - } + streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, + "check-status-monitor"); streamMutexUnlock(&pInfo->checkInfoLock); } @@ -860,7 +855,7 @@ void rspMonitorFn(void* param, void* tmrId) { handleTimeoutDownstreamTasks(pTask, pTimeoutList); } - streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, + streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, "check-status-monitor"); streamMutexUnlock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1a8b5c1028..769116264d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -347,12 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - - if (pTmrInfo->tmrHandle == NULL) { - pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer); - } else { - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - } + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "trigger-recv-monitor"); pTmrInfo->launchChkptId = pActiveInfo->activeId; } else { // already launched, do nothing stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr); @@ -893,48 +889,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } -void checkpointTriggerMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - int64_t now = taosGetTimestampMs(); - const char* id = pTask->id.idStr; - +static int32_t doChkptStatusCheck(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; - - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - // check the status every 100ms - if (streamTaskShouldStop(pTask)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - if (++pTmrInfo->activeCounter < 50) { - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - return; - } - - pTmrInfo->activeCounter = 0; - stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); - - streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state != TASK_STATUS__CK) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; // checkpoint-trigger recv flag is set, quit if (pActiveInfo->allUpstreamTriggerRecv) { @@ -942,48 +901,44 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - streamMutexUnlock(&pTask->lock); - - streamMutexLock(&pActiveInfo->lock); - - // send msg to retrieve checkpoint trigger msg - SArray* pList = pTask->upstreamInfo.pList; - SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); - if (pNotSendList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); - streamMutexUnlock(&pActiveInfo->lock); - - stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - return; +// streamMutexUnlock(&pTask->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; } if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", id, vgId, pTmrInfo->launchChkptId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; +// streamMutexUnlock(&pActiveInfo->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; } // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; +// streamMutexUnlock(&pActiveInfo->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; + } + + return 0; +} + +static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) { + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); + if (pNotSendList == NULL) { + stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); + return terrno; } for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { @@ -1007,13 +962,87 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { void* px = taosArrayPush(pNotSendList, pInfo); if (px == NULL) { stError("s-task:%s failed to record not send info, code: out of memory", id); + taosArrayDestroy(pNotSendList); + return terrno; } } } + *ppNotSendList = pNotSendList; + return 0; +} + +void checkpointTriggerMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // check the status every 100ms + if (streamTaskShouldStop(pTask)) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (++pTmrInfo->activeCounter < 50) { + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + return; + } + + pTmrInfo->activeCounter = 0; + stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); + + streamMutexLock(&pTask->lock); + SStreamTaskState state = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + + if (state.state != TASK_STATUS__CK) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, + vgId, state.name, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + streamMutexLock(&pActiveInfo->lock); + + int32_t code = doChkptStatusCheck(pTask); + if (code) { + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // send msg to retrieve checkpoint trigger msg + SArray* pList = pTask->upstreamInfo.pList; + SArray* pNotSendList = NULL; + + code = doFindNotSendUpstream(pTask, pList, &pNotSendList); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref); + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + + taosArrayDestroy(pNotSendList); + return; + } + // do send retrieve checkpoint trigger msg to upstream int32_t size = taosArrayGetSize(pNotSendList); - int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList); + code = doSendRetrieveTriggerMsg(pTask, pNotSendList); if (code) { stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); } @@ -1023,7 +1052,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // check every 100ms if (size > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5f7b8dd39e..0f0015c7d9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -634,12 +634,8 @@ static void doMonitorDispatchData(void* param, void* tmrId) { void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) { int32_t vgId = pTask->pMeta->vgId; - if (pTask->msgInfo.pRetryTmr != NULL) { - streamTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId, - "dispatch-monitor-tmr"); - } else { - pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer); - } + streamTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId, + "dispatch-monitor"); } int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId, @@ -888,77 +884,42 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 return TSDB_CODE_SUCCESS; } -static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - const char* id = pTask->id.idStr; +static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; - // check the status every 100ms - if (streamTaskShouldStop(pTask)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - if (++pTmrInfo->activeCounter < 50) { - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, - "chkpt-ready-monitor"); - return; - } - - pTmrInfo->activeCounter = 0; - stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id); - - streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state != TASK_STATUS__CK) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, - pState.name, ref); - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - streamMutexUnlock(&pTask->lock); - - streamMutexLock(&pActiveInfo->lock); - - SArray* pList = pActiveInfo->pReadyMsgList; - int32_t num = taosArrayGetSize(pList); if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", id, vgId, pTmrInfo->launchChkptId, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return -1; } // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return -1; } - SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); - if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id, vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref); + return -1; + } - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return 0; +} + +static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level, + const char* id) { + SArray* pTmp = taosArrayInit(4, sizeof(int32_t)); + if (pTmp == NULL) { + return terrno; } for (int32_t i = 0; i < num; ++i) { @@ -971,63 +932,138 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { continue; } - void* p = taosArrayPush(pNotRspList, &pInfo->upstreamTaskId); + void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId); if (p == NULL) { stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); + return terrno; } else { stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, - pTask->info.taskLevel, pInfo->upstreamTaskId); + level, pInfo->upstreamTaskId); } } - int32_t checkpointId = pActiveInfo->activeId; + *ppNotRspList = pTmp; + return 0; +} - int32_t notRsp = taosArrayGetSize(pNotRspList); - if (notRsp > 0) { // send checkpoint-ready msg again - for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) { - int32_t* pTaskId = taosArrayGet(pNotRspList, i); - if (pTaskId == NULL) { +static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t checkpointId, SArray* pReadyList) { + int32_t code = 0; + int32_t num = taosArrayGetSize(pReadyList); + const char* id = pTask->id.idStr; + + for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) { + int32_t* pTaskId = taosArrayGet(pNotRspList, i); + if (pTaskId == NULL) { + continue; + } + + for (int32_t j = 0; j < num; ++j) { + STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pReadyList, j); + if (pReadyInfo == NULL) { continue; } - for (int32_t j = 0; j < num; ++j) { - STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j); - if (pReadyInfo == NULL) { - continue; - } + if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again - if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again - - SRpcMsg msg = {0}; - int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, - pReadyInfo->childId, checkpointId, &msg); + SRpcMsg msg = {0}; + code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, + pReadyInfo->childId, checkpointId, &msg); + if (code == TSDB_CODE_SUCCESS) { + code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg); if (code == TSDB_CODE_SUCCESS) { - code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg); - if (code == TSDB_CODE_SUCCESS) { - stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel, - pReadyInfo->upstreamTaskId); - } else { - stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); - } + stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel, + pReadyInfo->upstreamTaskId); } else { - stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); + stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); } + } else { + stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); } } } + } +} - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, +static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + SArray* pNotRspList = NULL; + + // check the status every 100ms + if (streamTaskShouldStop(pTask)) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (++pTmrInfo->activeCounter < 50) { + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "chkpt-ready-monitor"); + return; + } + + // reset tmr + pTmrInfo->activeCounter = 0; + stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id); + + streamMutexLock(&pTask->lock); + SStreamTaskState state = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + + // 1. check status in the first place + if (state.state != TASK_STATUS__CK) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready, ref:%d", id, vgId, + state.name, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + streamMutexLock(&pActiveInfo->lock); + + SArray* pList = pActiveInfo->pReadyMsgList; + int32_t num = taosArrayGetSize(pList); + int32_t code = doTaskChkptStatusCheck(pTask, num); + if (code) { + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id, + tstrerror(code), ref); + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + + taosArrayDestroy(pNotRspList); + return; + } + + int32_t checkpointId = pActiveInfo->activeId; + int32_t notRsp = taosArrayGetSize(pNotRspList); + doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + + if (notRsp > 0) { // send checkpoint-ready msg again + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); streamMutexUnlock(&pActiveInfo->lock); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug( - "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg " - "and quit from timer, ref:%d", + "s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit " + "from timer, ref:%d", id, vgId, ref); streamClearChkptReadyMsg(pActiveInfo); streamMutexUnlock(&pActiveInfo->lock); + // release should be the last execution, since pTask may be destroy after it immidiately. streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -1085,12 +1121,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - if (pTmrInfo->tmrHandle == NULL) { - pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer); - } else { - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, - "chkpt-ready-monitor"); - } + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "chkpt-ready-monitor"); // mark the timer monitor checkpointId pTmrInfo->launchChkptId = pActiveInfo->activeId; diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 703e6a3256..72d2e89936 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -279,7 +279,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } if (!waitForEnoughDuration(pMeta->pHbInfo)) { - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, "meta-hb-tmr"); code = taosReleaseRef(streamMetaId, rid); @@ -301,7 +301,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } streamMetaRUnLock(pMeta); - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, "meta-hb-tmr"); code = taosReleaseRef(streamMetaId, rid); @@ -317,7 +317,7 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) { return terrno; } - pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pInfo->hbTmr, 0, "stream-hb"); pInfo->tickCounter = 0; pInfo->msgSendTs = -1; pInfo->hbCount = 0; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 2d54547aa2..63e24b0975 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -20,13 +20,14 @@ static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { + int64_t delaySchema = pTask->info.delaySchedParam; + if (delaySchema != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, pTask->info.delaySchedParam); - pTask->schedInfo.pDelayTimer = - taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer); + streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, + pTask->pMeta->vgId, "sched-tmr"); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } } @@ -76,13 +77,8 @@ void streamTaskResumeInFuture(SStreamTask* pTask) { // add one ref count for task streamMetaAcquireOneTask(pTask); - - if (pTask->schedInfo.pIdleTimer == NULL) { - pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer); - } else { - streamTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, - &pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr"); - } + streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer, + pTask->pMeta->vgId, "resume-task-tmr"); } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -125,7 +121,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { - stDebug("s-task:%s jump out of schedTimer", id); + stDebug("s-task:%s should stop, jump out of schedTimer", id); return; } @@ -139,9 +135,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (code) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); terrno = code; - return; + goto _end; } pTrigger->type = STREAM_INPUT__GET_RES; @@ -149,10 +144,9 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (pTrigger->pBlock == NULL) { taosFreeQitem(pTrigger); - stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", + stError("s-task:%s failed to build retrieve data trigger, code:out of memory, try again in %dms", id, nextTrigger); - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); - return; + goto _end; } atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); @@ -160,8 +154,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); - return; + stError("s-task:%s failed to put retrieve block into trigger, code:%s", pTask->id.idStr, tstrerror(code)); + goto _end; } code = streamTrySchedExec(pTask); @@ -171,5 +165,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } } - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); +_end: + streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, + "sched-run-tmr"); } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 21f0168434..1290aef6a3 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -80,6 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { } void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { + int32_t vgId = pTask->pMeta->vgId; int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { numOfTicks = 1; @@ -100,14 +101,8 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref); - - if (pTask->schedHistoryInfo.pTimer == NULL) { - pTask->schedHistoryInfo.pTimer = - taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); - } else { - streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); - } + streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, + &pTask->schedHistoryInfo.pTimer, vgId, "history-task"); } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { @@ -337,7 +332,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); - streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, + streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } } @@ -391,7 +386,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { pHTaskInfo->tickCount -= 1; if (pHTaskInfo->tickCount > 0) { - streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, + streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); streamMetaReleaseTask(pMeta, pTask); return; @@ -519,7 +514,7 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { } stDebug("s-task:%s set timer active flag, task timer not null", idStr); - streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer, + streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } @@ -621,8 +616,8 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) { // release the task. streamMetaReleaseTask(pTask->pMeta, pTask); } else { - streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); + streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, + &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 06286479a3..3709c4dfbd 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -491,8 +491,8 @@ static void keepPrevInfo(SStreamTaskSM* pSM) { int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) { SStreamTask* pTask = pSM->pTask; - const char* id = pTask->id.idStr; - int32_t code = 0; + const char* id = pTask->id.idStr; + int32_t code = 0; // do update the task status streamMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 5e12f51c9d..8b77fe7cb1 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -40,11 +40,23 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) { return TSDB_CODE_SUCCESS; } -void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, +void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId, const char* pMsg) { - bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId); - if (ret) { + if (*pTmrId == NULL) { + *pTmrId = taosTmrStart(fp, mseconds, pParam, pHandle); + if (*pTmrId == NULL) { + stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno)); + return; + } + } else { + bool ret = taosTmrReset(fp, mseconds, pParam, pHandle, pTmrId); + if (ret) { + stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno)); + return; + } } + + stDebug("vgId:%d start %s tmr succ", vgId, pMsg); } void streamTmrStop(tmr_h tmrId) { From f0289293f327c97896c2ccf341081d2e6138781c Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Mon, 16 Sep 2024 10:16:42 +0800 Subject: [PATCH 13/16] mod python install doc --- docs/zh/08-develop/01-connect/index.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/zh/08-develop/01-connect/index.md b/docs/zh/08-develop/01-connect/index.md index 5cecd245e5..b16e96922f 100644 --- a/docs/zh/08-develop/01-connect/index.md +++ b/docs/zh/08-develop/01-connect/index.md @@ -132,7 +132,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速 pip3 install taospy[ws] ``` - - **安装验证** +- **安装验证** 对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入: @@ -198,18 +198,18 @@ taos = { version = "*", default-features = false, features = ["ws"] } - **安装** - 使用 npm 安装 Node.js 连接器 - ``` - npm install @tdengine/websocket - ``` + ``` + npm install @tdengine/websocket + ``` :::note Node.js 目前只支持 Websocket 连接 - **安装验证** - 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [nodejsChecker.js 源代码](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/nodejsChecker.js)到本地。 - 在命令行中执行以下命令。 - ```bash - npm init -y - npm install @tdengine/websocket - node nodejsChecker.js - ``` + ```bash + npm init -y + npm install @tdengine/websocket + node nodejsChecker.js + ``` - 执行以上步骤后,在命令行会输出 nodeChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。 From 98442b4383b791a2347b5009bb82e7d733425596 Mon Sep 17 00:00:00 2001 From: huskar-t <1172915550@qq.com> Date: Mon, 16 Sep 2024 11:42:52 +0800 Subject: [PATCH 14/16] docs: update http status code --- .../14-reference/05-connector/60-rest-api.mdx | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/docs/zh/14-reference/05-connector/60-rest-api.mdx b/docs/zh/14-reference/05-connector/60-rest-api.mdx index a804229d27..eae9fba1c4 100644 --- a/docs/zh/14-reference/05-connector/60-rest-api.mdx +++ b/docs/zh/14-reference/05-connector/60-rest-api.mdx @@ -123,8 +123,8 @@ curl -L -u username:password -d "" :/rest/sql/[db_name][?tz=timez | **说明** | **httpCodeServerError false** | **httpCodeServerError true** | |--------------------|-------------------------------|---------------------------------------| | taos_errno() 返回 0 | 200 | 200 | -| taos_errno() 返回 非0 | 200(除鉴权错误) | 500 (除鉴权错误和 400/502 错误) | -| 参数错误 | 400 (仅处理 HTTP 请求 URL 参数错误) | 400 (处理 HTTP 请求 URL 参数错误和 taosd 返回错误) | +| taos_errno() 返回 非0 | 200(除鉴权错误) | 500 (除鉴权错误和 400/502/503 错误) | +| 参数错误 | 400(仅处理 HTTP 请求 URL 参数错误) | 400 (处理 HTTP 请求 URL 参数错误和 taosd 返回错误) | | 鉴权错误 | 401 | 401 | | 接口不存在 | 404 | 404 | | 集群不可用错误 | 502 | 502 | @@ -132,27 +132,29 @@ curl -L -u username:password -d "" :/rest/sql/[db_name][?tz=timez 返回 400 的 C 错误码为: -- TSDB_CODE_TSC_SQL_SYNTAX_ERROR ( 0x0216) -- TSDB_CODE_TSC_LINE_SYNTAX_ERROR (0x021B) +- TSDB_CODE_TSC_SQL_SYNTAX_ERROR (0x0216) +- TSDB_CODE_TSC_LINE_SYNTAX_ERROR (0x021B) - TSDB_CODE_PAR_SYNTAX_ERROR (0x2600) - TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE (0x060B) -- TSDB_CODE_TSC_VALUE_OUT_OF_RANGE (0x0224) +- TSDB_CODE_TSC_VALUE_OUT_OF_RANGE (0x0224) - TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE (0x263B) 返回 401 的错误码为: -- TSDB_CODE_MND_USER_ALREADY_EXIST (0x0350) -- TSDB_CODE_MND_USER_NOT_EXIST ( 0x0351) -- TSDB_CODE_MND_INVALID_USER_FORMAT (0x0352) -- TSDB_CODE_MND_INVALID_PASS_FORMAT (0x0353) +- TSDB_CODE_MND_USER_ALREADY_EXIST (0x0350) +- TSDB_CODE_MND_USER_NOT_EXIST (0x0351) +- TSDB_CODE_MND_INVALID_USER_FORMAT (0x0352) +- TSDB_CODE_MND_INVALID_PASS_FORMAT (0x0353) - TSDB_CODE_MND_NO_USER_FROM_CONN (0x0354) - TSDB_CODE_MND_TOO_MANY_USERS (0x0355) - TSDB_CODE_MND_INVALID_ALTER_OPER (0x0356) - TSDB_CODE_MND_AUTH_FAILURE (0x0357) -返回 403 的错误码为: +返回 502 的错误码为: -- TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED (0x0020) +- TSDB_CODE_RPC_NETWORK_UNAVAIL (0x000B) + +错误码和错误描述请参考[错误码](../../../reference/error-code) ### HTTP body 结构 @@ -333,6 +335,8 @@ curl --location 'http://:/rest/sql' \ - code:(`int`)错误码。 - desc:(`string`)错误描述。 +错误码和错误描述请参考[错误码](../../../reference/error-code) + #### 返回key-value形式数据 当指定 url 参数 `row_with_meta=true` 时,返回的 data 中的数据会从数组的形式变成对象的形式,对象的 key 为列名,value 为数据,如下所示: From 8ca87f18e3a148d34ffaf093905913bbb89b35ff Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Mon, 16 Sep 2024 12:31:39 +0800 Subject: [PATCH 15/16] add c error code ref --- docs/zh/14-reference/05-connector/60-rest-api.mdx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/zh/14-reference/05-connector/60-rest-api.mdx b/docs/zh/14-reference/05-connector/60-rest-api.mdx index a804229d27..e1211bc252 100644 --- a/docs/zh/14-reference/05-connector/60-rest-api.mdx +++ b/docs/zh/14-reference/05-connector/60-rest-api.mdx @@ -118,7 +118,8 @@ curl -L -u username:password -d "" :/rest/sql/[db_name][?tz=timez ### HTTP 响应码 -从 `TDengine 3.0.3.0` 开始 `taosAdapter` 提供配置参数 `httpCodeServerError` 用来设置当 C 接口返回错误时是否返回非 200 的http状态码 +从 `TDengine 3.0.3.0` 开始 `taosAdapter` 提供配置参数 `httpCodeServerError` 用来设置当 C 接口返回错误时是否返回非 200 的http状态码。 +无论是否设置此参数,响应 body 里都有详细的错误码和错误信息,具体请参考 [错误](../rest-api/#错误) 。 | **说明** | **httpCodeServerError false** | **httpCodeServerError true** | |--------------------|-------------------------------|---------------------------------------| From e87bd926dab8e04c0381c9cc400b7f9c95a55e6d Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Wed, 18 Sep 2024 09:18:30 +0800 Subject: [PATCH 16/16] fix: Memory detection in macro definition --- include/os/osSystem.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/include/os/osSystem.h b/include/os/osSystem.h index 01fad7ad97..44910ba94d 100644 --- a/include/os/osSystem.h +++ b/include/os/osSystem.h @@ -81,8 +81,11 @@ int32_t taosResetTerminalMode(); unw_get_reg(&cursor, UNW_REG_IP, &pc); \ fname[0] = '\0'; \ (void)unw_get_proc_name(&cursor, fname, sizeof(fname), &offset); \ - size += 1; \ array[size] = (char *)taosMemoryMalloc(sizeof(char) * STACKSIZE + 1); \ + if(NULL == array[size]) { \ + break; \ + } \ + size += 1; \ snprintf(array[size], STACKSIZE, "0x%lx : (%s+0x%lx) [0x%lx]\n", (long)pc, fname, (long)offset, (long)pc); \ } \ if (ignoreNum < size && size > 0) { \