diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index 5e4eadcceb..f50551b5de 100755 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -432,7 +432,7 @@ The charset that takes effect is UTF-8. | Applicable | Server Only | | Meaning | Maximum number of threads to commit | | Value Range | 0-1024 | -| Default Value | | +| Default Value | 4 | ## Log Parameters diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index 6fce985927..effa72099a 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -430,7 +430,7 @@ charset 的有效值是 UTF-8。 | 适用范围 | 仅服务端适用 | | 含义 | 设置写入线程的最大数量 | | 取值范围 | 0-1024 | -| 缺省值 | | +| 缺省值 | 4 | ## 日志相关 diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 90ee6f7cc0..e7035fe297 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -86,6 +86,7 @@ extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfSnodeStreamThreads; extern int32_t tsNumOfSnodeWriteThreads; extern int64_t tsRpcQueueMemoryAllowed; +extern int32_t tsRetentionSpeedLimitMB; // sync raft extern int32_t tsElectInterval; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index ac4811fb1b..8e8c9d9a85 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1362,6 +1362,8 @@ void blockDataEmpty(SSDataBlock* pDataBlock) { return; } + taosMemoryFreeClear(pDataBlock->pBlockAgg); + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0cdb1eb784..f76ea35a23 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -74,6 +74,7 @@ int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; int32_t tsMaxStreamBackendCache = 128; // M int32_t tsPQSortMemThreshold = 16; // M +int32_t tsRetentionSpeedLimitMB = 0; // unlimited // sync raft int32_t tsElectInterval = 25 * 1000; @@ -668,6 +669,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -1118,6 +1120,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; + tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 0bdf5c2640..b6fc24a910 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -103,8 +103,8 @@ typedef enum { TRN_CONFLICT_GLOBAL = 1, TRN_CONFLICT_DB = 2, TRN_CONFLICT_DB_INSIDE = 3, - TRN_CONFLICT_TOPIC = 4, - TRN_CONFLICT_TOPIC_INSIDE = 5, +// TRN_CONFLICT_TOPIC = 4, +// TRN_CONFLICT_TOPIC_INSIDE = 5, TRN_CONFLICT_ARBGROUP = 6, } ETrnConflct; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 3eef2afcc1..9a7a8155ec 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -91,7 +91,7 @@ void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SR } } -static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, +static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { SMqTopicObj *pTopic = NULL; int32_t code = 0; @@ -135,11 +135,6 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * } } - mndTransSetDbName(pTrans, pOneTopic, NULL); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - code = -1; - goto FAILED; - } mndReleaseTopic(pMnode, pTopic); } @@ -177,12 +172,12 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { goto END; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); if (pTrans == NULL) { code = -1; goto END; } - code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); + code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); if (code != 0) { goto END; } @@ -675,13 +670,13 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _over; } - code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay); + code = validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay); if (code != TSDB_CODE_SUCCESS) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0068b582cf..ffb723756c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -618,13 +618,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu char cgroup[TSDB_CGROUP_LEN] = {0}; mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto END; } - mndTransSetDbName(pTrans, topic, cgroup); + mndTransSetDbName(pTrans, pOutput->pSub->dbName, cgroup); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto END; @@ -908,33 +908,37 @@ END: } static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){ - // iter all vnode to delete handle - int32_t sz = taosArrayGetSize(pSub->unassignedVgs); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + void* pIter = NULL; + SVgObj* pVgObj = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj); + if (pIter == NULL) { + break; + } + + if (!mndVgroupInDb(pVgObj, pSub->dbUid)) { + sdbRelease(pMnode->pSdb, pVgObj); + continue; + } SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); if(pReq == NULL){ terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbRelease(pMnode->pSdb, pVgObj); return -1; } - pReq->head.vgId = htonl(pVgEp->vgId); - pReq->vgId = pVgEp->vgId; + pReq->head.vgId = htonl(pVgObj->vgId); + pReq->vgId = pVgObj->vgId; pReq->consumerId = -1; memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); - if (pVgObj == NULL) { - taosMemoryFree(pReq); - terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; - return -1; - } STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj);; action.pCont = pReq; action.contLen = sizeof(SMqVDeleteReq); action.msgType = TDMT_VND_TMQ_DELETE_SUB; + action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST; - mndReleaseVgroup(pMnode, pVgObj); + sdbRelease(pMnode->pSdb, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -996,7 +1000,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup"); if (pTrans == NULL) { mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); code = -1; @@ -1004,7 +1008,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { } mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); - mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup); + mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 8a06b4a613..bcb38a3902 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -422,14 +422,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * SQueryPlan *pPlan = NULL; SMqTopicObj topicObj = {0}; - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic"); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); code = -1; goto _OUT; } - mndTransSetDbName(pTrans, pCreate->name, NULL); + mndTransSetDbName(pTrans, pDb->name, NULL); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto _OUT; @@ -779,14 +779,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic"); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); code = -1; goto end; } - mndTransSetDbName(pTrans, pTopic->name, NULL); + mndTransSetDbName(pTrans, pTopic->db, NULL); code = mndTransCheckConflict(pMnode, pTrans); if (code != 0) { goto end; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 05f685ecb4..2176cf4e39 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -836,26 +836,26 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { } } - if (pNew->conflict == TRN_CONFLICT_TOPIC) { - if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; - } - } - if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; - if (pTrans->conflict == TRN_CONFLICT_TOPIC) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; - } - if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { - if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) - conflict = true; - } - } +// if (pNew->conflict == TRN_CONFLICT_TOPIC) { +// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; +// if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; +// } +// } +// if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; +// if (pTrans->conflict == TRN_CONFLICT_TOPIC) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; +// } +// if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { +// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) +// conflict = true; +// } +// } if (pNew->conflict == TRN_CONFLICT_ARBGROUP) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) { - void *pIter = taosHashIterate(pNew->arbGroupIds, NULL); + pIter = taosHashIterate(pNew->arbGroupIds, NULL); while (pIter != NULL) { int32_t groupId = *(int32_t *)pIter; if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a2ca3662d7..30710bef27 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -50,6 +50,9 @@ void tqDestroyTqHandle(void* data) { if (pData->block != NULL) { blockDataDestroy(pData->block); } + if (pData->pRef) { + walCloseRef(pData->pRef->pWal, pData->pRef->refId); + } } static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { @@ -571,9 +574,6 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosMsleep(10); continue; } - if (pHandle->pRef) { - walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); - } tqUnregisterPushHandle(pTq, pHandle); @@ -658,12 +658,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosRLockLatch(&pTq->lock); ret = tqMetaGetHandle(pTq, req.subKey); taosRUnLockLatch(&pTq->lock); - if (ret < 0) { break; } } - if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64, @@ -708,7 +706,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } } -end: + end: tDecoderClear(&dc); return ret; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 404cbf26dd..02184f1d50 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -352,7 +352,6 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); handle->consumerId = req->newConsumerId; - handle->epoch = -1; handle->execHandle.subType = req->subType; handle->fetchMeta = req->withMeta; @@ -371,7 +370,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ if(buildHandle(pTq, handle) < 0){ return -1; } - tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId); + tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 71e6771370..7375478e61 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -87,7 +87,7 @@ int tqUnregisterPushHandle(STQ* pTq, void *handle) { int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); - if(pHandle->msg != NULL) { + if(ret == 0 && pHandle->msg != NULL) { // tqPushDataRsp(pHandle, vgId); tqPushEmptyDataRsp(pHandle, vgId); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 3d53d1ada3..0d3994d78e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -38,6 +38,34 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) { return TARRAY2_APPEND(&rtner->fopArr, op); } +static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) { + int64_t total = 0; + int64_t interval = 1000; // 1s + int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX; + int64_t offset = 0; + int64_t remain = size; + + while (remain > 0) { + int64_t n; + int64_t last = taosGetTimestampMs(); + if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) { + return -1; + } + + total += n; + remain -= n; + + if (remain > 0) { + int64_t elapsed = taosGetTimestampMs() - last; + if (elapsed < interval) { + taosMsleep(interval - elapsed); + } + } + } + + return total; +} + static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) { int32_t code = 0; int32_t lino = 0; @@ -98,7 +126,8 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile if (fdTo == NULL) code = terrno; TSDB_CHECK_CODE(code, lino, _exit); - int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage)); + int64_t n = tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage), + tsRetentionSpeedLimitMB); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index d06beebd6b..9ca681779d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2178,9 +2178,25 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* return code; } if (group == NULL || groupByTbname) { - for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - info->groupId = groupByTbname ? info->uid : 0; + if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { + pTableListInfo->remainGroups = + taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (pTableListInfo->remainGroups == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int i = 0; i < numOfTables; i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + info->groupId = info->uid; + + taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), + sizeof(info->uid)); + } + } else { + for (int32_t i = 0; i < numOfTables; i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + info->groupId = groupByTbname ? info->uid : 0; + } } pTableListInfo->oneTableForEachGroup = groupByTbname; @@ -2193,8 +2209,6 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pTableListInfo->numOfOuputGroups = numOfTables; } else if (groupByTbname && pScanNode->groupOrderScan) { pTableListInfo->numOfOuputGroups = numOfTables; - } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { - pTableListInfo->numOfOuputGroups = numOfTables; } else { pTableListInfo->numOfOuputGroups = 1; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index eef8b06ac5..ec40bceb5e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -725,7 +725,7 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) { if (pInfo->countState == TABLE_COUNT_STATE_END) { return; } - if (pInfo->base.pTableListInfo->oneTableForEachGroup || pInfo->base.pTableListInfo->groupOffset) { + if (pInfo->base.pTableListInfo->groupOffset) { pInfo->countState = TABLE_COUNT_STATE_PROCESSED; } else { taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId)); @@ -890,7 +890,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) { STableListInfo* pTableListInfo = pTableScanInfo->base.pTableListInfo; - if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort + if (pTableListInfo->groupOffset) { // group by tbname, group by tag + sort if (pTableScanInfo->countState < TABLE_COUNT_STATE_PROCESSED) { pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED; STableKeyInfo* pStart = diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index 694227cea7..be99d01a5c 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -237,21 +237,19 @@ class ClusterComCheck: last_number=vgroup_numbers-1 while count < count_number: time.sleep(1) + count+=1 + print("check vgroup count :", count) tdSql.query(f"show {db_name}.vgroups;") - if count == 0 : - if tdSql.checkRows(vgroup_numbers) : - tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" ) - else: - tdLog.exit(f"vgroup number of {db_name} is not correct") + if tdSql.getRows() != vgroup_numbers : + continue if self.db_replica == 1 : if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader': tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';") print("db replica :",tdSql.queryResult[0][0]) if tdSql.queryResult[0][0] == db_replica: - ready_time= (count + 1) - tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count + 1} s") + tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count} s") return True - count+=1 + elif self.db_replica == 3 : vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]] @@ -261,10 +259,8 @@ class ClusterComCheck: tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';") print("db replica :",tdSql.queryResult[0][0]) if tdSql.queryResult[0][0] == db_replica: - ready_time= (count + 1) - tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {ready_time} s") + tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {count} s") return True - count+=1 else: tdLog.debug(tdSql.queryResult) tdLog.notice(f"elections of {db_name} all vgroups with replica {self.db_replica} are failed in {count} s ") diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index 37e3a17100..185d1b01cf 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -219,7 +219,7 @@ class TDTestCase: expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] topicList = topicName1 ifcheckdata = 0 - ifManualCommit = 1 + ifManualCommit = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ diff --git a/tests/system-test/7-tmq/tmqDropStbCtb.py b/tests/system-test/7-tmq/tmqDropStbCtb.py index eacacd913b..a1929237b7 100644 --- a/tests/system-test/7-tmq/tmqDropStbCtb.py +++ b/tests/system-test/7-tmq/tmqDropStbCtb.py @@ -157,7 +157,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) if self.snapshot == 0: - if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)): + if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") @@ -249,7 +249,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) if self.snapshot == 0: - if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)): + if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py index f01bf2558c..a5e61adc8d 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select-false.py @@ -200,12 +200,11 @@ class TDTestCase: # tmqCom.checkFileContent(consumerId, queryString) - time.sleep(2) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) - if deleteWal == True: - clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) + tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self): diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py index 5e11de04cb..eb35ebc718 100644 --- a/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py +++ b/tests/system-test/7-tmq/tmqVnodeSplit-stb-select.py @@ -199,13 +199,11 @@ class TDTestCase: tdLog.exit("%d tmq consume rows error!"%consumerId) # tmqCom.checkFileContent(consumerId, queryString) + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) - time.sleep(2) + time.sleep(3) for i in range(len(topicNameList)): tdSql.query("drop topic %s"%topicNameList[i]) - - if deleteWal == True: - clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) tdLog.printNoPrefix("======== test case 1 end ...... ") def run(self):