diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 55bd5466ed..f4dcdb242e 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -314,7 +314,7 @@ def pre_test_build_win() { cd %WIN_CONNECTOR_ROOT% python.exe -m pip install --upgrade pip python -m pip uninstall taospy -y - python -m pip install taospy==2.7.6 + python -m pip install taospy==2.7.10 xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32 ''' return 1 diff --git a/docs/en/12-taos-sql/02-database.md b/docs/en/12-taos-sql/02-database.md index af619c11a5..68dba3fc56 100644 --- a/docs/en/12-taos-sql/02-database.md +++ b/docs/en/12-taos-sql/02-database.md @@ -43,7 +43,7 @@ database_option: { ## Parameters -- BUFFER: specifies the size (in MB) of the write buffer for each vnode. Enter a value between 3 and 16384. The default value is 96. +- BUFFER: specifies the size (in MB) of the write buffer for each vnode. Enter a value between 3 and 16384. The default value is 256. - CACHEMODEL: specifies how the latest data in subtables is stored in the cache. The default value is none. - none: The latest data is not cached. - last_row: The last row of each subtable is cached. This option significantly improves the performance of the LAST_ROW function. diff --git a/docs/examples/python/tmq_assignment_example.py b/docs/examples/python/tmq_assignment_example.py index a07347a9b9..41737e3fc4 100644 --- a/docs/examples/python/tmq_assignment_example.py +++ b/docs/examples/python/tmq_assignment_example.py @@ -55,4 +55,4 @@ def taos_get_assignment_and_seek_demo(): if __name__ == '__main__': - taosws_get_assignment_and_seek_demo() + taos_get_assignment_and_seek_demo() diff --git a/docs/zh/12-taos-sql/02-database.md b/docs/zh/12-taos-sql/02-database.md index b329413aa8..d79a985089 100644 --- a/docs/zh/12-taos-sql/02-database.md +++ b/docs/zh/12-taos-sql/02-database.md @@ -42,7 +42,7 @@ database_option: { ### 参数说明 -- BUFFER: 一个 VNODE 写入内存池大小,单位为 MB,默认为 96,最小为 3,最大为 16384。 +- BUFFER: 一个 VNODE 写入内存池大小,单位为 MB,默认为 256,最小为 3,最大为 16384。 - CACHEMODEL:表示是否在内存中缓存子表的最近数据。默认为 none。 - none:表示不缓存。 - last_row:表示缓存子表最近一行数据。这将显著改善 LAST_ROW 函数的性能表现。 diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index db54c6e196..aab8f00adc 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -636,6 +636,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; + taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); @@ -646,6 +647,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm pTopicName, numOfTopics); taosMemoryFree(pParamSet); pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + taosRUnLockLatch(&tmq->lock); return; } @@ -663,6 +665,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm vgId, numOfVgroups, pTopicName); taosMemoryFree(pParamSet); pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam); + taosRUnLockLatch(&tmq->lock); return; } @@ -679,6 +682,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm taosMemoryFree(pParamSet); pCommitFp(tmq, code, userParam); } + taosRUnLockLatch(&tmq->lock); } static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { @@ -696,6 +700,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us // init as 1 to prevent concurrency issue pParamSet->waitingRspNum = 1; + taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); @@ -725,6 +730,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us } } } + taosRUnLockLatch(&tmq->lock); tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); @@ -799,6 +805,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; + taosRLockLatch(&tmq->lock); // if(tmq->needReportOffsetRows){ req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ @@ -820,6 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) { } // tmq->needReportOffsetRows = false; // } + taosRUnLockLatch(&tmq->lock); int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); if (tlen < 0) { @@ -986,10 +994,12 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { if (*topics == NULL) { *topics = tmq_list_new(); } + taosRLockLatch(&tmq->lock); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); tmq_list_append(*topics, strchr(topic->topicName, '.') + 1); } + taosRUnLockLatch(&tmq->lock); return 0; } @@ -1527,12 +1537,7 @@ static void freeClientVgInfo(void* param) { static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool set = false; - int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics); - - char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", - tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); if (epoch <= tmq->epoch) { return false; } @@ -1548,6 +1553,12 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) return false; } + taosWLockLatch(&tmq->lock); + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + + char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", + tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); // todo extract method for (int32_t i = 0; i < topicNumCur; i++) { // find old topic @@ -1579,7 +1590,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) taosHashCleanup(pVgOffsetHashMap); - taosWLockLatch(&tmq->lock); // destroy current buffered existed topics info if (tmq->clientTopics) { taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); @@ -1807,6 +1817,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){ return 0; } + int32_t code = 0; + + taosWLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); @@ -1816,7 +1829,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms + if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; @@ -1831,15 +1844,17 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } atomic_store_32(&pVg->vgSkipCnt, 0); - int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout); + code = doTmqPollImpl(tmq, pTopic, pVg, timeout); if (code != TSDB_CODE_SUCCESS) { - return code; + goto end; } } } - tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId); - return 0; +end: + taosWUnLockLatch(&tmq->lock); + tscDebug("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code); + return code; } static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) { @@ -1891,12 +1906,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; if (pDataRsp->head.epoch == consumerEpoch) { + taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + taosWUnLockLatch(&tmq->lock); return NULL; } // update the epset @@ -1944,8 +1961,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); + taosWUnLockLatch(&tmq->lock); return pRsp; } + taosWUnLockLatch(&tmq->lock); } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); @@ -1960,12 +1979,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { + taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + taosWUnLockLatch(&tmq->lock); return NULL; } @@ -1977,6 +1998,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); + taosWUnLockLatch(&tmq->lock); return pRsp; } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", @@ -1989,12 +2011,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { + taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + taosWUnLockLatch(&tmq->lock); return NULL; } @@ -2017,32 +2041,31 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->emptyBlockReceiveTs = taosGetTimestampMs(); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - continue; } else { pVg->emptyBlockReceiveTs = 0; // reset the ts + // build rsp + void* pRsp = NULL; + int64_t numOfRows = 0; + if (pollRspWrapper->taosxRsp.createTableNum == 0) { + pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); + } else { + pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); + } + + tmq->totalRows += numOfRows; + + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); + tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 + ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, + tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, + tmq->totalRows, pollRspWrapper->reqId); + + taosFreeQitem(pollRspWrapper); + taosWUnLockLatch(&tmq->lock); + return pRsp; } - - // build rsp - void* pRsp = NULL; - int64_t numOfRows = 0; - if (pollRspWrapper->taosxRsp.createTableNum == 0) { - pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); - } else { - pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); - } - - tmq->totalRows += numOfRows; - - char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); - tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, - tmq->totalRows, pollRspWrapper->reqId); - - taosFreeQitem(pollRspWrapper); - return pRsp; - + taosWUnLockLatch(&tmq->lock); } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); @@ -2121,7 +2144,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } -static void displayConsumeStatistics(const tmq_t* pTmq) { +static void displayConsumeStatistics(tmq_t* pTmq) { + taosRLockLatch(&pTmq->lock); int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); @@ -2137,7 +2161,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) { tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); } } - + taosRUnLockLatch(&pTmq->lock); tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); } @@ -2544,14 +2568,18 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t* numOfAssignment) { *numOfAssignment = 0; *assignment = NULL; + SMqVgCommon* pCommon = NULL; int32_t accId = tmq->pTscObj->acctId; char tname[128] = {0}; sprintf(tname, "%d.%s", accId, pTopicName); + int32_t code = TSDB_CODE_SUCCESS; + taosWLockLatch(&tmq->lock); SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; + goto end; } // in case of snapshot is opened, no valid offset will return @@ -2561,7 +2589,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a if (*assignment == NULL) { tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId, (*numOfAssignment) * sizeof(tmq_topic_assignment)); - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; } bool needFetch = false; @@ -2586,10 +2615,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } if (needFetch) { - SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon)); + pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon)); if (pCommon == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + code = terrno; + goto end; } pCommon->pList= taosArrayInit(4, sizeof(tmq_topic_assignment)); @@ -2604,8 +2634,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam)); if (pParam == NULL) { - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } pParam->epoch = tmq->epoch; @@ -2619,30 +2649,30 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); if (msgSize < 0) { taosMemoryFree(pParam); - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } char* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { taosMemoryFree(pParam); - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { taosMemoryFree(msg); taosMemoryFree(pParam); - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(pParam); taosMemoryFree(msg); - destroyCommonInfo(pCommon); - return terrno; + code = terrno; + goto end; } sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; @@ -2662,20 +2692,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } tsem_wait(&pCommon->rsp); - int32_t code = pCommon->code; + code = pCommon->code; terrno = code; if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(*assignment); - *assignment = NULL; - *numOfAssignment = 0; - } else { - int32_t num = taosArrayGetSize(pCommon->pList); - for(int32_t i = 0; i < num; ++i) { - (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); - } - *numOfAssignment = num; + goto end; } + int32_t num = taosArrayGetSize(pCommon->pList); + for(int32_t i = 0; i < num; ++i) { + (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); + } + *numOfAssignment = num; for (int32_t j = 0; j < (*numOfAssignment); ++j) { tmq_topic_assignment* p = &(*assignment)[j]; @@ -2701,12 +2728,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pOffsetInfo->committedOffset.version = p->currentOffset; } } - - destroyCommonInfo(pCommon); - return code; - } else { - return TSDB_CODE_SUCCESS; } + +end: + if(code != TSDB_CODE_SUCCESS){ + taosMemoryFree(*assignment); + *assignment = NULL; + *numOfAssignment = 0; + } + destroyCommonInfo(pCommon); + taosWUnLockLatch(&tmq->lock); + return code; } void tmq_free_assignment(tmq_topic_assignment* pAssignment) { @@ -2727,9 +2759,11 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ char tname[128] = {0}; sprintf(tname, "%d.%s", accId, pTopicName); + taosWLockLatch(&tmq->lock); SMqClientTopic* pTopic = getTopicByName(tmq, tname); if (pTopic == NULL) { tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + taosWUnLockLatch(&tmq->lock); return TSDB_CODE_INVALID_PARA; } @@ -2745,6 +2779,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (pVg == NULL) { tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); + taosWUnLockLatch(&tmq->lock); return TSDB_CODE_INVALID_PARA; } @@ -2753,12 +2788,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int32_t type = pOffsetInfo->currentOffset.type; if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); + taosWUnLockLatch(&tmq->lock); return TSDB_CODE_INVALID_PARA; } if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); + taosWUnLockLatch(&tmq->lock); return TSDB_CODE_INVALID_PARA; } @@ -2773,6 +2810,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); + taosWUnLockLatch(&tmq->lock); SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); if (pInfo == NULL) { diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 01409a3880..9647b5eaba 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -818,6 +818,9 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stableQuery, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->isSubQuery, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 77453fd894..69f2273bb0 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -360,7 +360,12 @@ static int32_t vnodeCommitTask(void *arg) { // commit code = vnodeCommitImpl(pInfo); - if (code) goto _exit; + if (code) { + vFatal("vgId:%d, failed to commit vnode since %s", TD_VID(pVnode), terrstr()); + taosMsleep(100); + exit(EXIT_FAILURE); + goto _exit; + } vnodeReturnBufPool(pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeRetention.c b/source/dnode/vnode/src/vnd/vnodeRetention.c index 170deb4286..61623f0a69 100644 --- a/source/dnode/vnode/src/vnd/vnodeRetention.c +++ b/source/dnode/vnode/src/vnd/vnodeRetention.c @@ -121,7 +121,7 @@ int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) { _exit: if (code) { - vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code)); + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); if (pInfo) taosMemoryFree(pInfo); } else { vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 85fcde2a52..88bd540b85 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -581,7 +581,9 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && !syncIsReadyForRead(pVnode->sync)) { + if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME || + pMsg->msgType == TDMT_VND_TMQ_CONSUME_PUSH) && + !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } @@ -637,8 +639,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); -// case TDMT_VND_TMQ_CONSUME: -// return tqProcessPollReq(pVnode->pTq, pMsg); + // case TDMT_VND_TMQ_CONSUME: + // return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: @@ -1370,7 +1372,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in } if (info.suid) { - metaGetInfo(pVnode->pMeta, info.suid, &info, NULL); + code = metaGetInfo(pVnode->pMeta, info.suid, &info, NULL); + ASSERT(code == 0); } if (pSubmitTbData->sver != info.skmVer) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1c5bd6d59c..b996c63031 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2702,13 +2702,12 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv, } static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, - int32_t order, int64_t ts) { - int32_t factor = (order == TSDB_ORDER_ASC) ? 1 : -1; + int64_t ts) { pDiffInfo->prevTs = ts; switch (type) { case TSDB_DATA_TYPE_INT: { int32_t v = *(int32_t*)pv; - int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null + int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null if (delta < 0 && pDiffInfo->ignoreNegative) { colDataSetNull_f_s(pOutput, pos); } else { @@ -2721,7 +2720,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: { int8_t v = *(int8_t*)pv; - int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null + int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null if (delta < 0 && pDiffInfo->ignoreNegative) { colDataSetNull_f_s(pOutput, pos); } else { @@ -2732,7 +2731,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, } case TSDB_DATA_TYPE_SMALLINT: { int16_t v = *(int16_t*)pv; - int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null + int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null if (delta < 0 && pDiffInfo->ignoreNegative) { colDataSetNull_f_s(pOutput, pos); } else { @@ -2744,7 +2743,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: { int64_t v = *(int64_t*)pv; - int64_t delta = factor * (v - pDiffInfo->prev.i64); // direct previous may be null + int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null if (delta < 0 && pDiffInfo->ignoreNegative) { colDataSetNull_f_s(pOutput, pos); } else { @@ -2755,7 +2754,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, } case TSDB_DATA_TYPE_FLOAT: { float v = *(float*)pv; - double delta = factor * (v - pDiffInfo->prev.d64); // direct previous may be null + double delta = v - pDiffInfo->prev.d64; // direct previous may be null if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow colDataSetNull_f_s(pOutput, pos); } else { @@ -2766,7 +2765,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, } case TSDB_DATA_TYPE_DOUBLE: { double v = *(double*)pv; - double delta = factor * (v - pDiffInfo->prev.d64); // direct previous may be null + double delta = v - pDiffInfo->prev.d64; // direct previous may be null if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow colDataSetNull_f_s(pOutput, pos); } else { @@ -2797,82 +2796,42 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - if (pCtx->order == TSDB_ORDER_ASC) { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - int32_t pos = startOffset + numOfElems; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + numOfElems; - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { - if (pDiffInfo->includeNull) { - colDataSetNull_f_s(pOutput, pos); + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (pDiffInfo->includeNull) { + colDataSetNull_f_s(pOutput, pos); - numOfElems += 1; - } - continue; + numOfElems += 1; } - - char* pv = colDataGetData(pInputCol, i); - - if (pDiffInfo->hasPrev) { - if (tsList[i] == pDiffInfo->prevTs) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - int32_t code = doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order, tsList[i]); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pos); - } - - numOfElems++; - } else { - int32_t code = doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - pDiffInfo->hasPrev = true; + continue; } - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - int32_t pos = startOffset + numOfElems; - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { - if (pDiffInfo->includeNull) { - colDataSetNull_f_s(pOutput, pos); - numOfElems += 1; - } - continue; + char* pv = colDataGetData(pInputCol, i); + + if (pDiffInfo->hasPrev) { + if (tsList[i] == pDiffInfo->prevTs) { + return TSDB_CODE_FUNC_DUP_TIMESTAMP; + } + int32_t code = doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, tsList[i]); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + // handle selectivity + if (pCtx->subsidiaries.num > 0) { + appendSelectivityValue(pCtx, i, pos); } - char* pv = colDataGetData(pInputCol, i); - - // there is a row of previous data block to be handled in the first place. - if (pDiffInfo->hasPrev) { - if (tsList[i] == pDiffInfo->prevTs) { - return TSDB_CODE_FUNC_DUP_TIMESTAMP; - } - int32_t code = doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order, tsList[i]); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - // handle selectivity - if (pCtx->subsidiaries.num > 0) { - appendSelectivityValue(pCtx, i, pos); - } - - numOfElems++; - } else { - int32_t code = doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + numOfElems++; + } else { + int32_t code = doSetPrevVal(pDiffInfo, pInputCol->info.type, pv, tsList[i]); + if (code != TSDB_CODE_SUCCESS) { + return code; } - - pDiffInfo->hasPrev = true; } + + pDiffInfo->hasPrev = true; } pResInfo->numOfRes = numOfElems; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index a12f8051ba..3d457c9b5f 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -602,18 +602,18 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { // ftruncate idx file if (offset < fileSize) { if (taosFtruncateFile(pIdxFile, offset) < 0) { - wError("vgId:%d, failed to ftruncate file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, - strerror(errno), offset, fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, failed to ftruncate file since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(), + offset, fnameStr); goto _err; } } // rebuild idx file if (taosLSeekFile(pIdxFile, 0, SEEK_END) < 0) { - wError("vgId:%d, failed to seek file due to %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, strerror(errno), - offset, fnameStr); terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, failed to seek file since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(), offset, + fnameStr); goto _err; } @@ -625,11 +625,12 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { idxEntry.offset += sizeof(SWalCkHead) + ckHead.head.bodyLen; if (walReadLogHead(pLogFile, idxEntry.offset, &ckHead) < 0) { - wError("vgId:%d, failed to read wal log head since %s. offset:%" PRId64 ", file:%s", pWal->cfg.vgId, terrstr(), - idxEntry.offset, fLogNameStr); + wError("vgId:%d, failed to read wal log head since %s. index:%" PRId64 ", offset:%" PRId64 ", file:%s", + pWal->cfg.vgId, terrstr(), idxEntry.ver, idxEntry.offset, fLogNameStr); goto _err; } if (taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, failed to append file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr); goto _err; } @@ -637,6 +638,7 @@ int walCheckAndRepairIdxFile(SWal* pWal, int32_t fileIdx) { } if (taosFsyncFile(pIdxFile) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, faild to fsync file since %s. file:%s", pWal->cfg.vgId, terrstr(), fnameStr); goto _err; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 9b7b3dfd50..ef97bff896 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -473,7 +473,10 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { // check alignment of idx entries int64_t endOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END); if (endOffset < 0) { - wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver); + wFatal("vgId:%d, failed to seek end of WAL idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), + ver); + taosMsleep(100); + exit(EXIT_FAILURE); } return 0; } @@ -533,16 +536,20 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy END: // recover in a reverse order if (taosFtruncateFile(pWal->pLogFile, offset) < 0) { - wFatal("vgId:%d, failed to ftruncate logfile to offset:%" PRId64 " during recovery due to %s", pWal->cfg.vgId, - offset, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); + wFatal("vgId:%d, failed to recover WAL logfile from write error since %s, offset:%" PRId64, pWal->cfg.vgId, + terrstr(), offset); + taosMsleep(100); + exit(EXIT_FAILURE); } int64_t idxOffset = (index - pFileInfo->firstVer) * sizeof(SWalIdxEntry); if (taosFtruncateFile(pWal->pIdxFile, idxOffset) < 0) { - wFatal("vgId:%d, failed to ftruncate idxfile to offset:%" PRId64 "during recovery due to %s", pWal->cfg.vgId, - idxOffset, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); + wFatal("vgId:%d, failed to recover WAL idxfile from write error since %s, offset:%" PRId64, pWal->cfg.vgId, + terrstr(), idxOffset); + taosMsleep(100); + exit(EXIT_FAILURE); } return -1; } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 70588887a0..c07bafa1ea 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -486,24 +486,11 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) { static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) { if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) { taosUpdateLogNums(level); -#if 0 - // DEBUG_FATAL and DEBUG_ERROR are duplicated - // fsync will cause thread blocking and may also generate log misalignment in case of asyncLog - if (tsAsyncLog && level != DEBUG_FATAL) { - taosPushLogBuffer(tsLogObj.logHandle, buffer, len); - } else { - taosWriteFile(tsLogObj.logHandle->pFile, buffer, len); - if (level == DEBUG_FATAL) { - taosFsyncFile(tsLogObj.logHandle->pFile); - } - } -#else if (tsAsyncLog) { taosPushLogBuffer(tsLogObj.logHandle, buffer, len); } else { taosWriteFile(tsLogObj.logHandle->pFile, buffer, len); } -#endif if (tsLogObj.maxLines > 0) { atomic_add_fetch_32(&tsLogObj.lines, 1); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index c151dd0e0a..90e0557997 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -36,6 +36,10 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py +,,n,system-test,python3 ./test.py -f 7-tmq/tmqDropConsumer.py + ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3 diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh index 2d736e1414..206f99ff3d 100755 --- a/tests/parallel_test/run_case.sh +++ b/tests/parallel_test/run_case.sh @@ -76,10 +76,10 @@ ulimit -c unlimited md5sum /usr/lib/libtaos.so.1 md5sum /home/TDinternal/debug/build/lib/libtaos.so -#define taospy 2.7.6 +#define taospy 2.7.10 pip3 list|grep taospy pip3 uninstall taospy -y -pip3 install --default-timeout=120 taospy==2.7.6 +pip3 install --default-timeout=120 taospy==2.7.10 $TIMEOUT_CMD $cmd RET=$? diff --git a/tests/system-test/2-query/diff.py b/tests/system-test/2-query/diff.py index cdea8964b4..c6f233eefa 100644 --- a/tests/system-test/2-query/diff.py +++ b/tests/system-test/2-query/diff.py @@ -23,7 +23,7 @@ class TDTestCase: tdSql.execute( f"create table {dbname}.ntb(ts timestamp,c1 int,c2 double,c3 float)") tdSql.execute( - f"insert into {dbname}.ntb values(now,1,1.0,10.5)(now+1s,10,-100.0,5.1)(now+10s,-1,15.1,5.0)") + f"insert into {dbname}.ntb values('2023-01-01 00:00:01',1,1.0,10.5)('2023-01-01 00:00:02',10,-100.0,5.1)('2023-01-01 00:00:03',-1,15.1,5.0)") tdSql.query(f"select diff(c1,0) from {dbname}.ntb") tdSql.checkRows(2) @@ -233,6 +233,40 @@ class TDTestCase: tdSql.checkRows(19) tdSql.checkData(0,0,None) + # TD-25098 + + tdSql.query(f"select ts, diff(c1) from {dbname}.ntb order by ts") + tdSql.checkRows(2) + tdSql.checkData(0, 0, '2023-01-01 00:00:02.000') + tdSql.checkData(1, 0, '2023-01-01 00:00:03.000') + + tdSql.checkData(0, 1, 9) + tdSql.checkData(1, 1, -11) + + tdSql.query(f"select ts, diff(c1) from {dbname}.ntb order by ts desc") + tdSql.checkRows(2) + tdSql.checkData(0, 0, '2023-01-01 00:00:03.000') + tdSql.checkData(1, 0, '2023-01-01 00:00:02.000') + + tdSql.checkData(0, 1, -11) + tdSql.checkData(1, 1, 9) + + tdSql.query(f"select ts, diff(c1) from (select * from {dbname}.ntb order by ts)") + tdSql.checkRows(2) + tdSql.checkData(0, 0, '2023-01-01 00:00:02.000') + tdSql.checkData(1, 0, '2023-01-01 00:00:03.000') + + tdSql.checkData(0, 1, 9) + tdSql.checkData(1, 1, -11) + + tdSql.query(f"select ts, diff(c1) from (select * from {dbname}.ntb order by ts desc)") + tdSql.checkRows(2) + tdSql.checkData(0, 0, '2023-01-01 00:00:02.000') + tdSql.checkData(1, 0, '2023-01-01 00:00:01.000') + + tdSql.checkData(0, 1, 11) + tdSql.checkData(1, 1, -9) + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 6b633fa193..3ea8273e7f 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -37,6 +37,9 @@ from util.common import * # INSERT_DATA = 3 class TMQCom: + def __init__(self): + self.g_end_insert_flag = 0 + def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdSql.init(conn.cursor()) @@ -330,8 +333,11 @@ class TMQCom: ctbDict[i] = 0 #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) - rowsOfCtb = 0 + rowsOfCtb = 0 while rowsOfCtb < rowsPerTbl: + if (0 != self.g_end_insert_flag): + tdLog.debug("get signal to stop insert data") + break for i in range(ctbNum): sql += " %s.%s%d values "%(dbName,ctbPrefix,i+ctbStartIdx) rowsBatched = 0 @@ -571,6 +577,20 @@ class TMQCom: tdLog.info(tsql.queryResult) tdLog.info("wait subscriptions exit for %d s"%wait_cnt) + def killProcesser(self, processerName): + killCmd = ( + "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1" + % processerName + ) + + psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % processerName + processID = subprocess.check_output(psCmd, shell=True) + + while processID: + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output(psCmd, shell=True) + def close(self): self.cursor.close() diff --git a/tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py b/tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py new file mode 100644 index 0000000000..3dabca4cd1 --- /dev/null +++ b/tests/system-test/7-tmq/tmqConsumeDiscontinuousData.py @@ -0,0 +1,248 @@ + +import sys +import time +import datetime +import threading +from taos.tmq import Consumer +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135} + + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 100 + self.tmqMaxTopicNum = 1 + self.tmqMaxGroups = 1 + self.walRetentionPeriod = 3 + self.actConsumeTotalRows = 0 + self.retryPoll = 0 + self.lock = threading.Lock() + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getPath(self, tool="taosBenchmark"): + if (platform.system().lower() == 'windows'): + tool = tool + ".exe" + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + paths = [] + for root, dirs, files in os.walk(projPath): + if ((tool) in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + paths.append(os.path.join(root, tool)) + break + if (len(paths) == 0): + tdLog.exit("taosBenchmark not found!") + return + else: + tdLog.info("taosBenchmark found in %s" % paths[0]) + return paths[0] + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10, + 'batchNum': 1, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdSql.execute("alter database %s wal_retention_period %d" % (paraDict['dbName'], self.walRetentionPeriod)) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + # tdLog.info("insert data") + # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdDnodes.stop(1) + # tdDnodes.start(1) + # tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqSubscribe(self, **inputDict): + consumer_dict = { + "group.id": inputDict['group_id'], + "client.id": "client", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.commit.interval.ms": "100", + "enable.auto.commit": "true", + "auto.offset.reset": "earliest", + "experimental.snapshot.enable": "false", + "msg.with.table.name": "false" + } + + consumer = Consumer(consumer_dict) + consumer.subscribe([inputDict['topic_name']]) + onceFlag = 0 + try: + while True: + if (1 == self.retryPoll): + time.sleep(2) + continue + res = consumer.poll(inputDict['pollDelay']) + if not res: + break + err = res.error() + if err is not None: + raise err + + val = res.value() + for block in val: + # print(block.fetchall()) + data = block.fetchall() + for row in data: + # print("===================================") + # print(row) + self.actConsumeTotalRows += 1 + if (0 == onceFlag): + onceFlag = 1 + with self.lock: + self.retryPoll = 1 + currentTime = datetime.now() + print("%s temp stop consume"%(str(currentTime))) + + currentTime = datetime.now() + print("%s already consume rows: %d, and sleep for a while"%(str(currentTime), self.actConsumeTotalRows)) + # time.sleep(self.walRetentionPeriod * 3) + finally: + consumer.unsubscribe() + consumer.close() + + return + + def asyncSubscribe(self, inputDict): + pThread = threading.Thread(target=self.tmqSubscribe, kwargs=inputDict) + pThread.start() + return pThread + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 100, + 'batchNum': 1, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + # create topic + topicNameList = ['dbtstb_0001'] + tdLog.info("create topics from stb") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + for i in range(len(topicNameList)): + sqlString = "create topic %s as %s" %(topicNameList[i], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + + + # start consumer + inputDict = {'group_id': "grpid_0001", + 'topic_name': topicNameList[0], + 'pollDelay': 10 + } + + pThread2 = self.asyncSubscribe(inputDict) + + pThread1 = tmqCom.asyncInsertDataByInterlace(paraDict) + pThread1.join() + tdLog.info("firstly call to flash database") + tdSql.query("flush database %s"%(paraDict['dbName'])) + time.sleep(self.walRetentionPeriod + 1) + tdLog.info("secondely call to flash database") + tdSql.query("flush database %s"%(paraDict['dbName'])) + + # wait the consumer to complete one poll + while (0 == self.retryPoll): + time.sleep(1) + continue + + with self.lock: + self.retryPoll = 0 + currentTime = datetime.now() + print("%s restart consume"%(str(currentTime))) + + paraDict["startTs"] = 1640966400000 + paraDict["ctbNum"] * paraDict["rowsPerTbl"] + pThread3 = tmqCom.asyncInsertDataByInterlace(paraDict) + + + tdLog.debug("wait sub-thread to end insert data") + pThread3.join() + + totalInsertRows = paraDict["ctbNum"] * paraDict["rowsPerTbl"] * 2 + tdLog.debug("wait sub-thread to end consume data") + pThread2.join() + + tdLog.info("act consume total rows: %d, act insert total rows: %d"%(self.actConsumeTotalRows, totalInsertRows)) + + if (self.actConsumeTotalRows >= totalInsertRows): + tdLog.exit("act consume rows: %d not equal expect: %d"%(self.actConsumeTotalRows, totalInsertRows)) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqDropConsumer.json b/tests/system-test/7-tmq/tmqDropConsumer.json new file mode 100644 index 0000000000..538e93ea5c --- /dev/null +++ b/tests/system-test/7-tmq/tmqDropConsumer.json @@ -0,0 +1,28 @@ +{ + "filetype": "subscribe", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "result_file": "tmq_res.txt", + "tmq_info": { + "concurrent": 2, + "poll_delay": 100000, + "group.id": "", + "group_mode": "independent", + "create_mode": "parallel", + "client.id": "cliid_0001", + "auto.offset.reset": "earliest", + "enable.manual.commit": "false", + "enable.auto.commit": "false", + "auto.commit.interval.ms": 1000, + "experimental.snapshot.enable": "false", + "msg.with.table.name": "false", + "rows_file": "", + "topic_list": [ + {"name": "dbtstb_0001", "sql": "select * from dbt.stb;"}, + {"name": "dbtstb_0002", "sql": "select * from dbt.stb;"} + ] + } +} diff --git a/tests/system-test/7-tmq/tmqDropConsumer.py b/tests/system-test/7-tmq/tmqDropConsumer.py new file mode 100644 index 0000000000..336285a39e --- /dev/null +++ b/tests/system-test/7-tmq/tmqDropConsumer.py @@ -0,0 +1,282 @@ + +import sys +import time +import threading +from taos.tmq import Consumer +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135} + + def __init__(self): + self.vgroups = 2 + self.ctbNum = 10 + self.rowsPerTbl = 10 + self.tmqMaxTopicNum = 2 + self.tmqMaxGroups = 2 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getPath(self, tool="taosBenchmark"): + if (platform.system().lower() == 'windows'): + tool = tool + ".exe" + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + paths = [] + for root, dirs, files in os.walk(projPath): + if ((tool) in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + paths.append(os.path.join(root, tool)) + break + if (len(paths) == 0): + tdLog.exit("taosBenchmark not found!") + return + else: + tdLog.info("taosBenchmark found in %s" % paths[0]) + return paths[0] + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 2, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdSql.execute("alter database %s wal_retention_period 360000" % (paraDict['dbName'])) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdDnodes.stop(1) + # tdDnodes.start(1) + tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqSubscribe(self, topicName, newGroupId, expectResult): + # create new connector for new tdSql instance in my thread + # newTdSql = tdCom.newTdSql() + # topicName = inputDict['topic_name'] + # group_id = inputDict['group_id'] + + consumer_dict = { + "group.id": newGroupId, + "client.id": "client", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.commit.interval.ms": "1000", + "enable.auto.commit": "true", + "auto.offset.reset": "earliest", + "experimental.snapshot.enable": "false", + "msg.with.table.name": "false" + } + + ret = 'success' + consumer = Consumer(consumer_dict) + # print("======%s"%(inputDict['topic_name'])) + try: + consumer.subscribe([topicName]) + except Exception as e: + tdLog.info("consumer.subscribe() fail ") + tdLog.info("%s"%(e)) + if (expectResult == "fail"): + consumer.close() + return 'success' + else: + consumer.close() + return 'fail' + + tdLog.info("consumer.subscribe() success ") + if (expectResult == "success"): + consumer.close() + return 'success' + else: + consumer.close() + return 'fail' + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 100000000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + topicNameList = ['dbtstb_0001','dbtstb_0002'] + tdLog.info("create topics from stb") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + for i in range(len(topicNameList)): + sqlString = "create topic %s as %s" %(topicNameList[i], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # tdSql.query('show topics;') + # topicNum = tdSql.queryRows + # tdLog.info(" topic count: %d"%(topicNum)) + # if topicNum != len(topicNameList): + # tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, len(topicNameList))) + + pThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + # use taosBenchmark to subscribe + binPath = self.getPath() + cmd = "nohup %s -f ./7-tmq/tmqDropConsumer.json > /dev/null 2>&1 & " % binPath + tdLog.info("%s"%(cmd)) + os.system(cmd) + + expectTopicNum = len(topicNameList) + consumerThreadNum = 2 + expectConsumerNUm = expectTopicNum * consumerThreadNum + expectSubscribeNum = self.vgroups * expectTopicNum * consumerThreadNum + + tdSql.query('show topics;') + topicNum = tdSql.queryRows + tdLog.info(" get topic count: %d"%(topicNum)) + if topicNum != expectTopicNum: + tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, expectTopicNum)) + + flag = 0 + while (1): + tdSql.query('show consumers;') + consumerNUm = tdSql.queryRows + tdLog.info(" get consumers count: %d"%(consumerNUm)) + if consumerNUm == expectConsumerNUm: + flag = 1 + break + else: + time.sleep(1) + + if (0 == flag): + tmqCom.g_end_insert_flag = 1 + tdLog.exit("show consumers %d not equal expect num: %d"%(topicNum, expectConsumerNUm)) + + flag = 0 + for i in range(10): + tdSql.query('show subscriptions;') + subscribeNum = tdSql.queryRows + tdLog.info(" get subscriptions count: %d"%(subscribeNum)) + if subscribeNum == expectSubscribeNum: + flag = 1 + break + else: + time.sleep(1) + + if (0 == flag): + tmqCom.g_end_insert_flag = 1 + tdLog.exit("show subscriptions %d not equal expect num: %d"%(subscribeNum, expectSubscribeNum)) + + # get all consumer group id + tdSql.query('show consumers;') + consumerNUm = tdSql.queryRows + groupIdList = [] + for i in range(consumerNUm): + groupId = tdSql.getData(i,1) + existFlag = 0 + for j in range(len(groupIdList)): + if (groupId == groupIdList[j]): + existFlag = 1 + break + if (0 == existFlag): + groupIdList.append(groupId) + + # kill taosBenchmark + tmqCom.killProcesser("taosBenchmark") + tdLog.info("kill taosBenchmak end") + + # wait the status to "lost" + while (1): + exitFlag = 1 + tdSql.query('show consumers;') + consumerNUm = tdSql.queryRows + for i in range(consumerNUm): + status = tdSql.getData(i,3) + if (status != "lost"): + exitFlag = 0 + time.sleep(2) + break + if (1 == exitFlag): + break + + tdLog.info("all consumers status into 'lost'") + + # drop consumer groups + for i in range(len(groupIdList)): + for j in range(len(topicNameList)): + sqlCmd = f"drop consumer group `%s` on %s"%(groupIdList[i], topicNameList[j]) + tdLog.info("drop consumer cmd: %s"%(sqlCmd)) + tdSql.execute(sqlCmd) + + tmqCom.g_end_insert_flag = 1 + tdLog.debug("notify sub-thread to stop insert data") + pThread.join() + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqMaxGroupIds.json b/tests/system-test/7-tmq/tmqMaxGroupIds.json new file mode 100644 index 0000000000..beb16576b0 --- /dev/null +++ b/tests/system-test/7-tmq/tmqMaxGroupIds.json @@ -0,0 +1,27 @@ +{ + "filetype": "subscribe", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "result_file": "tmq_res.txt", + "tmq_info": { + "concurrent": 99, + "poll_delay": 100000, + "group.id": "", + "group_mode": "independent", + "create_mode": "parallel", + "client.id": "cliid_0001", + "auto.offset.reset": "earliest", + "enable.manual.commit": "false", + "enable.auto.commit": "false", + "auto.commit.interval.ms": 1000, + "experimental.snapshot.enable": "false", + "msg.with.table.name": "false", + "rows_file": "", + "topic_list": [ + {"name": "dbtstb_0001", "sql": "select * from dbt.stb;"} + ] + } +} diff --git a/tests/system-test/7-tmq/tmqMaxGroupIds.py b/tests/system-test/7-tmq/tmqMaxGroupIds.py new file mode 100644 index 0000000000..d22b79a44c --- /dev/null +++ b/tests/system-test/7-tmq/tmqMaxGroupIds.py @@ -0,0 +1,246 @@ + +import sys +import time +import threading +from taos.tmq import Consumer +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135} + + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 10 + self.tmqMaxTopicNum = 20 + self.tmqMaxGroups = 100 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def getPath(self, tool="taosBenchmark"): + if (platform.system().lower() == 'windows'): + tool = tool + ".exe" + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + paths = [] + for root, dirs, files in os.walk(projPath): + if ((tool) in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + paths.append(os.path.join(root, tool)) + break + if (len(paths) == 0): + tdLog.exit("taosBenchmark not found!") + return + else: + tdLog.info("taosBenchmark found in %s" % paths[0]) + return paths[0] + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdSql.execute("alter database %s wal_retention_period 360000" % (paraDict['dbName'])) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdDnodes.stop(1) + # tdDnodes.start(1) + tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqSubscribe(self, topicName, newGroupId, expectResult): + # create new connector for new tdSql instance in my thread + # newTdSql = tdCom.newTdSql() + # topicName = inputDict['topic_name'] + # group_id = inputDict['group_id'] + + consumer_dict = { + "group.id": newGroupId, + "client.id": "client", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.commit.interval.ms": "1000", + "enable.auto.commit": "true", + "auto.offset.reset": "earliest", + "experimental.snapshot.enable": "false", + "msg.with.table.name": "false" + } + + ret = 'success' + consumer = Consumer(consumer_dict) + # print("======%s"%(inputDict['topic_name'])) + try: + consumer.subscribe([topicName]) + except Exception as e: + tdLog.info("consumer.subscribe() fail ") + tdLog.info("%s"%(e)) + if (expectResult == "fail"): + consumer.close() + return 'success' + else: + consumer.close() + return 'fail' + + tdLog.info("consumer.subscribe() success ") + if (expectResult == "success"): + consumer.close() + return 'success' + else: + consumer.close() + return 'fail' + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 100000000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + topicNameList = ['dbtstb_0001'] + tdLog.info("create topics from stb") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + for i in range(len(topicNameList)): + sqlString = "create topic %s as %s" %(topicNameList[i], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # tdSql.query('show topics;') + # topicNum = tdSql.queryRows + # tdLog.info(" topic count: %d"%(topicNum)) + # if topicNum != len(topicNameList): + # tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, len(topicNameList))) + + pThread = tmqCom.asyncInsertDataByInterlace(paraDict) + + # use taosBenchmark to subscribe + binPath = self.getPath() + cmd = "nohup %s -f ./7-tmq/tmqMaxGroupIds.json > /dev/null 2>&1 & " % binPath + tdLog.info("%s"%(cmd)) + os.system(cmd) + + expectTopicNum = 1 + expectConsumerNUm = 99 + expectSubscribeNum = 99 + + tdSql.query('show topics;') + topicNum = tdSql.queryRows + tdLog.info(" get topic count: %d"%(topicNum)) + if topicNum != expectTopicNum: + tdLog.exit("show topics %d not equal expect num: %d"%(topicNum, expectTopicNum)) + + flag = 0 + while (1): + tdSql.query('show consumers;') + consumerNUm = tdSql.queryRows + tdLog.info(" get consumers count: %d"%(consumerNUm)) + if consumerNUm == expectConsumerNUm: + flag = 1 + break + else: + time.sleep(1) + + if (0 == flag): + tdLog.exit("show consumers %d not equal expect num: %d"%(topicNum, expectConsumerNUm)) + + flag = 0 + for i in range(10): + tdSql.query('show subscriptions;') + subscribeNum = tdSql.queryRows + tdLog.info(" get subscriptions count: %d"%(subscribeNum)) + if subscribeNum == expectSubscribeNum: + flag = 1 + break + else: + time.sleep(1) + + if (0 == flag): + tdLog.exit("show subscriptions %d not equal expect num: %d"%(subscribeNum, expectSubscribeNum)) + + res = self.tmqSubscribe(topicNameList[0], "newGroupId_001", "success") + if res != 'success': + tdLog.exit("limit max groupid fail") + + res = self.tmqSubscribe(topicNameList[0], "newGroupId_002", "fail") + if res != 'success': + tdLog.exit("limit max groupid fail") + + tmqCom.g_end_insert_flag = 1 + tdLog.debug("notify sub-thread to stop insert data") + pThread.join() + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 865d4680a3..e9dd067ac4 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -361,11 +361,11 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i case TSDB_DATA_TYPE_FLOAT: width = SHELL_FLOAT_WIDTH; if (tsEnableScience) { - taosFprintfFile(pFile, "%*e", width, GET_FLOAT_VAL(val)); + taosFprintfFile(pFile, "%*.7e", width, GET_FLOAT_VAL(val)); } else { - n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.7f", width, GET_FLOAT_VAL(val)); if (n > SHELL_FLOAT_WIDTH) { - taosFprintfFile(pFile, "%*e", width, GET_FLOAT_VAL(val)); + taosFprintfFile(pFile, "%*.7e", width, GET_FLOAT_VAL(val)); } else { taosFprintfFile(pFile, "%s", buf); } @@ -374,10 +374,10 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i case TSDB_DATA_TYPE_DOUBLE: width = SHELL_DOUBLE_WIDTH; if (tsEnableScience) { - snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%.9e", GET_DOUBLE_VAL(val)); - taosFprintfFile(pFile, "%*s", width, buf); + snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.15e", width, GET_DOUBLE_VAL(val)); + taosFprintfFile(pFile, "%s", buf); } else { - n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.15f", width, GET_DOUBLE_VAL(val)); if (n > SHELL_DOUBLE_WIDTH) { taosFprintfFile(pFile, "%*.15e", width, GET_DOUBLE_VAL(val)); } else { @@ -612,11 +612,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t break; case TSDB_DATA_TYPE_FLOAT: if (tsEnableScience) { - printf("%*e", width, GET_FLOAT_VAL(val)); + printf("%*.7e",width,GET_FLOAT_VAL(val)); } else { - n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.7f", width, GET_FLOAT_VAL(val)); if (n > SHELL_FLOAT_WIDTH) { - printf("%*e", width, GET_FLOAT_VAL(val)); + + printf("%*.7e", width,GET_FLOAT_VAL(val)); } else { printf("%s", buf); } @@ -624,14 +625,14 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t break; case TSDB_DATA_TYPE_DOUBLE: if (tsEnableScience) { - snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%.9e", GET_DOUBLE_VAL(val)); - printf("%*s", width, buf); + snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.15e", width,GET_DOUBLE_VAL(val)); + printf("%s", buf); } else { - n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.15f", width, GET_DOUBLE_VAL(val)); if (n > SHELL_DOUBLE_WIDTH) { printf("%*.15e", width, GET_DOUBLE_VAL(val)); } else { - printf("%s", buf); + printf("%*s", width,buf); } } break;