diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 904c8b1651..a4f765c9fb 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -4,7 +4,6 @@ import jenkins.model.CauseOfInterruption docs_only=0 node { } - def abortPreviousBuilds() { def currentJobName = env.JOB_NAME def currentBuildNumber = env.BUILD_NUMBER.toInteger() @@ -71,6 +70,7 @@ def check_docs() { } else { echo file_changed } + env.FILE_CHANGED = file_changed } } def pre_test(){ @@ -137,7 +137,7 @@ def pre_test(){ ''' } else { sh ''' - echo "unmatched reposiotry ${CHANGE_URL}" + echo "unmatched repository ${CHANGE_URL}" ''' } sh ''' @@ -247,7 +247,7 @@ def pre_test_win(){ ''' } else { bat ''' - echo "unmatched reposiotry %CHANGE_URL%" + echo "unmatched repository %CHANGE_URL%" ''' } } @@ -309,7 +309,7 @@ def pre_test_build_win() { python -m pip install taospy==2.7.13 python -m pip uninstall taos-ws-py -y python -m pip install taos-ws-py==0.3.1 - xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32 + xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32 ''' return 1 } @@ -350,7 +350,6 @@ pipeline { when { allOf { not { expression { env.CHANGE_BRANCH =~ /docs\// }} - not { expression { env.CHANGE_URL =~ /\/TDinternal\// }} } } parallel { @@ -419,6 +418,10 @@ pipeline { timeout(time: 200, unit: 'MINUTES'){ pre_test() script { + sh ''' + mkdir -p ${WKDIR}/tmp/${BRANCH_NAME}_${BUILD_ID} + echo "''' + env.FILE_CHANGED + '''" > ${WKDIR}/tmp/${BRANCH_NAME}_${BUILD_ID}/docs_changed.txt + ''' sh ''' date rm -rf ${WKC}/debug @@ -450,6 +453,10 @@ pipeline { } } } + sh ''' + cd ${WKC}/tests/parallel_test + ./run_scan_container.sh -d ${WKDIR} -b ${BRANCH_NAME}_${BUILD_ID} -f ${WKDIR}/tmp/${BRANCH_NAME}_${BUILD_ID}/docs_changed.txt ''' + extra_param + ''' + ''' sh ''' cd ${WKC}/tests/parallel_test export DEFAULT_RETRY_TIME=2 diff --git a/cmake/cmake.define b/cmake/cmake.define index f1a5cef67e..eb78b54cae 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -16,6 +16,12 @@ if (NOT DEFINED TD_GRANT) SET(TD_GRANT FALSE) endif() +IF (NOT DEFINED BUILD_WITH_RAND_ERR) + SET(BUILD_WITH_RAND_ERR FALSE) +ELSE () + SET(BUILD_WITH_RAND_ERR TRUE) +endif() + IF ("${WEBSOCKET}" MATCHES "true") SET(TD_WEBSOCKET TRUE) MESSAGE("Enable websocket") diff --git a/cmake/cmake.options b/cmake/cmake.options index fc17ddecf6..2158157780 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -146,6 +146,13 @@ option( ENDIF () +IF(NOT TD_ENTERPRISE) +MESSAGE("switch s3 off with community version") +set(BUILD_S3 OFF) +set(BUILD_WITH_S3 OFF) +set(BUILD_WITH_COS OFF) +ENDIF () + IF(${BUILD_S3}) IF(${BUILD_WITH_S3}) diff --git a/docs/en/12-taos-sql/12-distinguished.md b/docs/en/12-taos-sql/12-distinguished.md index bfc9ca32c0..8eecb706c0 100644 --- a/docs/en/12-taos-sql/12-distinguished.md +++ b/docs/en/12-taos-sql/12-distinguished.md @@ -102,6 +102,7 @@ The detailed beaviors of `NULL`, `NULL_F`, `VALUE`, and VALUE_F are described be 1. A huge volume of interpolation output may be returned using `FILL`, so it's recommended to specify the time range when using `FILL`. The maximum number of interpolation values that can be returned in a single query is 10,000,000. 2. The result set is in ascending order of timestamp when you aggregate by time window. 3. If aggregate by window is used on STable, the aggregate function is performed on all the rows matching the filter conditions. If `PARTITION BY` is not used in the query, the result set will be returned in strict ascending order of timestamp; otherwise the result set will be returned in the order of ascending timestamp in each group. +4. The output windows of Fill are related with time range of WHERE Clause. For asc fill, the first output window is the first window that conains the start time of WHERE clause. The last output window is the last window that contains the end time of WHERE clause. ::: diff --git a/docs/zh/12-taos-sql/12-distinguished.md b/docs/zh/12-taos-sql/12-distinguished.md index 0eaeb0dfa7..50bf36d2e1 100755 --- a/docs/zh/12-taos-sql/12-distinguished.md +++ b/docs/zh/12-taos-sql/12-distinguished.md @@ -97,6 +97,7 @@ NULL, NULL_F, VALUE, VALUE_F 这几种填充模式针对不同场景区别如下 1. 使用 FILL 语句的时候可能生成大量的填充输出,务必指定查询的时间区间。针对每次查询,系统可返回不超过 1 千万条具有插值的结果。 2. 在时间维度聚合中,返回的结果中时间序列严格单调递增。 3. 如果查询对象是超级表,则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用 PARTITION BY 语句,则返回的结果按照时间序列严格单调递增;如果查询中使用了 PARTITION BY 语句分组,则返回结果中每个 PARTITION 内按照时间序列严格单调递增。 +4. Fill输出的起始和结束窗口与WHERE条件的时间范围有关, 如增序Fill时, 第一个窗口是包含WHERE条件开始时间的第一个窗口, 最后一个窗口是包含WHERE条件结束时间的最后一个窗口。 ::: diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0d1e62cdbe..90cb06ff42 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -641,12 +641,13 @@ bool streamTaskShouldPause(const SStreamTask* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus); -int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId); -SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask); -const char* streamTaskGetStatusStr(ETaskStatus status); -void streamTaskResetStatus(SStreamTask* pTask); -void streamTaskSetStatusReady(SStreamTask* pTask); -ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); +int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId); +SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask); +const char* streamTaskGetStatusStr(ETaskStatus status); +void streamTaskResetStatus(SStreamTask* pTask); +void streamTaskSetStatusReady(SStreamTask* pTask); +ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); +const char* streamTaskGetExecType(int32_t type); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); diff --git a/include/os/os.h b/include/os/os.h index ea2a37311a..e5a7587f96 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -125,6 +125,9 @@ extern "C" { #include "taoserror.h" #include "tlog.h" +extern int32_t tsRandErrChance; +extern threadlocal bool tsEnableRandErr; + #ifdef __cplusplus } #endif diff --git a/include/os/osDef.h b/include/os/osDef.h index 75c6a0dc73..439f4b5c6a 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -188,9 +188,10 @@ void syslog(int unused, const char *format, ...); #define ALIGN8(n) ALIGN_NUM(n, 8) #undef threadlocal -#ifdef _ISOC11_SOURCE -#define threadlocal _Thread_local -#elif defined(__APPLE__) +//#ifdef _ISOC11_SOURCE +//#define threadlocal _Thread_local +//#elif defined(__APPLE__) +#if defined(__APPLE__) #define threadlocal __thread #elif defined(__GNUC__) && !defined(threadlocal) #define threadlocal __thread diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index f21a6a4b32..4f6d25b5e9 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -96,8 +96,8 @@ extern threadlocal void* threadPoolHandle; extern threadlocal void* threadPoolSession; -#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; } while (0) -#define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL) +#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; tsEnableRandErr = true;} while (0) +#define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL, tsEnableRandErr = false) #define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = threadPoolHandle; threadPoolHandle = NULL; } while (0) #define taosRestoreEnableMemoryPoolUsage(_handle) (threadPoolHandle = (_handle)) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 928afdaecf..30424adecd 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -373,6 +373,7 @@ int taos_options_imp(TSDB_OPTION option, const char* str); int32_t openTransporter(const char* user, const char* auth, int32_t numOfThreads, void **pDnodeConn); void tscStopCrashReport(); +void cleanupAppInfo(); typedef struct AsyncArg { SRpcMsg msg; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 2258a4bc8c..35e6651c41 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -688,7 +688,6 @@ void doDestroyRequest(void *p) { taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->targetTableList); - destroyQueryExecRes(&pRequest->body.resInfo.execRes); if (pRequest->self) { @@ -702,12 +701,7 @@ void doDestroyRequest(void *p) { } taosMemoryFree(pRequest->body.interParam); - if (TSDB_CODE_SUCCESS == nodesSimAcquireAllocator(pRequest->allocatorRefId)) { - qDestroyQuery(pRequest->pQuery); - if (TSDB_CODE_SUCCESS != nodesSimReleaseAllocator(pRequest->allocatorRefId)) { - tscError("failed to release allocator"); - } - } + qDestroyQuery(pRequest->pQuery); nodesDestroyAllocator(pRequest->allocatorRefId); taosMemoryFreeClear(pRequest->effectiveUser); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 75c1eabe7e..4aa78caa15 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -68,6 +68,10 @@ bool chkRequestKilled(void* param) { return killed; } +void cleanupAppInfo() { + taosHashCleanup(appInfo.pInstMap); +} + static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 0a5fb1a7b4..12702a93f3 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -67,7 +67,6 @@ void taos_cleanup(void) { fmFuncMgtDestroy(); qCleanupKeywordsTable(); - nodesDestroyAllocatorSet(); if (TSDB_CODE_SUCCESS != cleanupTaskQueue()) { tscWarn("failed to cleanup task queue"); @@ -85,6 +84,8 @@ void taos_cleanup(void) { tscWarn("failed to close clientReqRefPool"); } + nodesDestroyAllocatorSet(); + cleanupAppInfo(); rpcCleanup(); tscDebug("rpc cleanup"); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a3576504e2..a808385c48 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -329,6 +329,7 @@ int32_t tsMaxTsmaNum = 3; int32_t tsMaxTsmaCalcDelay = 600; int64_t tsmaDataDeleteMark = 1000 * 60 * 60 * 24; // in ms, default to 1d + #define TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, pName) \ if ((pItem = cfgGetItem(pCfg, pName)) == NULL) { \ TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND); \ @@ -746,6 +747,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "compactPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "randErrorChance", tsRandErrChance, 0, 10000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); @@ -1226,7 +1228,17 @@ static int32_t taosSetSystemCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "charset"); const char *charset = pItem->str; - (void)taosSetSystemLocale(locale, charset); // ignore this error temporarily + int32_t code = taosSetSystemLocale(locale, charset); + if (TSDB_CODE_SUCCESS != code) { + uInfo("failed to set locale %s, since: %s", locale, tstrerror(code)); + char curLocale[TD_LOCALE_LEN] = {0}; + char curCharset[TD_CHARSET_LEN] = {0}; + taosGetSystemLocale(curLocale, curCharset); + if (0 != strlen(curLocale) && 0 != strlen(curCharset)) { + uInfo("current locale: %s, charset: %s", curLocale, curCharset); + } + } + osSetSystemLocale(locale, charset); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableCoreFile"); @@ -1422,6 +1434,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "mqRebalanceInterval"); tsMqRebalanceInterval = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "randErrorChance"); + tsRandErrChance = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "ttlUnit"); tsTtlUnit = pItem->i32; @@ -1692,25 +1707,28 @@ static int32_t cfgInitWrapper(SConfig **pCfg) { } TAOS_RETURN(TSDB_CODE_SUCCESS); } + int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) { if (tsCfg != NULL) TAOS_RETURN(TSDB_CODE_SUCCESS); - TAOS_CHECK_RETURN(cfgInitWrapper(&tsCfg)); + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = -1; + + TAOS_CHECK_GOTO(cfgInitWrapper(&tsCfg), &lino, _exit); if (tsc) { - TAOS_CHECK_RETURN(taosAddClientCfg(tsCfg)); - TAOS_CHECK_RETURN(taosAddClientLogCfg(tsCfg)); + TAOS_CHECK_GOTO(taosAddClientCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosAddClientLogCfg(tsCfg), &lino, _exit); } else { - TAOS_CHECK_RETURN(taosAddClientCfg(tsCfg)); - TAOS_CHECK_RETURN(taosAddServerCfg(tsCfg)); - TAOS_CHECK_RETURN(taosAddClientLogCfg(tsCfg)); - TAOS_CHECK_RETURN(taosAddServerLogCfg(tsCfg)); + TAOS_CHECK_GOTO(taosAddClientCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosAddServerCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosAddClientLogCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosAddServerLogCfg(tsCfg), &lino, _exit); } - TAOS_CHECK_RETURN(taosAddSystemCfg(tsCfg)); + TAOS_CHECK_GOTO(taosAddSystemCfg(tsCfg), &lino, _exit); - int32_t code = TSDB_CODE_SUCCESS; if ((code = taosLoadCfg(tsCfg, envCmd, cfgDir, envFile, apolloUrl)) != 0) { uError("failed to load cfg since %s", tstrerror(code)); cfgCleanup(tsCfg); @@ -1726,31 +1744,38 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile } if (tsc) { - TAOS_CHECK_RETURN(taosSetClientCfg(tsCfg)); + TAOS_CHECK_GOTO(taosSetClientCfg(tsCfg), &lino, _exit); } else { - TAOS_CHECK_RETURN(taosSetClientCfg(tsCfg)); - TAOS_CHECK_RETURN(taosUpdateServerCfg(tsCfg)); - TAOS_CHECK_RETURN(taosSetServerCfg(tsCfg)); - TAOS_CHECK_RETURN(taosSetReleaseCfg(tsCfg)); - TAOS_CHECK_RETURN(taosSetTfsCfg(tsCfg)); - TAOS_CHECK_RETURN(taosSetS3Cfg(tsCfg)); + TAOS_CHECK_GOTO(taosSetClientCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosUpdateServerCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosSetServerCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosSetReleaseCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosSetTfsCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosSetS3Cfg(tsCfg), &lino, _exit); } - TAOS_CHECK_RETURN(taosSetSystemCfg(tsCfg)); - TAOS_CHECK_RETURN(taosSetFileHandlesLimit()); + TAOS_CHECK_GOTO(taosSetSystemCfg(tsCfg), &lino, _exit); + TAOS_CHECK_GOTO(taosSetFileHandlesLimit(), &lino, _exit); SConfigItem *pItem = cfgGetItem(tsCfg, "debugFlag"); if (NULL == pItem) { uError("debugFlag not found in cfg"); TAOS_RETURN(TSDB_CODE_CFG_NOT_FOUND); } - TAOS_CHECK_RETURN(taosSetAllDebugFlag(tsCfg, pItem->i32)); + TAOS_CHECK_GOTO(taosSetAllDebugFlag(tsCfg, pItem->i32), &lino, _exit); cfgDumpCfg(tsCfg, tsc, false); - TAOS_CHECK_RETURN(taosCheckGlobalCfg()); + TAOS_CHECK_GOTO(taosCheckGlobalCfg(), &lino, _exit); - TAOS_RETURN(TSDB_CODE_SUCCESS); +_exit: + if (TSDB_CODE_SUCCESS != code) { + cfgCleanup(tsCfg); + tsCfg = NULL; + uError("failed to init cfg at %d since %s", lino, tstrerror(code)); + } + + TAOS_RETURN(code); } void taosCleanupCfg() { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index c84e016459..3dc4beca57 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -82,7 +82,7 @@ static void doStartScanWal(void* param, void* tmrId) { taosMemoryFree(pParam); if (code) { - tqError("vgId:%d failed sched task to scan wal", vgId); + tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 853b2865bb..5957d08a18 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2001,6 +2001,7 @@ _end: taosMemoryFree(pFuncCtx[i].input.pData); taosMemoryFree(pFuncCtx[i].input.pColumnDataAgg); } + taosMemoryFreeClear(*rowEntryInfoOffset); taosMemoryFreeClear(pFuncCtx); return NULL; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 4b71c5ee3f..c4ef74608a 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -59,6 +59,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i static void destroyFillOperatorInfo(void* param); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static void fillResetPrevForNewGroup(SFillInfo* pFillInfo); +static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order); static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, int32_t order) { @@ -74,7 +75,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp blockDataCleanup(pInfo->pRes); doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag); - revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order); + reviseFillStartAndEndKey(pOperator->info, order); int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey : pInfo->existNewGroupBlock->info.window.skey; @@ -258,7 +259,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) { if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) { - revisedFillStartKey(pInfo, pBlock, order); + reviseFillStartAndEndKey(pInfo, order); } pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block @@ -549,3 +550,31 @@ _error: taosMemoryFreeClear(pOperator); return code; } + +static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) { + int64_t skey, ekey, next; + if (order == TSDB_ORDER_ASC) { + skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval); + taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); + + ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval); + next = ekey; + while (next < pInfo->win.ekey) { + next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit, + pInfo->pFillInfo->interval.precision); + ekey = next > pInfo->win.ekey ? ekey : next; + } + pInfo->win.ekey = ekey; + } else { + assert(order == TSDB_ORDER_DESC); + skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval); + next = skey; + while (next < pInfo->win.skey) { + next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit, + pInfo->pFillInfo->interval.precision); + skey = next > pInfo->win.skey ? skey : next; + } + taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); + pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval); + } +} diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 706cf50270..d88aef8fb7 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -551,6 +551,10 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo pOperator->exprSupp.hasWindowOrGroup = true; SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); + if (pResBlock == NULL) { + code = terrno; + goto _error; + } initBasicInfo(&pInfo->binfo, pResBlock); int32_t numOfScalarExpr = 0; @@ -602,6 +606,7 @@ _error: if (pInfo != NULL) { destroyGroupOperatorInfo(pInfo); } + destroyOperator(pOperator); taosMemoryFreeClear(pOperator); return code; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index f9ef57ec5e..701ed0ddbc 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -479,6 +479,9 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); code = createOperator(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser, dbname, &ops[i]); if (ops[i] == NULL || code != 0) { + for (int32_t j = 0; j < i; ++j) { + destroyOperator(ops[j]); + } taosMemoryFree(ops); return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 33e2241f1e..41de64769d 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5401,7 +5401,7 @@ static int32_t translateFill(STranslateContext* pCxt, SSelectStmt* pSelect, SInt return checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval, false); } -static int32_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit, int64_t* pMonth) { +static int32_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char unit, double* pMonth) { int64_t days = -1; int32_t code = convertTimeFromPrecisionToUnit(val, fromPrecision, 'd', &days); if (TSDB_CODE_SUCCESS != code) { @@ -5416,7 +5416,7 @@ static int32_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni case 'h': case 'd': case 'w': - *pMonth = days / 28; + *pMonth = days / 28.0; return code; case 'n': *pMonth = val; @@ -5499,7 +5499,7 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG); } if (!fixed) { - int64_t offsetMonth = 0, intervalMonth = 0; + double offsetMonth = 0, intervalMonth = 0; int32_t code = getMonthsFromTimeVal(pOffset->datum.i, precision, pOffset->unit, &offsetMonth); if (TSDB_CODE_SUCCESS != code) { return code; @@ -5530,7 +5530,21 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* (pInter->datum.i / pSliding->datum.i > INTERVAL_SLIDING_FACTOR)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL); } - if (pSliding->datum.i > pInter->datum.i) { + if (valInter) { + double slidingMonth = 0, intervalMonth = 0; + int32_t code = getMonthsFromTimeVal(pSliding->datum.i, precision, pSliding->unit, &slidingMonth); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + code = getMonthsFromTimeVal(pInter->datum.i, precision, pInter->unit, &intervalMonth); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + if (slidingMonth > intervalMonth) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG); + } + } + if (!valInter && pSliding->datum.i > pInter->datum.i) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG); } } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index aa2fcb67ed..ada47403ec 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1305,7 +1305,10 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { QW_ERR_JRET(code); } + tsEnableRandErr = true; code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); + tsEnableRandErr = false; + if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index c606857a65..266fac9be1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -355,43 +355,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } -/** - * All down stream tasks have successfully completed the check point task. - * Current stream task is allowed to start to do checkpoint things in ASYNC model. - */ -int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, - int32_t downstreamTaskId) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - - const char* id = pTask->id.idStr; - bool received = false; - int32_t total = streamTaskGetNumOfDownstream(pTask); - ASSERT(total > 0); - - // 1. not in checkpoint status now - SStreamTaskState pStat = streamTaskGetStatus(pTask); - if (pStat.state != TASK_STATUS__CK) { - stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - - // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg - if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) { - stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64 - ") from task:0x%x, expired and discard ", - id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId); - return -1; - } - - streamMutexLock(&pInfo->lock); - - // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task +// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task +static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream, + int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId, + const char* id, int32_t* pNotReady, int32_t* pTransId) { + bool received = false; int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); if (p == NULL) { - streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -403,27 +375,69 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId if (received) { stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id, - downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total); + downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), + numOfDownstream); } else { STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(), .downstreamTaskId = downstreamTaskId, .checkpointId = pInfo->activeId, .transId = pInfo->transId, - .streamId = pTask->id.streamId, + .streamId = streamId, .downstreamNodeId = downstreamNodeId}; - (void)taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); + void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); + if (p == NULL) { + stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno)); + return terrno; + } } - int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); - int32_t transId = pInfo->transId; + *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); + *pTransId = pInfo->transId; + return 0; +} + +/** + * All down stream tasks have successfully completed the check point task. + * Current stream task is allowed to start to do checkpoint things in ASYNC model. + */ +int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, + int32_t downstreamTaskId) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + const char* id = pTask->id.idStr; + int32_t total = streamTaskGetNumOfDownstream(pTask); + int32_t code = 0; + int32_t notReady = 0; + int32_t transId = 0; + + ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG)); + + // 1. not in checkpoint status now + SStreamTaskState pStat = streamTaskGetStatus(pTask); + if (pStat.state != TASK_STATUS__CK) { + stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg + if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) { + stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64 + ") from task:0x%x, expired and discard", + id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId); + return TSDB_CODE_INVALID_MSG; + } + + streamMutexLock(&pInfo->lock); + code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, ¬Ready, + &transId); streamMutexUnlock(&pInfo->lock); - if (notReady == 0) { + if ((notReady == 0) && (code == 0)) { stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); } - return 0; + return code; } int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) { @@ -1034,8 +1048,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p == NULL) { - streamMutexUnlock(&pInfo->lock); - return num; + continue; } if (p->recved) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 08cf490b94..87293c59ec 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1379,7 +1379,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + + if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } } streamMetaReleaseTask(pMeta, pTask); @@ -1464,14 +1467,16 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { } int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t code = 0; - int32_t vgId = pMeta->vgId; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + SStreamTask* pTask = NULL; + bool continueExec = true; + stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); - SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); + stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId); (void)streamMetaAddFailedTask(pMeta, streamId, taskId); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1479,10 +1484,28 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas // fill-history task can only be launched by related stream tasks. STaskExecStatisInfo* pInfo = &pTask->execInfo; if (pTask->info.fillHistory == 1) { + stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } + // the start all tasks procedure may happen to start the newly deployed stream task, and results in the + // concurrently start this task by two threads. + streamMutexLock(&pTask->lock); + SStreamTaskState status = streamTaskGetStatus(pTask); + if (status.state != TASK_STATUS__UNINIT) { + stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name); + continueExec = false; + } else { + continueExec = true; + } + streamMutexUnlock(&pTask->lock); + + if (!continueExec) { + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + ASSERT(pTask->status.downstreamReady == 0); // avoid initialization and destroy running concurrently. @@ -1498,11 +1521,17 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas streamMutexUnlock(&pTask->lock); } + // concurrently start task may cause the later started task be failed, and also failed to added into meta result. if (code == TSDB_CODE_SUCCESS) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, + tstrerror(code)); + + // do no added into result hashmap if it is failed due to concurrently starting of this stream task. + if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } } } @@ -1536,11 +1565,12 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId}; + int32_t vgId = pMeta->vgId; streamMetaWLock(pMeta); SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { // task does not exists in current vnode, not record the complete info - stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); + stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId); streamMetaWUnLock(pMeta); return 0; } @@ -1555,7 +1585,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 stDebug( "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed " "time:%" PRId64 "ms", - pMeta->vgId, taskId, ready, el); + vgId, taskId, ready, el); streamMetaWUnLock(pMeta); return 0; } @@ -1565,6 +1595,15 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); if (code) { + if (code == TSDB_CODE_DUP_KEY) { + stError("vgId:%d record start task result failed, s-task:0x%" PRIx64 + " already exist start results in meta start task result hashmap", + vgId, id.taskId); + } else { + stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId); + } + streamMetaWUnLock(pMeta); + return code; } int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); @@ -1576,20 +1615,20 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, + vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); // print the initialization elapsed time and info displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo, pMeta->vgId); + streamMetaResetStartInfo(pStartInfo, vgId); streamMetaWUnLock(pMeta); code = pStartInfo->completeFn(pMeta); } else { streamMetaWUnLock(pMeta); - stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId, - ready, numOfRecv, numOfTotal); + stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready, + numOfRecv, numOfTotal); } return code; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index f9fcf36668..a83a0e4cc8 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -48,14 +48,15 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, - terrstr()); - return TSDB_CODE_OUT_OF_MEMORY; + terrstr(terrno)); + return terrno; } if (streamId != 0) { - stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType); + stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType, + streamTaskGetExecType(execType)); } else { - stDebug("vgId:%d create msg to exec, type:%d", vgId, execType); + stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType)); } pRunReq->head.vgId = vgId; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a249bad724..f07fd81953 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1149,4 +1149,25 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList); +} + +const char* streamTaskGetExecType(int32_t type) { + switch (type) { + case STREAM_EXEC_T_EXTRACT_WAL_DATA: + return "scan-wal-file"; + case STREAM_EXEC_T_START_ALL_TASKS: + return "start-all-tasks"; + case STREAM_EXEC_T_START_ONE_TASK: + return "start-one-task"; + case STREAM_EXEC_T_RESTART_ALL_TASKS: + return "restart-all-tasks"; + case STREAM_EXEC_T_STOP_ALL_TASKS: + return "stop-all-tasks"; + case STREAM_EXEC_T_RESUME_TASK: + return "resume-task-from-idle"; + case STREAM_EXEC_T_ADD_FAILED_TASK: + return "record-start-failed-task"; + default: + return "invalid-exec-type"; + } } \ No newline at end of file diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 275c9255d2..d3c39da6bd 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -322,12 +322,11 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt if (pTrans->attachEvent.event != 0) { code = attachWaitedEvent(pTask, &pTrans->attachEvent); + streamMutexUnlock(&pTask->lock); if (code) { return code; } - streamMutexUnlock(&pTask->lock); - while (1) { // wait for the task to be here streamMutexLock(&pTask->lock); @@ -406,7 +405,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { EStreamTaskEvent evt = pSM->pActiveTrans->event; streamMutexUnlock(&pTask->lock); - stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", + stDebug("s-task:%s status:%s handling event:%s by another thread, wait for 100ms and check if completed", pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); taosMsleep(100); } else { @@ -419,6 +418,13 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { } if (pSM->pActiveTrans != NULL) { + // not allowed concurrently initialization + if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) { + streamMutexUnlock(&pTask->lock); + stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr); + return TSDB_CODE_STREAM_INVALID_STATETRANS; + } + // currently in some state transfer procedure, not auto invoke transfer, abort it stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name, @@ -557,6 +563,11 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) { } const char* streamTaskGetStatusStr(ETaskStatus status) { + int32_t index = status; + if (index < 0 || index > tListLen(StreamTaskStatusList)) { + return ""; + } + return StreamTaskStatusList[status].name; } diff --git a/source/os/CMakeLists.txt b/source/os/CMakeLists.txt index b8362bbd3c..521669e360 100644 --- a/source/os/CMakeLists.txt +++ b/source/os/CMakeLists.txt @@ -22,6 +22,9 @@ endif () if(USE_TD_MEMORY) add_definitions(-DUSE_TD_MEMORY) endif () +if(BUILD_WITH_RAND_ERR) + add_definitions(-DBUILD_WITH_RAND_ERR) +endif () if(BUILD_ADDR2LINE) if(NOT TD_WINDOWS) target_include_directories( diff --git a/source/os/src/osLocale.c b/source/os/src/osLocale.c index c846ca82a3..2f835a7a27 100644 --- a/source/os/src/osLocale.c +++ b/source/os/src/osLocale.c @@ -75,7 +75,7 @@ char *taosCharsetReplace(char *charsetstr) { * * In case that the setLocale failed to be executed, the right charset needs to be set. */ -int32_t taosSetSystemLocale(const char *inLocale, const char *inCharSet) {\ +int32_t taosSetSystemLocale(const char *inLocale, const char *inCharSet) { if (!taosValidateEncodec(inCharSet)) { return terrno; } diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 3ec22d9187..4a7befb9c0 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -21,6 +21,10 @@ #endif #include "os.h" +int32_t tsRandErrChance = 1; +threadlocal bool tsEnableRandErr = 0; + + #if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE) #define TD_MEMORY_SYMBOL ('T' << 24 | 'A' << 16 | 'O' << 8 | 'S') @@ -266,6 +270,16 @@ void *taosMemMalloc(int64_t size) { return (char *)tmp + sizeof(TdMemoryInfo); #else + +#ifdef BUILD_WITH_RAND_ERR + if (tsEnableRandErr) { + uint32_t r = taosRand() % 10001; + if ((r + 1) <= tsRandErrChance) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } +#endif void *p = malloc(size); if (NULL == p) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -287,6 +301,16 @@ void *taosMemCalloc(int64_t num, int64_t size) { return (char *)tmp + sizeof(TdMemoryInfo); #else +#ifdef BUILD_WITH_RAND_ERR + if (tsEnableRandErr) { + uint32_t r = taosRand() % 10001; + if ((r + 1) <= tsRandErrChance) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } +#endif + void *p = calloc(num, size); if (NULL == p) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -317,6 +341,16 @@ void *taosMemRealloc(void *ptr, int64_t size) { return (char *)tmp + sizeof(TdMemoryInfo); #else +#ifdef BUILD_WITH_RAND_ERR + if (tsEnableRandErr) { + uint32_t r = taosRand() % 10001; + if ((r + 1) <= tsRandErrChance) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } +#endif + void *p = realloc(ptr, size); if (size > 0 && NULL == p) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -342,6 +376,16 @@ char *taosStrdupi(const char *ptr) { return (char *)tmp + sizeof(TdMemoryInfo); #else +#ifdef BUILD_WITH_RAND_ERR + if (tsEnableRandErr) { + uint32_t r = taosRand() % 10001; + if ((r + 1) <= tsRandErrChance) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } +#endif + return tstrdup(ptr); #endif } @@ -398,6 +442,16 @@ void *taosMemMallocAlign(uint32_t alignment, int64_t size) { ASSERT(0); #else #if defined(LINUX) +#ifdef BUILD_WITH_RAND_ERR + if (tsEnableRandErr) { + uint32_t r = taosRand() % 10001; + if ((r + 1) <= tsRandErrChance) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } +#endif + void *p = memalign(alignment, size); if (NULL == p) { if (ENOMEM == errno) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 874df5a7c4..e185a9eb5a 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -45,28 +45,40 @@ namespace { -#define MP_PRINTF (void)printf +#define MPT_PRINTF (void)printf +#define MPT_MAX_SESSION_NUM 256 +#define MPT_MAX_JOB_NUM 20000 -bool jtErrorRerun = false; -bool jtInRerun = false; threadlocal void* mptThreadPoolHandle = NULL; threadlocal void* mptThreadPoolSession = NULL; -#define taosEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0) -#define taosDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL) -#define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = mptThreadPoolHandle; mptThreadPoolHandle = NULL; } while (0) -#define taosRestoreEnableMemoryPoolUsage(_handle) (mptThreadPoolHandle = (_handle)) +#define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0) +#define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL) +#define mptSaveDisableMemoryPoolUsage(_handle) do { (_handle) = mptThreadPoolHandle; mptThreadPoolHandle = NULL; } while (0) +#define mptRestoreEnableMemoryPoolUsage(_handle) (mptThreadPoolHandle = (_handle)) -#define taosMemoryMalloc(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMalloc(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size))) -#define taosMemoryCalloc(_num, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolCalloc(mptThreadPoolHandle, mptThreadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) -#define taosMemoryRealloc(_ptr, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolRealloc(mptThreadPoolHandle, mptThreadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) -#define taosStrdup(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolStrdup(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr))) -#define taosMemoryFree(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolFree(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr))) -#define taosMemorySize(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolGetMemorySize(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr))) -#define taosMemoryTrim(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size))) -#define taosMemoryMallocAlign(_alignment, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMallocAlign(mptThreadPoolHandle, mptThreadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) +#define mptMemoryMalloc(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMalloc(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size))) +#define mptMemoryCalloc(_num, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolCalloc(mptThreadPoolHandle, mptThreadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) +#define mptMemoryRealloc(_ptr, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolRealloc(mptThreadPoolHandle, mptThreadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) +#define mptStrdup(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolStrdup(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr))) +#define mptMemoryFree(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolFree(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr))) +#define mptMemorySize(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolGetMemorySize(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr))) +#define mptMemoryTrim(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size))) +#define mptMemoryMallocAlign(_alignment, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMallocAlign(mptThreadPoolHandle, mptThreadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) +typedef enum { + MPT_SMALL_MSIZE = 0, + MPT_BIG_MSIZE, +}; + +typedef struct { + int32_t jobNum; + int32_t sessionNum; + bool memSize[2]; + bool jobQuotaRetire; + bool poolRetire; +} SMPTCaseParam; typedef struct SMPTJobInfo { int8_t retired; @@ -79,25 +91,36 @@ typedef struct SMPTJobInfo { typedef struct { bool printTestInfo; bool printInputRow; -} SJoinTestCtrl; +} SMPTestCtrl; +typedef struct { + uint64_t jobId; + int32_t taskNum; + int64_t poolMaxSize; + int64_t npoolSize; + void* pSessions[MPT_MAX_SESSION_NUM]; + SMPTestTaskCtx taskCtxs[MPT_MAX_SESSION_NUM]; +} SMPTestJobCtx; + +typedef struct { + int64_t poolMaxSize; + int64_t npoolSize; + int32_t memActTimes; +} SMPTestTaskCtx; typedef struct SMPTestCtx { - SHashObj* pJobs; - BoundedQueue* pJobQueue; - void* memPoolHandle; + uint64_t qId; + uint64_t tId; + int32_t eId; + SHashObj* pJobs; + BoundedQueue* pJobQueue; + void* memPoolHandle; + SMPTestJobCtx jobCtxs[MPT_MAX_JOB_NUM]; } SMPTestCtx; SMPTestCtx mptCtx = {0}; -void rerunBlockedHere() { - while (jtInRerun) { - taosSsleep(1); - } -} - - void joinTestReplaceRetrieveFp() { static Stub stub; stub.set(getNextBlockFromDownstreamRemain, getDummyInputBlock); @@ -361,6 +384,38 @@ void mptInitPool(int64_t jobQuota, bool autoMaxSize, int64_t maxSize) { ASSERT_TRUE(0 == taosMemPoolOpen("SingleThreadTest", &cfg, &mptCtx.memPoolHandle)); } +void mptMemorySimulate(SMPTestTaskCtx* pCtx) { + +} + +void mptTaskRun(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId) { + ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, &mptCtx.pSessions[idx])); + + mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, mptCtx.pSessions[idx]); + mptMemorySimulate(&mptCtx.taskCtxs[idx]); + mptDisableMemoryPoolUsage(); + +} + +void* mptThreadFunc(void* param) { + int32_t* threadIdx = (int32_t*)param; + +} + +void mptStartThreadTest(int32_t threadIdx) { + TdThread t1; + TdThreadAttr thattr; + ASSERT_EQ(0, taosThreadAttrInit(&thattr)); + ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE)); + ASSERT_EQ(0, taosThreadCreate(&(t1), &thattr, mptThreadFunc, &threadIdx)); + ASSERT_EQ(0, taosThreadAttrDestroy(&thattr)); + +} + +void mptRunCase() { + +} + } // namespace #if 1 @@ -372,7 +427,6 @@ TEST(FuncTest, SingleThreadTest) { mptInitPool(0, false, 5*1048576UL); - ASSERT_TRUE(0 == mptInitSession(1, 1, 1, &pSession)); } diff --git a/tests/ci/filter_for_return_values b/tests/ci/filter_for_return_values new file mode 100644 index 0000000000..734619a8af --- /dev/null +++ b/tests/ci/filter_for_return_values @@ -0,0 +1,24 @@ +match callExpr( + hasParent(anyOf( + compoundStmt(), + doStmt(hasCondition(expr().bind("cond")))) + ), + unless(hasType(voidType())), + unless(callee(functionDecl(hasName("memcpy")))), + unless(callee(functionDecl(hasName("strcpy")))), + unless(callee(functionDecl(hasName("strcat")))), + unless(callee(functionDecl(hasName("strncpy")))), + unless(callee(functionDecl(hasName("memset")))), + unless(callee(functionDecl(hasName("memmove")))), + unless(callee(functionDecl(hasName("sprintf")))), + unless(callee(functionDecl(hasName("snprintf")))), + unless(callee(functionDecl(hasName("scanf")))), + unless(callee(functionDecl(hasName("sncanf")))), + unless(callee(functionDecl(hasName("printf")))), + unless(callee(functionDecl(hasName("printRow")))), + unless(callee(functionDecl(hasName("puts")))), + unless(callee(functionDecl(hasName("sleep")))), + unless(callee(functionDecl(hasName("printResult")))), + unless(callee(functionDecl(hasName("getchar")))), + unless(callee(functionDecl(hasName("taos_print_row")))), + unless(callee(functionDecl(hasName("fprintf"))))) \ No newline at end of file diff --git a/tests/ci/scan.py b/tests/ci/scan.py new file mode 100644 index 0000000000..c542fdf448 --- /dev/null +++ b/tests/ci/scan.py @@ -0,0 +1,106 @@ +import os +import subprocess +import csv +from datetime import datetime +from loguru import logger + +# log file path +log_file_path = "/root/charles/scan.log" +logger.add(log_file_path, rotation="10MB", retention="7 days", level="DEBUG") +# scan result base path +scan_result_base_path = "/root/charles/clang_scan_result/" +# the base source code file path +source_path = "/root/charles/TDinternal/" +# the compile commands json file path +compile_commands_path = "/root/charles/TDinternal/debug/compile_commands.json" +# the ast parser rule for c file +clang_scan_rules_path = "/root/charles/clang_scan_rules" +# all the c files path will be checked +all_file_path = [] + +class CommandExecutor: + def __init__(self): + self._process = None + + def execute(self, command, timeout=None): + try: + self._process = subprocess.Popen(command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = self._process.communicate(timeout=timeout) + return stdout.decode('utf-8'), stderr.decode('utf-8') + except subprocess.TimeoutExpired: + self._process.kill() + self._process.communicate() + raise Exception("Command execution timeout") + except Exception as e: + raise Exception("Command execution failed: %s" % e) + +def scan_files_path(source_file_path): + # scan_dir_list = ["source", "include", "docs/examples", "tests/script/api", "src/plugins"] + scan_dir_list = ["source", "include", "docs/examples", "src/plugins"] + scan_skip_file_list = ["/root/charles/TDinternal/community/tools/taosws-rs/target/release/build/openssl-sys-7811e597b848e397/out/openssl-build/install/include/openssl", + "/test/", "contrib", "debug", "deps", "/root/charles/TDinternal/community/source/libs/parser/src/sql.c", "/root/charles/TDinternal/community/source/client/jni/windows/win32/bridge/AccessBridgeCalls.c"] + for root, dirs, files in os.walk(source_file_path): + for file in files: + if any(item in root for item in scan_dir_list): + file_path = os.path.join(root, file) + if (file_path.endswith(".c") or file_path.endswith(".h") or file_path.endswith(".cpp")) and all(item not in file_path for item in scan_skip_file_list): + all_file_path.append(file_path) + logger.info("Found %s files" % len(all_file_path)) + +def save_scan_res(res_base_path, file_path, out, err): + file_res_path = os.path.join(res_base_path, file_path.replace("/root/charles/", "").split(".")[0] + ".res") + if not os.path.exists(os.path.dirname(file_res_path)): + os.makedirs(os.path.dirname(file_res_path)) + logger.info("Save scan result to: %s" % file_res_path) + # save scan result + with open(file_res_path, "w") as f: + f.write(out) + f.write(err) + +def write_csv(file_path, data): + try: + with open(file_path, 'w') as f: + writer = csv.writer(f) + writer.writerows(data) + except Exception as ex: + raise Exception("Failed to write the csv file: {} with msg: {}".format(file_path, repr(ex))) + +if __name__ == "__main__": + command_executor = CommandExecutor() + # get all the c files path + scan_files_path(source_path) + res = [] + # create dir + current_time = datetime.now().strftime("%Y%m%d%H%M%S") + scan_result_path = os.path.join(scan_result_base_path, current_time) + if not os.path.exists(scan_result_path): + os.makedirs(scan_result_path) + for file in all_file_path: + cmd = "clang-query -p %s %s -f %s" % (compile_commands_path, file, clang_scan_rules_path) + try: + stdout, stderr = command_executor.execute(cmd) + lines = stdout.split("\n") + if lines[-2].endswith("matches.") or lines[-2].endswith("match."): + match_num = int(lines[-2].split(" ")[0]) + logger.info("The match lines of file %s: %s" % (file, match_num)) + if match_num > 0: + save_scan_res(scan_result_path, file, stdout, stderr) + res.append([file, match_num, 'Pass' if match_num == 0 else 'Fail']) + else: + logger.warning("The result of scan is invalid for: %s" % file) + except Exception as e: + logger.error("Execute command failed: %s" % e) + # data = "" + # for item in res: + # data += item[0] + "," + str(item[1]) + "\n" + # logger.info("Csv data: %s" % data) + write_csv(os.path.join(scan_result_path, "scan_res.csv"), res) + logger.info("The result of scan: \n") + logger.info("Total files: %s" % len(res)) + logger.info("Total match lines: %s" % sum([item[1] for item in res])) + logger.info("Pass files: %s" % len([item for item in res if item[2] == 'Pass'])) + logger.info("Fail files: %s" % len([item for item in res if item[2] == 'Fail'])) + diff --git a/tests/ci/scan_file_path.py b/tests/ci/scan_file_path.py new file mode 100644 index 0000000000..03f2d6ee4f --- /dev/null +++ b/tests/ci/scan_file_path.py @@ -0,0 +1,229 @@ +import os +import sys +import subprocess +import csv +from datetime import datetime +from loguru import logger +import getopt + + +opts, args = getopt.gnu_getopt(sys.argv[1:], 'b:f:w:', [ + 'branch_name=']) +for key, value in opts: + if key in ['-h', '--help']: + print( + 'Usage: python3 scan.py -b -f ') + print('-b branch name or PR ID to scan') + print('-f change files list') + print('-w web server') + + sys.exit(0) + + if key in ['-b', '--branchName']: + branch_name = value + if key in ['-f', '--filesName']: + change_file_list = value + if key in ['-w', '--webServer']: + web_server = value + + +# the base source code file path +self_path = os.path.dirname(os.path.realpath(__file__)) + +# if ("community" in self_path): +# TD_project_path = self_path[:self_path.find("community")] +# work_path = TD_project_path[:TD_project_path.find("TDinternal")] + +# else: +# TD_project_path = self_path[:self_path.find("tests")] +# work_path = TD_project_path[:TD_project_path.find("TDengine")] + +# Check if "community" or "tests" is in self_path +index_community = self_path.find("community") +if index_community != -1: + TD_project_path = self_path[:index_community] + index_TDinternal = TD_project_path.find("TDinternal") + # Check if index_TDinternal is valid and set work_path accordingly + if index_TDinternal != -1: + work_path = TD_project_path[:index_TDinternal] +else: + index_tests = self_path.find("tests") + if index_tests != -1: + TD_project_path = self_path[:index_tests] + # Check if index_TDengine is valid and set work_path accordingly + index_TDengine = TD_project_path.find("TDengine") + if index_TDengine != -1: + work_path = TD_project_path[:index_TDengine] + + +# log file path +current_time = datetime.now().strftime("%Y%m%d-%H%M%S") +log_file_path = f"{work_path}/scan_log/scan_{branch_name}_{current_time}/" + +os.makedirs(log_file_path, exist_ok=True) + +scan_log_file = f"{log_file_path}/scan_log.txt" +logger.add(scan_log_file, rotation="10MB", retention="7 days", level="DEBUG") +#if error happens, open this to debug +# print(self_path,work_path,TD_project_path,log_file_path,change_file_list) + +# scan result base path +scan_result_base_path = f"{log_file_path}/clang_scan_result/" + + +# the compile commands json file path +# compile_commands_path = f"{work_path}/debugNoSan/compile_commands.json" +compile_commands_path = f"{TD_project_path}/debug/compile_commands.json" + +#if error happens, open this to debug +# print(f"compile_commands_path:{compile_commands_path}") + +# # replace the docerk worf path with real work path in compile_commands.json +# docker_work_path = "home" +# replace_path= work_path[1:-1] +# replace_path = replace_path.replace("/", "\/") +# sed_command = f"sed -i 's/{docker_work_path}/{replace_path}/g' {compile_commands_path}" +# print(sed_command) +# result = subprocess.run(sed_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) +# logger.debug(f"STDOUT: {result.stdout} STDERR: {result.stderr}") + +# the ast parser rule for c file +clang_scan_rules_path = f"{self_path}/filter_for_return_values" + +# +# all the c files path will be checked +all_file_path = [] + +class CommandExecutor: + def __init__(self): + self._process = None + + def execute(self, command, timeout=None): + try: + self._process = subprocess.Popen(command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = self._process.communicate(timeout=timeout) + return stdout.decode('utf-8'), stderr.decode('utf-8') + except subprocess.TimeoutExpired: + self._process.kill() + self._process.communicate() + raise Exception("Command execution timeout") + except Exception as e: + raise Exception("Command execution failed: %s" % e) + +def scan_files_path(source_file_path): + # scan_dir_list = ["source", "include", "docs/examples", "tests/script/api", "src/plugins"] + scan_dir_list = ["source", "include", "docs/examples", "src/plugins"] + scan_skip_file_list = ["/root/charles/TDinternal/community/tools/taosws-rs/target/release/build/openssl-sys-7811e597b848e397/out/openssl-build/install/include/openssl", + "/test/", "contrib", "debug", "deps", "/root/charles/TDinternal/community/source/libs/parser/src/sql.c", "/root/charles/TDinternal/community/source/client/jni/windows/win32/bridge/AccessBridgeCalls.c"] + for root, dirs, files in os.walk(source_file_path): + for file in files: + if any(item in root for item in scan_dir_list): + file_path = os.path.join(root, file) + if (file_path.endswith(".c") or file_path.endswith(".h") or file_path.endswith(".cpp")) and all(item not in file_path for item in scan_skip_file_list): + all_file_path.append(file_path) + logger.info("Found %s files" % len(all_file_path)) + +def input_files(change_files): + # scan_dir_list = ["source", "include", "docs/examples", "tests/script/api", "src/plugins"] + scan_dir_list = ["source", "include", "docs/examples", "src/plugins"] + scan_skip_file_list = [f"{TD_project_path}/TDinternal/community/tools/taosws-rs/target/release/build/openssl-sys-7811e597b848e397/out/openssl-build/install/include/openssl", "/test/", "contrib", "debug", "deps", f"{TD_project_path}/TDinternal/community/source/libs/parser/src/sql.c", f"{TD_project_path}/TDinternal/community/source/client/jni/windows/win32/bridge/AccessBridgeCalls.c"] + with open(change_files, 'r') as file: + for line in file: + file_name = line.strip() + if any(dir_name in file_name for dir_name in scan_dir_list): + if (file_name.endswith(".c") or file_name.endswith(".h") or line.endswith(".cpp")) and all(dir_name not in file_name for dir_name in scan_skip_file_list): + if "enterprise" in file_name: + file_name = os.path.join(TD_project_path, file_name) + else: + tdc_file_path = os.path.join(TD_project_path, "community/") + file_name = os.path.join(tdc_file_path, file_name) + all_file_path.append(file_name) + # print(f"all_file_path:{all_file_path}") + logger.info("Found %s files" % len(all_file_path)) +file_res_path = "" + +def save_scan_res(res_base_path, file_path, out, err): + global file_res_path + file_res_path = os.path.join(res_base_path, file_path.replace(f"{work_path}", "").split(".")[0] + ".txt") + # print(f"file_res_path:{file_res_path},res_base_path:{res_base_path},file_path:{file_path}") + if not os.path.exists(os.path.dirname(file_res_path)): + os.makedirs(os.path.dirname(file_res_path)) + logger.info("Save scan result to: %s" % file_res_path) + + # save scan result + with open(file_res_path, "w") as f: + f.write(err) + f.write(out) + logger.debug(f"file_res_file: {file_res_path}") + +def write_csv(file_path, data): + try: + with open(file_path, 'w') as f: + writer = csv.writer(f) + writer.writerows(data) + except Exception as ex: + raise Exception("Failed to write the csv file: {} with msg: {}".format(file_path, repr(ex))) + +if __name__ == "__main__": + command_executor = CommandExecutor() + # get all the c files path + # scan_files_path(TD_project_path) + input_files(change_file_list) + # print(f"all_file_path:{all_file_path}") + res = [] + web_path = [] + res.append(["scan_source_file", "scan_result_file", "match_num", "check_result"]) + # create dir + # current_time = datetime.now().strftime("%Y%m%d%H%M%S") + # scan_result_path = os.path.join(scan_result_base_path, current_time) + # scan_result_path = scan_result_base_path + # if not os.path.exists(scan_result_path): + # os.makedirs(scan_result_path) + + for file in all_file_path: + cmd = f"clang-query-10 -p {compile_commands_path} {file} -f {clang_scan_rules_path}" + logger.debug(f"cmd:{cmd}") + try: + stdout, stderr = command_executor.execute(cmd) + #if "error" in stderr: + # print(stderr) + lines = stdout.split("\n") + if lines[-2].endswith("matches.") or lines[-2].endswith("match."): + match_num = int(lines[-2].split(" ")[0]) + logger.info("The match lines of file %s: %s" % (file, match_num)) + if match_num > 0: + logger.info(f"log_file_path: {log_file_path} ,file:{file}") + save_scan_res(log_file_path, file, stdout, stderr) + index_tests = file_res_path.find("scan_log") + if index_tests != -1: + web_path_file = file_res_path[index_tests:] + web_path_file = os.path.join(web_server, web_path_file) + web_path.append(web_path_file) + res.append([file, file_res_path, match_num, 'Pass' if match_num == 0 else 'Fail']) + + else: + logger.warning("The result of scan is invalid for: %s" % file) + except Exception as e: + logger.error("Execute command failed: %s" % e) + # data = "" + # for item in res: + # data += item[0] + "," + str(item[1]) + "\n" + # logger.info("Csv data: %s" % data) + write_csv(os.path.join(log_file_path, "scan_res.txt"), res) + scan_result_log = f"{log_file_path}/scan_res.txt" + # delete the first element of res + res= res[1:] + logger.info("The result of scan: \n") + logger.info("Total scan files: %s" % len(res)) + logger.info("Total match lines: %s" % sum([item[2] for item in res])) + logger.info(f"scan log file : {scan_result_log}") + logger.info("Pass files: %s" % len([item for item in res if item[3] == 'Pass'])) + logger.info("Fail files: %s" % len([item for item in res if item[3] == 'Fail'])) + if len([item for item in res if item[3] == 'Fail']) > 0: + logger.error(f"Scan failed,please check the log file:{scan_result_log}") + for index, failed_result_file in enumerate(web_path): + logger.error(f"failed number: {index}, failed_result_file: {failed_result_file}") + exit(1) \ No newline at end of file diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh index 85e3d2ab73..26cabad107 100755 --- a/tests/parallel_test/container_build.sh +++ b/tests/parallel_test/container_build.sh @@ -83,7 +83,7 @@ docker run \ -v ${REP_REAL_PATH}/community/contrib/xml2/:${REP_DIR}/community/contrib/xml2 \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/zstd/:${REP_DIR}/community/contrib/zstd \ - --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0;make -j 10|| exit 1" + --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0 -DCMAKE_EXPORT_COMPILE_COMMANDS=1 ;make -j 10|| exit 1" # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ if [[ -d ${WORKDIR}/debugNoSan ]] ;then diff --git a/tests/parallel_test/run_scan_container.sh b/tests/parallel_test/run_scan_container.sh new file mode 100755 index 0000000000..d16d1c3017 --- /dev/null +++ b/tests/parallel_test/run_scan_container.sh @@ -0,0 +1,98 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -d work dir" + echo -e "\t -b pr and id" + echo -e "\t -w web server " + echo -e "\t -f scan file " + echo -e "\t -h help" +} + +while getopts "d:b:w:f:h" opt; do + case $opt in + d) + WORKDIR=$OPTARG + ;; + b) + branch_name_id=$OPTARG + ;; + f) + scan_file_name=$OPTARG + ;; + w) + web_server=$OPTARG + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$branch_name_id" ]; then + usage + exit 1 +fi + +if [ -z "$scan_file_name" ]; then + usage + exit 1 +fi +if [ -z "$WORKDIR" ]; then + usage + exit 1 +fi +if [ -z "$web_server" ]; then + usage + exit 1 +fi + + # enterprise edition +INTERNAL_REPDIR=$WORKDIR/TDinternal +REPDIR_DEBUG=$WORKDIR/debugNoSan/ + +REP_MOUNT_DEBUG="${REPDIR_DEBUG}:/home/TDinternal/debug/" +REP_MOUNT_PARAM="$INTERNAL_REPDIR:/home/TDinternal" + +CONTAINER_TESTDIR=/home/TDinternal/community + +#scan change file path +scan_changefile_temp_path="$WORKDIR/tmp/${branch_name_id}/" +docker_can_changefile_temp_path="/home/tmp/${branch_name_id}/" +mkdir -p $scan_changefile_temp_path +scan_file_name="$docker_can_changefile_temp_path/docs_changed.txt" + +#scan log file path +scan_log_temp_path="$WORKDIR/log/scan_log/" +docker_scan_log_temp_path="/home/scan_log/" +mkdir -p $scan_log_temp_path + + +scan_scripts="$CONTAINER_TESTDIR/tests/ci/scan_file_path.py" + +ulimit -c unlimited +cat << EOF +docker run \ + -v $REP_MOUNT_PARAM \ + -v $REP_MOUNT_DEBUG \ + -v $scan_changefile_temp_path:$docker_can_changefile_temp_path \ + -v $scan_log_temp_path:$docker_scan_log_temp_path \ + --rm --ulimit core=-1 taos_test:v1.0 python3 $scan_scripts -b "${branch_name_id}" -f "${scan_file_name}" -w ${web_server} +EOF +docker run \ + -v $REP_MOUNT_PARAM \ + -v $REP_MOUNT_DEBUG \ + -v $scan_changefile_temp_path:$docker_can_changefile_temp_path \ + -v $scan_log_temp_path:$docker_scan_log_temp_path \ + --rm --ulimit core=-1 taos_test:v1.0 python3 $scan_scripts -b "${branch_name_id}" -f "${scan_file_name}" -w ${web_server} + + +ret=$? +exit $ret + diff --git a/tests/script/tsim/mnode/basic3.sim b/tests/script/tsim/mnode/basic3.sim index 02650ba10d..ff7c44b67d 100644 --- a/tests/script/tsim/mnode/basic3.sim +++ b/tests/script/tsim/mnode/basic3.sim @@ -36,7 +36,7 @@ if $data(3)[4] != ready then goto step1 endi -print =============== step2: create mnode 2 +print =============== step2: create mnode 2 3 sql create mnode on dnode 2 sql create mnode on dnode 3 sql_error create mnode on dnode 4 @@ -115,7 +115,7 @@ if $data(3)[4] != ready then goto step41 endi -print =============== step5: stop dnode1 +print =============== step5: stop dnode2 system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s stop @@ -154,7 +154,7 @@ if $data(3)[4] != ready then goto step51 endi -print =============== step6: stop dnode1 +print =============== step6: stop dnode3 system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode3 -s stop diff --git a/tests/script/tsim/query/interval-offset.sim b/tests/script/tsim/query/interval-offset.sim index 0d796af0a0..fe3e4c9844 100644 --- a/tests/script/tsim/query/interval-offset.sim +++ b/tests/script/tsim/query/interval-offset.sim @@ -200,8 +200,9 @@ if $data02 != 2678400000 then return -1 endi -sql_error select _wstart, count(tbcol), _wduration, _wstart, count(*) from ct3 interval(1n, 1w) sliding(2w) -sql_error select _wstart, count(tbcol), _wduration, _wstart, count(*) from ct3 interval(1n, 1w) sliding(4w) +sql select _wstart, count(tbcol), _wduration, _wstart, count(*) from ct3 interval(1n, 1w) sliding(2w) +sql select _wstart, count(tbcol), _wduration, _wstart, count(*) from ct3 interval(1n, 1w) sliding(4w) +sql_error select _wstart, count(tbcol), _wduration, _wstart, count(*) from ct3 interval(1n, 1w) sliding(5w) sql select _wstart, count(tbcol), _wduration, _wstart, count(*) from ct4 interval(1y, 6n) print ===> select _wstart, count(tbcol), _wduration, _wstart, count(*) from ct4 interval(1y, 6n) diff --git a/tests/script/tsim/query/interval.sim b/tests/script/tsim/query/interval.sim index 7f950ea69c..bce55eda27 100644 --- a/tests/script/tsim/query/interval.sim +++ b/tests/script/tsim/query/interval.sim @@ -276,6 +276,10 @@ sql insert into t6 values ("2024-03-01 14:34:07.051", 66); sleep 300 +sql select _wstart, count(*) from stb interval(1n) sliding(1d); +sql select _wstart, count(*) from stb interval(1n) sliding(28d); +sql_error select _wstart, count(*) from stb interval(1n) sliding(29d); + sql select _wstart, count(*) from (select * from stb partition by tbname) interval(2s); print $data00,$data01 diff --git a/tests/system-test/2-query/fill.py b/tests/system-test/2-query/fill.py index f5cd2d5855..64a43bd80a 100644 --- a/tests/system-test/2-query/fill.py +++ b/tests/system-test/2-query/fill.py @@ -1,6 +1,11 @@ +import queue +import random +from fabric2.runners import threading +from pandas._libs import interval import taos import sys +from util.common import TDCom from util.log import * from util.sql import * from util.cases import * @@ -8,6 +13,7 @@ from util.cases import * class TDTestCase: + updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'numOfVnodeQueryThreads': 80} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -15,7 +21,115 @@ class TDTestCase: #tdSql.init(conn.cursor()) tdSql.init(conn.cursor(), logSql) # output sql.txt file + def generate_fill_range(self, data_start: int, data_end: int, interval: int, step: int): + ret = [] + begin = data_start - 10 * interval + end = data_end + 10 * interval + for i in range(begin, end, step): + for j in range(begin, end, step): + ret.append((i,j)) + return ret + + def check_fill_range(self, where_start, where_end, res_asc, res_desc, sql: str, interval): + if len(res_asc) != len(res_desc): + tdLog.exit(f"err, asc desc with different rows, asc: {len(res_asc)}, desc: {len(res_desc)} sql: {sql}") + if len(res_asc) == 0: + tdLog.info(f'from {where_start} to {where_end} no rows returned') + return + asc_first = res_asc[0] + asc_last = res_asc[-1] + desc_first = res_desc[0] + desc_last = res_desc[-1] + if asc_first[0] != desc_last[0] or asc_last[0] != desc_first[0]: + tdLog.exit(f'fill sql different row data {sql}: asc<{asc_first[0].timestamp()}, {asc_last[0].timestamp()}>, desc<{desc_last[0].timestamp()}, {desc_first[0].timestamp()}>') + else: + tdLog.info(f'from {where_start} to {where_end} same time returned asc<{asc_first[0].timestamp()}, {asc_last[0].timestamp()}>, desc<{desc_last[0].timestamp()}, {desc_first[0].timestamp()}> interval: {interval}') + + def generate_partition_by(self): + val = random.random() + if val < 0.6: + return "" + elif val < 0.8: + return "partition by location" + else: + return "partition by tbname" + + def generate_fill_interval(self): + ret = [] + #intervals = [60, 90, 120, 300, 3600] + intervals = [120, 300, 3600] + for i in range(0, len(intervals)): + for j in range(0, i+1): + ret.append((intervals[i], intervals[j])) + return ret + + def generate_fill_sql(self, where_start, where_end, fill_interval: tuple): + partition_by = self.generate_partition_by() + where = f'where ts >= {where_start} and ts < {where_end}' + sql = f'select first(_wstart), last(_wstart) from (select _wstart, _wend, count(*) from test.meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)' + sql_asc = sql + " order by _wstart asc) t" + sql_desc = sql + " order by _wstart desc) t" + return sql_asc, sql_desc + + def fill_test_thread_routine(self, cli: TDSql, interval, data_start, data_end, step): + ranges = self.generate_fill_range(data_start, data_end, interval[0], step) + for range in ranges: + sql_asc, sql_desc = self.generate_fill_sql(range[0], range[1], interval) + cli.query(sql_asc, queryTimes=1) + asc_res = cli.queryResult + cli.query(sql_desc, queryTimes=1) + desc_res = cli.queryResult + self.check_fill_range(range[0], range[1], asc_res,desc_res , sql_asc, interval) + + def fill_test_task_routine(self, tdCom: TDCom, queue: queue.Queue): + cli = tdCom.newTdSql() + while True: + m: list = queue.get() + if len(m) == 0: + break + interval = m[0] + range = m[1] + sql_asc, sql_desc = self.generate_fill_sql(range[0], range[1], interval) + cli.query(sql_asc, queryTimes=1) + asc_res = cli.queryResult + cli.query(sql_desc, queryTimes=1) + desc_res = cli.queryResult + self.check_fill_range(range[0], range[1], asc_res,desc_res , sql_asc, interval) + cli.close() + + def schedule_fill_test_tasks(self): + num: int = 20 + threads = [] + tdCom = TDCom() + q: queue.Queue = queue.Queue() + for _ in range(num): + t = threading.Thread(target=self.fill_test_task_routine, args=(tdCom, q)) + t.start() + threads.append(t) + + data_start = 1500000000000 + data_end = 1500319968000 + step = 30000000 + + fill_intervals: list[tuple] = self.generate_fill_interval() + for interval in fill_intervals: + ranges = self.generate_fill_range(data_start, data_end, interval[0], step) + for r in ranges: + q.put([interval, r]) + + for _ in range(num): + q.put([]) + + for t in threads: + t.join() + + def test_fill_range(self): + os.system('taosBenchmark -t 10 -n 10000 -v 8 -S 32000 -y') + self.schedule_fill_test_tasks() + tdSql.execute('drop database test') + def run(self): + self.test_fill_range() dbname = "db" tbname = "tb" diff --git a/tests/system-test/2-query/test_td28163.py b/tests/system-test/2-query/test_td28163.py index a101549b66..005d78d075 100644 --- a/tests/system-test/2-query/test_td28163.py +++ b/tests/system-test/2-query/test_td28163.py @@ -176,7 +176,7 @@ class TDTestCase: def test_query_with_window(self): # time window tdSql.query("select sum(c_int_empty) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m) fill(value, 10);") - tdSql.checkRows(841) + tdSql.checkRows(845) tdSql.checkData(0, 0, 10) tdSql.query("select _wstart, _wend, sum(c_int) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m);") diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 29a7562b45..fccf6291b5 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1504,9 +1504,9 @@ class TDTestCase: # max number of list is 4093: 4096 - 3 - 2(原始表tag个数) - 1(tbname) tdSql.execute('use db4096') - self.create_tsma('tsma_4050', 'db4096', 'stb0', self.generate_tsma_function_list_columns(4050), '5m',check_tsma_calculation=False) + self.create_tsma('tsma_4050', 'db4096', 'stb0', self.generate_tsma_function_list_columns(4050), '5m',check_tsma_calculation=True) - self.create_tsma('tsma_4090', 'db4096', 'stb0', self.generate_tsma_function_list_columns(4090), '6m',check_tsma_calculation=False) + self.create_tsma('tsma_4090', 'db4096', 'stb0', self.generate_tsma_function_list_columns(4090), '6m',check_tsma_calculation=True) self.create_error_tsma('tsma_4091', 'db4096', 'stb0', self.generate_tsma_function_list_columns(4091), '5m', -2147473856) #Too many columns diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index dbaf108e6a..0ccbd683dc 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1170,6 +1170,7 @@ bool shellGetGrantInfo(char* buf) { code != TSDB_CODE_PAR_PERMISSION_DENIED) { fprintf(stderr, "Failed to check Server Edition, Reason:0x%04x:%s\r\n\r\n", code, taos_errstr(tres)); } + taos_free_result(tres); return community; }