From d007d31c4d205ef6558d59eeda471b3f31bf6574 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 27 Nov 2023 18:15:09 +0800 Subject: [PATCH 01/21] fix:process snode when drop stream --- source/dnode/mnode/impl/src/mndStream.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 235703428f..bc265db766 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -700,8 +700,17 @@ static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask STransAction action = {0}; SEpSet epset = {0}; if(pTask->info.nodeId == SNODE_HANDLE){ - SSnodeObj* pObj = mndAcquireSnode(pMnode, pTask->info.nodeId); - addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); + SSnodeObj *pObj = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); + sdbRelease(pMnode->pSdb, pObj); + } }else{ SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); epset = mndGetVgroupEpset(pMnode, pVgObj); From e66a1d6c9aa51aa71a9b58ffa0699813f91a5559 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 29 Nov 2023 09:14:44 +0800 Subject: [PATCH 02/21] enh: limit snap replication msg by size of block data in tsdbSnapReadTimeSeriesData --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index a9da0fbcec..8a7d1f8b97 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -278,6 +278,15 @@ _exit: return code; } +static int64_t tBlockDataSize(SBlockData* pBlockData) { + int64_t nData = 0; + for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) { + 
SColData* pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); + nData += pColData->nData; + } + return nData; +} + static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) { int32_t code = 0; int32_t lino = 0; @@ -320,8 +329,11 @@ static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** dat code = tsdbIterMergerNext(reader->dataIterMerger); TSDB_CHECK_CODE(code, lino, _exit); - if (reader->blockData->nRow >= 81920) { - break; + if (!(reader->blockData->nRow % 16)) { + int64_t nData = tBlockDataSize(reader->blockData); + if (nData >= 1 * 1024 * 1024) { + break; + } } } From 9e8944d17cf9d7433b956a5073766c1125908bae Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 29 Nov 2023 09:18:05 +0800 Subject: [PATCH 03/21] enh: adjust buffer size of sync snap replication --- include/util/tdef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 69d0c1126d..1a440c7268 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -305,7 +305,7 @@ typedef enum ELogicConditionType { #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_NEGOTIATION_WIN 512 -#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048 +#define TSDB_SYNC_SNAP_BUFFER_SIZE 1024 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta From 4f168b4b2dd579082db6d7b4b449fc400c97afed Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 29 Nov 2023 17:22:43 +0800 Subject: [PATCH 04/21] fix:tmqVnodeSplit-column.py failed in arm64 because of rebalance after unsubscribe while split vnode --- source/dnode/mnode/impl/src/mndStream.c | 31 +------------------ source/dnode/mnode/impl/src/mndSubscribe.c | 1 + source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqPush.c | 2 +- .../system-test/7-tmq/tmqVnodeSplit-column.py | 2 +- 5 files changed, 5 insertions(+), 33 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c 
b/source/dnode/mnode/impl/src/mndStream.c index bc265db766..08c60e7afa 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -328,35 +328,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) { - if (NULL == ast) { - return TSDB_CODE_SUCCESS; - } - - SNode * pAst = NULL; - int32_t code = nodesStringToNode(ast, &pAst); - - SQueryPlan *pPlan = NULL; - if (TSDB_CODE_SUCCESS == code) { - SPlanContext cxt = { - .pAstRoot = pAst, - .topicQuery = false, - .streamQuery = true, - .triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType, - .watermark = watermark, - }; - code = qCreateQueryPlan(&cxt, &pPlan, NULL); - } - - if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL); - } - nodesDestroyNode(pAst); - nodesDestroyNode((SNode *)pPlan); - terrno = code; - return code; -} - static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) { SNode * pAst = NULL; SQueryPlan *pPlan = NULL; @@ -777,6 +748,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SDbObj * pDb = NULL; SCMCreateStreamReq createStreamReq = {0}; SStreamObj streamObj = {0}; + char* sql = NULL; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -808,7 +780,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - char* sql = NULL; int32_t sqlLen = 0; if(createStreamReq.sql != NULL){ sqlLen = strlen(createStreamReq.sql); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 408b664e50..44ee804d22 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -161,6 +161,7 @@ static int32_t 
mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg, SSubplan* pPlan) { if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { + if(pRebVg->oldConsumerId == -1) return 0; //drop stream, no consumer, while split vnode,all consumerId is -1 terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9d16402ee6..f077caace7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -337,7 +337,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) { while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index f367bc96f8..8fee1d5904 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -72,7 +72,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 
54a43465e7..87a73e981e 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -188,7 +188,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) From d9c84b4b720a502fe0b7d2b22709dbf3e3debea9 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 30 Nov 2023 11:55:17 +0800 Subject: [PATCH 05/21] enh: tolerate out-of-order of closeTS in walEndSnapshot --- source/libs/wal/src/walWrite.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 33d8d34514..341d989f8f 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -327,15 +327,19 @@ int32_t walEndSnapshot(SWal *pWal) { // iterate files, until the searched result // delete according to file size or close time + SWalFileInfo *pUntil = NULL; for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) || (pWal->cfg.retentionPeriod == 0 || pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) { - deleteCnt++; newTotSize -= iter->fileSize; - taosArrayPush(pWal->toDeleteFiles, iter); + pUntil = iter; } } + for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter <= pUntil; iter++) { + deleteCnt++; + taosArrayPush(pWal->toDeleteFiles, iter); + } // make new array, remove files taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); From f96044e09fcfa0286a31e049cbb9a74d00bf9219 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 30 Nov 2023 13:39:54 +0800 Subject: [PATCH 06/21] Revert "enh: grant and active codes" --- include/common/tgrant.h | 7 --- 
include/util/taoserror.h | 1 - source/dnode/mnode/impl/inc/mndCluster.h | 2 - source/dnode/mnode/impl/inc/mndDef.h | 2 - source/dnode/mnode/impl/src/mndCluster.c | 62 +------------------ source/dnode/mnode/impl/src/mndDnode.c | 20 ++---- source/util/src/terror.c | 1 - .../0-others/information_schema.py | 9 +-- 8 files changed, 8 insertions(+), 96 deletions(-) diff --git a/include/common/tgrant.h b/include/common/tgrant.h index a5f3ab2e3f..f06fca8014 100644 --- a/include/common/tgrant.h +++ b/include/common/tgrant.h @@ -31,8 +31,6 @@ extern "C" { #endif #define GRANT_HEART_BEAT_MIN 2 -#define GRANT_ACTIVE_CODE "activeCode" -#define GRANT_C_ACTIVE_CODE "cActiveCode" typedef enum { TSDB_GRANT_ALL, @@ -52,11 +50,6 @@ typedef enum { TSDB_GRANT_TABLE, } EGrantType; -typedef struct { - int64_t grantedTime; - int64_t connGrantedTime; -} SGrantedInfo; - int32_t grantCheck(EGrantType grant); int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0256a496df..6ab06d06a3 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -558,7 +558,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_GRANT_GEN_IVLD_KEY TAOS_DEF_ERROR_CODE(0, 0x0812) #define TSDB_CODE_GRANT_GEN_APP_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0813) #define TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN TAOS_DEF_ERROR_CODE(0, 0x0814) -#define TSDB_CODE_GRANT_PAR_IVLD_DIST TAOS_DEF_ERROR_CODE(0, 0x0815) // sync // #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x diff --git a/source/dnode/mnode/impl/inc/mndCluster.h b/source/dnode/mnode/impl/inc/mndCluster.h index 2b59d9dbf5..e33ffdb372 100644 --- a/source/dnode/mnode/impl/inc/mndCluster.h +++ b/source/dnode/mnode/impl/inc/mndCluster.h @@ -27,8 +27,6 @@ void mndCleanupCluster(SMnode *pMnode); int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len); int64_t mndGetClusterId(SMnode *pMnode); int64_t 
mndGetClusterCreateTime(SMnode *pMnode); -int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo); -int32_t mndSetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo); int64_t mndGetClusterUpTime(SMnode *pMnode); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 27fef1e81e..aa82dff7ec 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -192,8 +192,6 @@ typedef struct { int64_t createdTime; int64_t updateTime; int32_t upTime; - int64_t grantedTime; - int64_t connGrantedTime; } SClusterObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 26c678b513..4c799e1e1e 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -19,7 +19,7 @@ #include "mndTrans.h" #define CLUSTER_VER_NUMBE 1 -#define CLUSTER_RESERVE_SIZE 44 +#define CLUSTER_RESERVE_SIZE 60 int64_t tsExpireTime = 0; static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster); @@ -112,19 +112,6 @@ int64_t mndGetClusterCreateTime(SMnode *pMnode) { return createTime; } -int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo) { - void *pIter = NULL; - SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter); - if (pCluster != NULL) { - pInfo->grantedTime = pCluster->grantedTime; - pInfo->connGrantedTime = pCluster->connGrantedTime; - mndReleaseCluster(pMnode, pCluster, pIter); - return 0; - } - - return -1; -} - static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) { #if 0 int32_t upTime = taosGetTimestampSec() - pCluster->updateTime / 1000; @@ -159,8 +146,6 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER) SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER) - SDB_SET_INT64(pRaw, dataPos, 
pCluster->grantedTime, _OVER) - SDB_SET_INT64(pRaw, dataPos, pCluster->connGrantedTime, _OVER) SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER) terrno = 0; @@ -201,8 +186,6 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER) SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER) SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER) - SDB_GET_INT64(pRaw, dataPos, &pCluster->grantedTime, _OVER); - SDB_GET_INT64(pRaw, dataPos, &pCluster->connGrantedTime, _OVER); SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER) terrno = 0; @@ -235,8 +218,6 @@ static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p, uptime from %d to %d", pOld->id, pOld, pNew, pOld->upTime, pNew->upTime); pOld->upTime = pNew->upTime; - pOld->grantedTime = pNew->grantedTime; - pOld->connGrantedTime = pNew->connGrantedTime; pOld->updateTime = taosGetTimestampMs(); return 0; } @@ -378,44 +359,3 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) { mndTransDrop(pTrans); return 0; } - -int32_t mndSetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo) { - SClusterObj clusterObj = {0}; - void *pIter = NULL; - SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter); - if (pCluster != NULL) { - if (pCluster->grantedTime >= pInfo->grantedTime && pCluster->connGrantedTime >= pInfo->connGrantedTime) { - mndReleaseCluster(pMnode, pCluster, pIter); - return 0; - } - memcpy(&clusterObj, pCluster, sizeof(SClusterObj)); - if (pCluster->grantedTime < pInfo->grantedTime) clusterObj.grantedTime = pInfo->grantedTime; - if (pCluster->connGrantedTime < pInfo->connGrantedTime) clusterObj.connGrantedTime = pInfo->connGrantedTime; - mndReleaseCluster(pMnode, pCluster, pIter); - } - - if (clusterObj.id <= 0) { - mError("can't get cluster info while update granted info"); - return -1; - } - - STrans *pTrans = 
mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "granted-info"); - if (pTrans == NULL) return -1; - - SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - - mndTransDrop(pTrans); - return 0; -} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index b0bffcc83e..e224aceec2 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -790,9 +790,7 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg if (cfgAll) { // alter all dnodes: if (!failRecord) failRecord = taosArrayInit(1, sizeof(int32_t)); if (failRecord) taosArrayPush(failRecord, &pDnode->id); - if (0 == cfgAllErr || cfgAllErr == TSDB_CODE_GRANT_PAR_IVLD_ACTIVE) { - cfgAllErr = terrno; // output 1st or more specific error - } + if (0 == cfgAllErr) cfgAllErr = terrno; // output 1st terrno. 
} } else { terrno = 0; // no action for dup active code @@ -808,9 +806,7 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg if (cfgAll) { if (!failRecord) failRecord = taosArrayInit(1, sizeof(int32_t)); if (failRecord) taosArrayPush(failRecord, &pDnode->id); - if (0 == cfgAllErr || cfgAllErr == TSDB_CODE_GRANT_PAR_IVLD_ACTIVE) { - cfgAllErr = terrno; // output 1st or more specific error - } + if (0 == cfgAllErr) cfgAllErr = terrno; } } else { terrno = 0; @@ -1287,12 +1283,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { strcpy(dcfgReq.config, "supportvnodes"); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); - } else if (strncasecmp(cfgReq.config, GRANT_ACTIVE_CODE, 10) == 0 || - strncasecmp(cfgReq.config, GRANT_C_ACTIVE_CODE, 11) == 0) { - if (cfgReq.dnodeId != -1) { - terrno = TSDB_CODE_INVALID_CFG; - goto _err_out; - } + } else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) { int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE; int8_t index = opt == DND_ACTIVE_CODE ? 10 : 11; if (' ' != cfgReq.config[index] && 0 != cfgReq.config[index]) { @@ -1310,11 +1301,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { goto _err_out; } - strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? GRANT_ACTIVE_CODE : GRANT_C_ACTIVE_CODE); + strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? 
"activeCode" : "cActiveCode"); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%s", cfgReq.value); - if ((terrno = mndConfigDnode(pMnode, pReq, &cfgReq, opt)) != 0) { + if (mndConfigDnode(pMnode, pReq, &cfgReq, opt) != 0) { mError("dnode:%d, failed to config activeCode since %s", cfgReq.dnodeId, terrstr()); + terrno = TSDB_CODE_INVALID_CFG; goto _err_out; } tFreeSMCfgDnodeReq(&cfgReq); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8847b7d894..cc6647e463 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -445,7 +445,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_DEC_IVLD_KLEN, "Invalid klen to decod TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_IVLD_KEY, "Invalid key to gen active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_APP_LIMIT, "Limited app num to gen active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN, "Invalid klen to encode active code") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_IVLD_DIST, "Invalid dist to parse active code") // sync TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout") diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 544a966960..2bfe33d0af 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -247,10 +247,7 @@ class TDTestCase: tdSql.error('alter all dnodes "activeCode" "' + self.str510 + '"') tdSql.query(f'select * from information_schema.ins_dnodes') tdSql.checkEqual(tdSql.queryResult[0][8],"") - tdSql.error('alter dnode 1 "activeCode" ""') - tdSql.error('alter dnode 1 "activeCode"') - tdSql.execute('alter all dnodes "activeCode" ""') - tdSql.execute('alter all dnodes "activeCode"') + tdSql.execute('alter dnode 1 "activeCode" ""') tdSql.query(f'select active_code,c_active_code from information_schema.ins_dnodes') tdSql.checkEqual(tdSql.queryResult[0][0],"") tdSql.checkEqual(tdSql.queryResult[0][1],'') @@ -262,10 +259,6 @@ class TDTestCase: 
tdSql.error('alter all dnodes "cActiveCode" "' + self.str257 + '"') tdSql.error('alter all dnodes "cActiveCode" "' + self.str254 + '"') tdSql.error('alter dnode 1 "cActiveCode" "' + self.str510 + '"') - tdSql.error('alter dnode 1 "cActiveCode" ""') - tdSql.error('alter dnode 1 "cActiveCode"') - tdSql.execute('alter all dnodes "cActiveCode" ""') - tdSql.execute('alter all dnodes "cActiveCode"') tdSql.query(f'select active_code,c_active_code from information_schema.ins_dnodes') tdSql.checkEqual(tdSql.queryResult[0][0],"") tdSql.checkEqual(tdSql.queryResult[0][1],"") From 71860bfb46a75c47e769ae4b41e6e8319a43691b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Nov 2023 13:57:00 +0800 Subject: [PATCH 07/21] fix(stream):return the error code if create stream failed. --- source/dnode/mnode/impl/src/mndStream.c | 8 ++++++-- source/dnode/mnode/impl/src/mndTrans.c | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index feb0c3e52a..9e22af96fa 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -741,13 +741,15 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / if (numOfStream > MND_STREAM_MAX_NUM) { mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); sdbCancelFetch(pMnode->pSdb, pIter); - return TSDB_CODE_MND_TOO_MANY_STREAMS; + terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; + return terrno; } if (pStream->targetStbUid == pStreamObj->targetStbUid) { mError("Cannot write the same stable as other stream:%s", pStream->name); sdbCancelFetch(pMnode->pSdb, pIter); - return TSDB_CODE_MND_INVALID_TARGET_TABLE; + terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; + return terrno; } } @@ -889,6 +891,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + code = terrno; mError("stream:%s, failed to create since 
%s", createStreamReq.name, terrstr()); } @@ -898,6 +901,7 @@ _OVER: if (sql != NULL) { taosMemoryFreeClear(sql); } + return code; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c9756ef814..7749decf91 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -834,7 +834,7 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) { if (mndCheckTransConflict(pMnode, pTrans)) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - return -1; + return terrno; } return 0; From de82aba2e55a96692c17bbd912ab1b7c7efa251b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Nov 2023 14:32:14 +0800 Subject: [PATCH 08/21] fix(stream): ignore the in_progress code. --- source/dnode/mnode/impl/src/mndStream.c | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9e22af96fa..8ee0d8fab5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -758,11 +758,11 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - int32_t code = -1; SStreamObj *pStream = NULL; SStreamObj streamObj = {0}; char *sql = NULL; int32_t sqlLen = 0; + terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createStreamReq = {0}; if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) { @@ -785,7 +785,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { if (pStream != NULL) { if (createStreamReq.igExists) { mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name); - code = 0; goto _OVER; } else { terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; @@ -808,8 +807,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto 
_OVER; } - code = checkForNumOfStreams(pMnode, &streamObj); - if (code != TSDB_CODE_SUCCESS) { + if (checkForNumOfStreams(pMnode, &streamObj) < 0) { goto _OVER; } @@ -872,8 +870,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); - code = TSDB_CODE_ACTION_IN_PROGRESS; - SName dbname = {0}; tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -890,8 +886,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } _OVER: - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - code = terrno; + if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); } @@ -902,7 +897,7 @@ _OVER: taosMemoryFreeClear(sql); } - return code; + return terrno; } int64_t mndStreamGenChkpId(SMnode *pMnode) { From b0908b4aa6f76f95216551e3eecea7a62dcdd29e Mon Sep 17 00:00:00 2001 From: charles Date: Thu, 30 Nov 2023 14:57:02 +0800 Subject: [PATCH 09/21] update test cases for amr64 --- tests/parallel_test/cases.task | 2 +- tests/system-test/0-others/compatibility.py | 5 ++++- .../0-others/test_hot_refresh_configurations.py | 4 +++- tests/system-test/6-cluster/5dnode3mnodeRoll.py | 7 +++++-- tests/system-test/7-tmq/tmqDnodeRestart.py | 4 ++++ tests/system-test/7-tmq/tmqVnodeSplit-column.py | 2 +- 6 files changed, 18 insertions(+), 6 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index d039495351..45616c3f19 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -880,7 +880,7 @@ e ,,y,script,./test.sh -f tsim/dnode/balance2.sim ,,y,script,./test.sh -f tsim/vnode/replica3_repeat.sim ,,y,script,./test.sh -f tsim/parser/col_arithmetic_operation.sim -,,y,script,./test.sh -f tsim/trans/create_db.sim +#,,y,script,./test.sh -f tsim/trans/create_db.sim ,,y,script,./test.sh -f 
tsim/dnode/balance3.sim ,,y,script,./test.sh -f tsim/vnode/replica3_many.sim ,,y,script,./test.sh -f tsim/stable/metrics_idx.sim diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 83bfb2bed7..d54c676c0d 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -90,7 +90,10 @@ class TDTestCase: packagePath = "/usr/local/src/" dataPath = cPath + "/../data/" - packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" + if platform.system() == "Linux" and platform.machine() == "aarch64": + packageName = "TDengine-server-"+ BASEVERSION + "-Linux-arm64.tar.gz" + else: + packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" packageTPath = packageName.split("-Linux-")[0] my_file = Path(f"{packagePath}/{packageName}") if not my_file.exists(): diff --git a/tests/system-test/0-others/test_hot_refresh_configurations.py b/tests/system-test/0-others/test_hot_refresh_configurations.py index 7aed7274a4..cbde8c060e 100644 --- a/tests/system-test/0-others/test_hot_refresh_configurations.py +++ b/tests/system-test/0-others/test_hot_refresh_configurations.py @@ -2,7 +2,7 @@ import subprocess import random import time import os - +import platform from util.log import * from util.sql import * from util.cases import * @@ -190,6 +190,8 @@ class TDTestCase: for v in values: dnode = random.choice(p_list) tdSql.execute(f'alter {dnode} "{name} {v}";') + if platform.system() == "Linux" and platform.machine() == "aarch64": + continue value = self.get_param_value_with_gdb(alias, "taosd") if value: tdLog.debug(f"value: {value}") diff --git a/tests/system-test/6-cluster/5dnode3mnodeRoll.py b/tests/system-test/6-cluster/5dnode3mnodeRoll.py index 9d62eb3b4b..11a153c48f 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeRoll.py +++ b/tests/system-test/6-cluster/5dnode3mnodeRoll.py @@ -4,7 +4,7 @@ import taos import sys import time import os - +import platform from 
util.log import * from util.sql import * from util.cases import * @@ -96,7 +96,10 @@ class TDTestCase: packagePath = "/usr/local/src/" dataPath = cPath + "/../data/" - packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" + if platform.system() == "Linux" and platform.machine() == "aarch64": + packageName = "TDengine-server-"+ BASEVERSION + "-Linux-arm64.tar.gz" + else: + packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" packageTPath = packageName.split("-Linux-")[0] my_file = Path(f"{packagePath}/{packageName}") if not my_file.exists(): diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index 74aba31726..0ac8482163 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -4,6 +4,7 @@ import sys import time import socket import os +import platform import threading from enum import Enum @@ -184,6 +185,9 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl + # ARM64: stopping taosd takes a long time, so raise pollDelay to 300s + if platform.system() == "Linux" and platform.machine() == "aarch64": + paraDict['pollDelay'] = 300 tmqCom.initConsumerTable() # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 54a43465e7..87a73e981e 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -188,7 +188,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) From 
3f6db938f36293da34f4514d336240a9ed068587 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Nov 2023 16:00:02 +0800 Subject: [PATCH 10/21] fix:minus row num to reduce execute time --- tests/system-test/7-tmq/tmqVnodeSplit-column.py | 6 +++--- tests/system-test/7-tmq/tmqVnodeSplit-db.py | 8 ++++---- .../7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py | 8 ++++---- .../7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeSplit-stb.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeTransform-db.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeTransform-stb.py | 8 ++++---- tests/system-test/7-tmq/tmqVnodeTransform.py | 8 ++++---- 9 files changed, 35 insertions(+), 35 deletions(-) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 87a73e981e..1fe2b5809a 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -23,7 +23,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -49,7 +49,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -118,7 +118,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-db.py b/tests/system-test/7-tmq/tmqVnodeSplit-db.py index e4353d3268..f66acf4fcd 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-db.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-db.py @@ -23,7 +23,7 @@ class TDTestCase: def 
__init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -49,7 +49,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -118,7 +118,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -189,7 +189,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py index 8276ae638b..68fb07b813 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata-false.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -189,7 +189,7 @@ class TDTestCase: 
expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py index 0d247b2848..6140e8a544 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -189,7 +189,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py index cda5a27919..18b80b7f8d 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py @@ -27,7 +27,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + 
self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -53,7 +53,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -122,7 +122,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 120, @@ -192,7 +192,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py index 17a427567e..c203350322 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb.py @@ -25,7 +25,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -51,7 +51,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -120,7 +120,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -190,7 +190,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act 
consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-db.py b/tests/system-test/7-tmq/tmqVnodeTransform-db.py index 005bca70d6..5c61908d96 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-db.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-db.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 30, @@ -138,7 +138,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 10, @@ -217,7 +217,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 10, diff --git a/tests/system-test/7-tmq/tmqVnodeTransform-stb.py b/tests/system-test/7-tmq/tmqVnodeTransform-stb.py index ec1331ae59..64cdf6d153 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform-stb.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform-stb.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -137,7 +137,7 @@ class TDTestCase: 'ctbPrefix': 'ctb1', 
'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -207,7 +207,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqVnodeTransform.py b/tests/system-test/7-tmq/tmqVnodeTransform.py index aab94bc7a2..3698297618 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform.py @@ -20,7 +20,7 @@ class TDTestCase: def __init__(self): self.vgroups = 1 self.ctbNum = 10 - self.rowsPerTbl = 10000 + self.rowsPerTbl = 1000 def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -46,7 +46,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -137,7 +137,7 @@ class TDTestCase: 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, 'ctbNum': 10, - 'rowsPerTbl': 10000, + 'rowsPerTbl': 1000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 60, @@ -203,7 +203,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) From 8ca00ca70b67996cd3c66f3e09885147cadf6044 Mon Sep 17 00:00:00 2001 From: charles Date: Thu, 30 Nov 2023 14:57:02 +0800 Subject: [PATCH 11/21] remove node.sh test case from CI due to exist issue --- tests/parallel_test/cases.task | 4 ++-- 
tests/system-test/0-others/compatibility.py | 5 ++++- .../0-others/test_hot_refresh_configurations.py | 4 +++- tests/system-test/6-cluster/5dnode3mnodeRoll.py | 7 +++++-- tests/system-test/7-tmq/tmqDnodeRestart.py | 4 ++++ tests/system-test/7-tmq/tmqVnodeSplit-column.py | 2 +- 6 files changed, 19 insertions(+), 7 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index d039495351..ce6bcfe105 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -880,7 +880,7 @@ e ,,y,script,./test.sh -f tsim/dnode/balance2.sim ,,y,script,./test.sh -f tsim/vnode/replica3_repeat.sim ,,y,script,./test.sh -f tsim/parser/col_arithmetic_operation.sim -,,y,script,./test.sh -f tsim/trans/create_db.sim +#,,y,script,./test.sh -f tsim/trans/create_db.sim ,,y,script,./test.sh -f tsim/dnode/balance3.sim ,,y,script,./test.sh -f tsim/vnode/replica3_many.sim ,,y,script,./test.sh -f tsim/stable/metrics_idx.sim @@ -1337,7 +1337,7 @@ e #docs-examples test ,,n,docs-examples-test,bash python.sh -,,n,docs-examples-test,bash node.sh +#,,n,docs-examples-test,bash node.sh ,,n,docs-examples-test,bash csharp.sh ,,n,docs-examples-test,bash jdbc.sh ,,n,docs-examples-test,bash go.sh diff --git a/tests/system-test/0-others/compatibility.py b/tests/system-test/0-others/compatibility.py index 83bfb2bed7..d54c676c0d 100644 --- a/tests/system-test/0-others/compatibility.py +++ b/tests/system-test/0-others/compatibility.py @@ -90,7 +90,10 @@ class TDTestCase: packagePath = "/usr/local/src/" dataPath = cPath + "/../data/" - packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" + if platform.system() == "Linux" and platform.machine() == "aarch64": + packageName = "TDengine-server-"+ BASEVERSION + "-Linux-arm64.tar.gz" + else: + packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" packageTPath = packageName.split("-Linux-")[0] my_file = Path(f"{packagePath}/{packageName}") if not my_file.exists(): diff --git 
a/tests/system-test/0-others/test_hot_refresh_configurations.py b/tests/system-test/0-others/test_hot_refresh_configurations.py index 7aed7274a4..cbde8c060e 100644 --- a/tests/system-test/0-others/test_hot_refresh_configurations.py +++ b/tests/system-test/0-others/test_hot_refresh_configurations.py @@ -2,7 +2,7 @@ import subprocess import random import time import os - +import platform from util.log import * from util.sql import * from util.cases import * @@ -190,6 +190,8 @@ class TDTestCase: for v in values: dnode = random.choice(p_list) tdSql.execute(f'alter {dnode} "{name} {v}";') + if platform.system() == "Linux" and platform.machine() == "aarch64": + continue value = self.get_param_value_with_gdb(alias, "taosd") if value: tdLog.debug(f"value: {value}") diff --git a/tests/system-test/6-cluster/5dnode3mnodeRoll.py b/tests/system-test/6-cluster/5dnode3mnodeRoll.py index 9d62eb3b4b..11a153c48f 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeRoll.py +++ b/tests/system-test/6-cluster/5dnode3mnodeRoll.py @@ -4,7 +4,7 @@ import taos import sys import time import os - +import platform from util.log import * from util.sql import * from util.cases import * @@ -96,7 +96,10 @@ class TDTestCase: packagePath = "/usr/local/src/" dataPath = cPath + "/../data/" - packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" + if platform.system() == "Linux" and platform.machine() == "aarch64": + packageName = "TDengine-server-"+ BASEVERSION + "-Linux-arm64.tar.gz" + else: + packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz" packageTPath = packageName.split("-Linux-")[0] my_file = Path(f"{packagePath}/{packageName}") if not my_file.exists(): diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py index 74aba31726..0ac8482163 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart.py @@ -4,6 +4,7 @@ import sys import time import socket import os +import platform 
import threading from enum import Enum @@ -184,6 +185,9 @@ class TDTestCase: paraDict['vgroups'] = self.vgroups paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl + # ARM64:time cost is so long for stopping taosd, so add the pollDdelay to 120s + if platform.system() == "Linux" and platform.machine() == "aarch64": + paraDict['pollDelay'] = 300 tmqCom.initConsumerTable() # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-column.py b/tests/system-test/7-tmq/tmqVnodeSplit-column.py index 54a43465e7..87a73e981e 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-column.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-column.py @@ -188,7 +188,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt / 2 >= resultList[0]: + if expectrowcnt / 2 > resultList[0]: tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) From 30d78c6be15bd270b90c739a385c16fcce849e5e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Nov 2023 16:10:09 +0800 Subject: [PATCH 12/21] fix(stream): make sure open operator only once for scan-history task. 
--- include/libs/stream/tstream.h | 3 ++- source/libs/stream/src/streamExec.c | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7f65ef8358..9f88672231 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -402,7 +402,8 @@ typedef struct SHistoryTaskInfo { int32_t tickCount; int32_t retryTimes; int32_t waitInterval; - int64_t haltVer; // offset in wal when halt the stream task + int64_t haltVer; // offset in wal when halt the stream task + bool operatorOpen; // false by default } SHistoryTaskInfo; typedef struct STaskOutputInfo { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8ab8f3852e..25f32195be 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { bool finished = false; const char* id = pTask->id.idStr; - qSetStreamOpOpen(exec); + if (!pTask->hTaskInfo.operatorOpen) { + qSetStreamOpOpen(exec); + pTask->hTaskInfo.operatorOpen = true; + } while (1) { if (streamTaskShouldPause(pTask)) { From 77a48f87113f2a8c3ea81296421c61587a88ddff Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Nov 2023 17:36:03 +0800 Subject: [PATCH 13/21] fix:test case --- tests/script/tsim/snode/basic1.sim | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/script/tsim/snode/basic1.sim b/tests/script/tsim/snode/basic1.sim index 86072215f7..6e553591bd 100644 --- a/tests/script/tsim/snode/basic1.sim +++ b/tests/script/tsim/snode/basic1.sim @@ -113,11 +113,7 @@ sql_error drop snode on dnode 2 print =============== create drop snodes sql create snode on dnode 1 -sql create snode on dnode 2 -sql show snodes -if $rows != 2 then - return -1 -endi +sql_error create snode on dnode 2 print =============== restart system sh/exec.sh -n dnode1 -s stop -x SIGINT From 
acce0852df15aefcd818c0c1388ab3a920a3b3e6 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Nov 2023 19:52:27 +0800 Subject: [PATCH 14/21] fix:test case --- tests/script/tsim/snode/basic1.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/tsim/snode/basic1.sim b/tests/script/tsim/snode/basic1.sim index 6e553591bd..e7346b75a0 100644 --- a/tests/script/tsim/snode/basic1.sim +++ b/tests/script/tsim/snode/basic1.sim @@ -123,7 +123,7 @@ system sh/exec.sh -n dnode2 -s start sleep 2000 sql show snodes -if $rows != 2 then +if $rows != 1 then return -1 endi From 353c8d21a9ba89bec6cbc87e746a3c9e3edb6e48 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Fri, 1 Dec 2023 09:25:41 +0800 Subject: [PATCH 15/21] ci:increase timeout of mac test in Jenkinsfile2 --- Jenkinsfile2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 1ab6a4a8cd..d3fc05a1d2 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -393,7 +393,7 @@ pipeline { agent{label " Mac_catalina "} steps { catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') { - timeout(time: 20, unit: 'MINUTES'){ + timeout(time: 30, unit: 'MINUTES'){ pre_test() pre_test_build_mac() } From eac3cbf564ff75085fa7484e3c1033317b6e9962 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Dec 2023 11:12:57 +0800 Subject: [PATCH 16/21] fix(stream): merge 3.0 --- include/libs/stream/tstream.h | 3 ++- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +++- source/libs/stream/src/streamStart.c | 30 ++++++++++------------ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9f88672231..2406601722 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -853,7 +853,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* 
pMeta); -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ); +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index aee2aaa244..b1d49bf31b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -475,6 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); if (!isLeader) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId, rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId); return code; @@ -482,6 +483,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId); if (pTask == NULL) { + streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", rsp.streamId, rsp.upstreamTaskId, vgId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -670,7 +672,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + 
pTask->execInfo.start, true); streamMetaReleaseTask(pMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index a4c448f678..6e4fe45684 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p); - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); return TSDB_CODE_SUCCESS; } @@ -469,14 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + taosGetTimestampMs(), false); // automatically set the related fill-history task to be failed. 
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, + taosGetTimestampMs(), false); streamMetaReleaseTask(pTask->pMeta, pHTask); } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms @@ -1066,29 +1069,26 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) } } -int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) { - SStreamMeta* pMeta = pTask->pMeta; +int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, + int64_t endTs, bool ready) { + STaskStartInfo* pStartInfo = &pMeta->startInfo; + STaskId id = {.streamId = streamId, .taskId = taskId}; streamMetaWLock(pMeta); - - STaskId id = streamTaskExtractKey(pTask); - STaskStartInfo* pStartInfo = &pMeta->startInfo; - - SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet; + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - if (numOfRecv == numOfTotal) { + if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? 
pStartInfo->readyTs - pStartInfo->startTs : 0; - stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 + stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, + pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); // print the initialization elapsed time and info @@ -1096,8 +1096,6 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); streamMetaResetStartInfo(pStartInfo); - } else { - stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); } streamMetaWUnLock(pMeta); From 4e5853d9fe305c26b88b7107bd9a6c8fceab7be6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Dec 2023 12:09:49 +0800 Subject: [PATCH 17/21] refactor: do some internal refactor. --- source/libs/stream/src/streamStart.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 6e4fe45684..0b6603cd7b 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1081,8 +1081,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet); - if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { + if (numOfRecv == numOfTotal) { pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? 
pStartInfo->readyTs - pStartInfo->startTs : 0; @@ -1094,8 +1095,9 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamI // print the initialization elapsed time and info displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo); + } else { + stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal); } streamMetaWUnLock(pMeta); From fd290709ff45bd99dfb14cf6c6d2c25e37911788 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 1 Dec 2023 14:05:04 +0800 Subject: [PATCH 18/21] fix: state window with block rowIndex overflow --- source/libs/executor/src/timewindowoperator.c | 4 +- tests/parallel_test/cases.task | 1 + tests/system-test/2-query/state_window.py | 203 ++++++++++++++++++ 3 files changed, 205 insertions(+), 3 deletions(-) create mode 100644 tests/system-test/2-query/state_window.py diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index db7c5e2570..0ca91f74ad 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -899,6 +899,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI SWindowRowsSup* pRowSup = &pInfo->winSup; pRowSup->numOfRows = 0; + pRowSup->startRowIndex = 0; struct SColumnDataAgg* pAgg = NULL; for (int32_t j = 0; j < pBlock->info.rows; ++j) { @@ -923,9 +924,6 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI doKeepTuple(pRowSup, tsList[j], gid); } else if (compareVal(val, &pInfo->stateKey)) { doKeepTuple(pRowSup, tsList[j], gid); - if (j == 0 && pRowSup->startRowIndex != 0) { - pRowSup->startRowIndex = 0; - } } else { // a new state window started SResultRow* pResult = NULL; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 7d3efbf181..af835c137a 
100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -856,6 +856,7 @@ e ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/odbc.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill_with_group.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/state_window.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py ,,n,system-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insertMix.py -N 3 diff --git a/tests/system-test/2-query/state_window.py b/tests/system-test/2-query/state_window.py new file mode 100644 index 0000000000..a211cd9dbe --- /dev/null +++ b/tests/system-test/2-query/state_window.py @@ -0,0 +1,203 @@ +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +# from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 1 + self.rowsPerTbl = 10 + self.duraion = '1h' + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, paraDict): + colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"]) + tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], 
tag_elm_list=paraDict["tagSchema"]) + sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString) + tdLog.debug("%s"%(sqlString)) + tsql.execute(sqlString) + return + + def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0): + for i in range(ctbNum): + sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \ + (dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx) + tsql.execute(sqlString) + + tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + for i in range(ctbNum): + rowsBatched = 0 + sql += " %s%d values "%(ctbPrefix,i) + for j in range(rowsPerTbl): + if (i < ctbNum/2): + sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, 1, j%10, j%10, j%10, j%10, j%10, j%10) + else: + sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10) + rowsBatched += 1 + if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + rowsBatched = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s%d values " %(ctbPrefix,i) + else: + sql = "insert into " + if sql != pre_insert: + tsql.execute(sql) + tdLog.debug("insert data ............ 
[OK]") + return + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'test', + 'dropFlag': 1, + 'vgroups': 2, + 'stbName': 'meters', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}], + 'ctbPrefix': 't', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 10000, + 'batchNum': 3000, + 'startTs': 1537146000000, + 'tsStep': 600000} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("create database") + self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion) + + tdLog.info("create stb") + self.create_stable(tsql=tdSql, paraDict=paraDict) + + tdLog.info("create child tables") + self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \ + stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\ + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"]) + self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\ + ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\ + rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\ + startTs=paraDict["startTs"],tsStep=paraDict["tsStep"]) + return + + def prepare_original_data(self): + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 
2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("flush database test", queryTimes=1) + time.sleep(2) + + def test_crash_for_state_window1(self): + tdSql.execute("drop database if exists test") + self.prepareTestEnv() + tdSql.execute("alter local 'queryPolicy' '3'") + self.prepare_original_data() + tdSql.execute("insert into t0 values(now, 4,4,4,4,4,4,4,4,4)", queryTimes=1) + tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1) + + def test_crash_for_state_window2(self): + tdSql.execute("drop database if exists test") + self.prepareTestEnv() + tdSql.execute("alter local 'queryPolicy' '3'") + self.prepare_original_data() + tdSql.execute("insert into t0 values(now, 4,NULL,4,4,4,4,4,4,4)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 4,4,4,4,4,4,4,4,4)", queryTimes=1) + tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1) + + def test_crash_for_state_window3(self): + tdSql.execute("drop database if exists test") + self.prepareTestEnv() + tdSql.execute("alter local 'queryPolicy' '3'") + self.prepare_original_data() + tdSql.execute("insert into t0 values(now, 4,NULL,4,4,4,4,4,4,4)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 4,5,4,4,4,4,4,4,4)", queryTimes=1) + tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1) + + def test_crash_for_state_window4(self): + tdSql.execute("drop database if exists test") + self.prepareTestEnv() + tdSql.execute("alter local 'queryPolicy' '3'") + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 
2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("flush database test", queryTimes=1) + time.sleep(2) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1) + + def test_crash_for_state_window5(self): + tdSql.execute("drop database if exists test") + self.prepareTestEnv() + tdSql.execute("alter local 'queryPolicy' '3'") + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("flush database test", queryTimes=1) + time.sleep(2) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1) + + def run(self): + self.test_crash_for_state_window1() + self.test_crash_for_state_window2() + 
self.test_crash_for_state_window3() + self.test_crash_for_state_window4() + self.test_crash_for_state_window5() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From 34e79d158cbba5c0d91384758f1b68aae888b0fa Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 1 Dec 2023 16:27:02 +0800 Subject: [PATCH 19/21] case: add stream basic test case --- tests/system-test/8-stream/stream_basic.py | 110 +++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 tests/system-test/8-stream/stream_basic.py diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py new file mode 100644 index 0000000000..7f4d1d5ee3 --- /dev/null +++ b/tests/system-test/8-stream/stream_basic.py @@ -0,0 +1,110 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * +from util.autogen import * + +import random +import time +import traceback +import os +from os import path + + +class TDTestCase: + # init + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + + # autoGen + self.autoGen = AutoGen() + + def waitTranslation(self, waitSeconds): + # wait end + for i in range(waitSeconds): + sql ="show transactions;" + rows = tdSql.query(sql) + if rows == 0: + return True + tdLog.info(f"i={i} wait for translation finish ...") + time.sleep(1) + + return False + + def getPath(self, tool="taosBenchmark"): + if (platform.system().lower() == 'windows'): + tool = tool + ".exe" + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + paths = [] + for root, dirs, files in os.walk(projPath): + if ((tool) in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + paths.append(os.path.join(root, tool)) + break + if (len(paths) == 0): + tdLog.exit("taosBenchmark not found!") + return + else: + tdLog.info("taosBenchmark found in %s" % paths[0]) + return paths[0] + + def taosBenchmark(self, param): + binPath = self.getPath() + cmd = f"{binPath} {param}" + tdLog.info(cmd) + os.system(cmd) + + # run + def run(self): + # gen data + random.seed(int(time.time())) + self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y") + # create stream + 
tdSql.execute("use db") + tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) + sql = "select count(*) from sta" + # loop wait max 60s to check count is ok + tdLog.info("loop wait result ...") + tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5) + + # check all data is correct + sql = "select * from sta where cnt != 20;" + tdSql.query(sql) + tdSql.checkRows(0) + + # check ts interval is correct + sql = "select * from ( select diff(_wstart) as tsdif from sta ) where tsdif != 10;" + tdSql.query(sql) + tdSql.checkRows(0) + + # stop + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file From a769ccb816de54ed2cc3ad6d798236bfeb57643e Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Fri, 1 Dec 2023 16:29:06 +0800 Subject: [PATCH 20/21] case: add stream basic test case --- tests/parallel_test/cases.task | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 92eaec52b5..b0129428b2 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -6,6 +6,7 @@ ,,y,unit-test,bash test.sh #system test +,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_session.py From f06117dbc915a02d086dbc22c46fe85e40858d6c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Dec 2023 17:32:29 +0800 Subject: [PATCH 21/21] fix:[TD-27644]heartbeat closed before snode close leading to snode is hanged --- include/libs/stream/tstream.h | 3 -- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 6 ++++ source/dnode/vnode/src/inc/vnodeInt.h | 2 -- source/dnode/vnode/src/tq/tq.c 
| 36 ------------------- source/dnode/vnode/src/vnd/vnodeModule.c | 8 ----- source/libs/stream/inc/streamInt.h | 7 +--- source/libs/stream/src/stream.c | 44 +++++++----------------- source/libs/stream/src/streamDispatch.c | 4 +-- source/libs/stream/src/streamMeta.c | 6 ++-- source/libs/stream/src/streamStart.c | 16 ++++----- 10 files changed, 32 insertions(+), 100 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7f65ef8358..00d2bee880 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -216,9 +216,6 @@ int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem); SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); #endif -int32_t streamInit(); -void streamCleanUp(); - SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 6f13abcebc..e0503c83c6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -28,6 +28,9 @@ } \ } while (0) +extern int32_t streamTimerInit(); +extern void streamTimerCleanUp(); + static SDnode globalDnode = {0}; SDnode *dmInstance() { return &globalDnode; } @@ -166,6 +169,7 @@ int32_t dmInit() { #if defined(USE_S3) if (s3Begin() != 0) return -1; #endif + if (streamTimerInit() != 0) return -1; dInfo("dnode env is initialized"); return 0; @@ -194,6 +198,8 @@ void dmCleanup() { #if defined(USE_S3) s3End(); #endif + streamTimerCleanUp(); + dInfo("dnode env is cleaned up"); taosCleanupCfg(); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8a4cbb5fd0..7ed0b5103f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -223,8 +223,6 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid int32_t tsdbSetKeepCfg(STsdb* 
pTsdb, STsdbCfg* pCfg); // tq -int tqInit(); -void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode); void tqNotifyClose(STQ*); void tqClose(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e9943d2abf..ee76a27414 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -17,12 +17,6 @@ #include "vnd.h" #include "tqCommon.h" -typedef struct { - int8_t inited; -} STqMgmt; - -static STqMgmt tqMgmt = {0}; - // 0: not init // 1: already inited // 2: wait to be inited or cleaup @@ -32,36 +26,6 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_ static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } -int32_t tqInit() { - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2); - if (old != 2) break; - } - - if (old == 0) { - if (streamInit() < 0) { - return -1; - } - atomic_store_8(&tqMgmt.inited, 1); - } - - return 0; -} - -void tqCleanUp() { - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2); - if (old != 2) break; - } - - if (old == 1) { - streamCleanUp(); - atomic_store_8(&tqMgmt.inited, 0); - } -} - void tqDestroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 4e3cee42c6..44fcbefba7 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -39,12 +39,6 @@ int vnodeInit(int nthreads) { if (walInit() < 0) { return -1; } - if (tqInit() < 0) { - return -1; - } - if (s3Init() < 0) { - return -1; - } return 0; } @@ -58,7 +52,5 @@ void vnodeCleanup() { vnodeAsyncDestroy(&vnodeAsyncHandle[1]); walCleanUp(); - tqCleanUp(); smaCleanUp(); - s3CleanUp(); } 
diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 7b8dae8be7..0df36ec391 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -57,11 +57,6 @@ typedef struct { SSDataBlock* pBlock; } SStreamTrigger; -typedef struct SStreamGlobalEnv { - int8_t inited; - void* timer; -} SStreamGlobalEnv; - typedef struct SStreamContinueExecInfo { SEpSet epset; int32_t taskId; @@ -92,7 +87,7 @@ struct SStreamQueue { int8_t status; }; -extern SStreamGlobalEnv streamEnv; +extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; extern int32_t taskDbWrapperId; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1c874f34de..1bef42bf14 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,38 +16,18 @@ #include "streamInt.h" #include "ttimer.h" -SStreamGlobalEnv streamEnv; +void* streamTimer = NULL; -int32_t streamInit() { - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2); - if (old != 2) break; +int32_t streamTimerInit() { + streamTimer = taosTmrInit(1000, 100, 10000, "STREAM"); + if (streamTimer == NULL) { + return -1; } - - if (old == 0) { - streamEnv.timer = taosTmrInit(1000, 100, 10000, "STREAM"); - if (streamEnv.timer == NULL) { - atomic_store_8(&streamEnv.inited, 0); - return -1; - } - atomic_store_8(&streamEnv.inited, 1); - } - return 0; } -void streamCleanUp() { - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2); - if (old != 2) break; - } - - if (old == 1) { - taosTmrCleanUp(streamEnv.timer); - atomic_store_8(&streamEnv.inited, 0); - } +void streamTimerCleanUp() { + taosTmrCleanUp(streamTimer); } char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { @@ -77,7 +57,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { if (pTrigger == NULL) { stError("s-task:%s failed to prepare retrieve data 
trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer); return; } @@ -88,7 +68,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer); return; } @@ -97,7 +77,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer); return; } @@ -105,7 +85,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { } } - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer); } int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { @@ -115,7 +95,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); - pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer); + pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 
6247d4ed53..1a67b08749 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -506,9 +506,9 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); if (pTask->msgInfo.pTimer != NULL) { - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pTimer); } else { - pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bd23e41a84..807f120cb7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -369,7 +369,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); metaRefMgtAdd(pMeta->vgId, pRid); - pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); + pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pMeta->pHbInfo->tickCounter = 0; pMeta->pHbInfo->stopFlag = 0; pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); @@ -1099,7 +1099,7 @@ void metaHbToMnode(void* param, void* tmrId) { } if (!waitForEnoughDuration(pMeta->pHbInfo)) { - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; } @@ -1215,7 +1215,7 @@ void metaHbToMnode(void* param, void* tmrId) { _end: clearHbMsg(&hbMsg, pIdList); - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, 
&pMeta->pHbInfo->hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index a4c448f678..4c748d58bc 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -114,7 +114,7 @@ static void doReExecScanhistory(void* param, void* tmrId) { // release the task. streamMetaReleaseTask(pTask->pMeta, pTask); } else { - taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); } } @@ -137,9 +137,9 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref); if (pTask->schedHistoryInfo.pTimer == NULL) { - pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer); + pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); } else { - taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer); + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); } return TSDB_CODE_SUCCESS; @@ -485,7 +485,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); - pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer); + pInfo->checkTimer = 
taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer); } } @@ -726,7 +726,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { pHTaskInfo->tickCount -= 1; if (pHTaskInfo->tickCount > 0) { - taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask); return; } @@ -754,7 +754,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); - taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask); return; } @@ -815,7 +815,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { streamTaskInitForLaunchHTask(&pTask->hTaskInfo); if (pTask->hTaskInfo.pTimer == NULL) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer); + pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); if (pTask->hTaskInfo.pTimer == NULL) { atomic_sub_fetch_32(&pTask->status.timerActive, 1); stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr, @@ -828,7 +828,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } else { // timer exists ASSERT(pTask->status.timerActive >= 1); stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer, 
&pTask->hTaskInfo.pTimer); + taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer); } return TSDB_CODE_SUCCESS;