diff --git a/docs/en/14-reference/03-connector/06-rust.mdx b/docs/en/14-reference/03-connector/06-rust.mdx index 56f5e20cb4..a98683d43c 100644 --- a/docs/en/14-reference/03-connector/06-rust.mdx +++ b/docs/en/14-reference/03-connector/06-rust.mdx @@ -31,7 +31,8 @@ Websocket connections are supported on all platforms that can run Go. | connector-rust version | TDengine version | major features | | :----------------: | :--------------: | :--------------------------------------------------: | -| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. | +| v0.9.2 | 3.0.7.0 or later | STMT: Get tag_fields and col_fields via WebSocket. | +| v0.8.12 | 3.0.5.0 | TMQ: Get consuming progress and seek offset to consume. | | v0.8.0 | 3.0.4.0 | Support schemaless insert. | | v0.7.6 | 3.0.3.0 | Support req_id in query. | | v0.6.0 | 3.0.0.0 | Base features. | diff --git a/docs/zh/08-connector/26-rust.mdx b/docs/zh/08-connector/26-rust.mdx index 79a6badfea..3e51aa72bb 100644 --- a/docs/zh/08-connector/26-rust.mdx +++ b/docs/zh/08-connector/26-rust.mdx @@ -30,7 +30,8 @@ Websocket 连接支持所有能运行 Rust 的平台。 | Rust 连接器版本 | TDengine 版本 | 主要功能 | | :----------------: | :--------------: | :--------------------------------------------------: | -| v0.8.12 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 | +| v0.9.2 | 3.0.7.0 or later | STMT:ws 下获取 tag_fields、col_fields。 | +| v0.8.12 | 3.0.5.0 | 消息订阅:获取消费进度及按照指定进度开始消费。 | | v0.8.0 | 3.0.4.0 | 支持无模式写入。 | | v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 | | v0.6.0 | 3.0.0.0 | 基础功能。 | diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b7544a13ca..066f83fbcb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,7 +45,6 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner - TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause }; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 92dd4ea419..0c8a55b340 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1859,8 +1859,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){ if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); - if(reqOffset->type != 0) pVg->offsetInfo.beginOffset = *reqOffset; - if(rspOffset->type != 0) pVg->offsetInfo.endOffset = *rspOffset; + pVg->offsetInfo.beginOffset = *reqOffset; + pVg->offsetInfo.endOffset = *rspOffset; } else { tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } @@ -1948,7 +1948,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } taosWUnLockLatch(&tmq->lock); } else { - tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); @@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosWUnLockLatch(&tmq->lock); return pRsp; } else { - tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); @@ -2036,7 +2036,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosWUnLockLatch(&tmq->lock); return pRsp; } else { - tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 73a0bed48f..ef0006e7ab 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7227,6 +7227,9 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { return pLeft->uid == pRight->uid; + } else { + uError("offset type:%d", pLeft->type); + ASSERT(0); } } return false; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6081b9a530..45b46c11b4 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -420,6 +420,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ + ASSERT(0); continue; } taosWLockLatch(&pSub->lock); @@ -515,7 +516,10 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { char *topic = taosArrayGetP(pConsumer->currentTopics, i); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); // txn guarantees pSub is created - if(pSub == NULL) continue; + if(pSub == NULL) { + ASSERT(0); + continue; + } taosRLockLatch(&pSub->lock); SMqSubTopicEp topicEp = {0}; @@ -524,6 +528,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2.1 fetch topic schema SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if(pTopic == NULL) { + ASSERT(0); taosRUnLockLatch(&pSub->lock); mndReleaseSubscribe(pMnode, pSub); continue; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 6bd23c3b90..ab5877327f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1168,7 +1168,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1); - mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), + mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), consumerId, varDataVal(cgroup), pVgEp->vgId); // offset diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b0ddefce79..7235a56691 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -346,9 +346,9 @@ int32_t 
sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pSnode->pMeta, pTask); - qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", - pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, - streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bedb8bba0c..95921936e1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -146,6 +146,20 @@ void tqClose(STQ* pTq) { return; } + void* pIter = taosHashIterate(pTq->pPushMgr, NULL); + while (pIter) { + STqHandle* pHandle = *(STqHandle**)pIter; + int32_t vgId = TD_VID(pTq->pVnode); + + if(pHandle->msg != NULL) { + tqPushEmptyDataRsp(pHandle, vgId); + rpcFreeCont(pHandle->msg->pCont); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + pIter = taosHashIterate(pTq->pPushMgr, pIter); + } + tqOffsetClose(pTq->pOffsetStore); taosHashCleanup(pTq->pHandle); taosHashCleanup(pTq->pPushMgr); @@ -278,6 +292,10 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { tqInitDataRsp(&dataRsp, &req); dataRsp.blockNum = 0; dataRsp.rspOffset = dataRsp.reqOffset; + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset); + tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); + tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); tDeleteMqDataRsp(&dataRsp); return 0; @@ -515,10 +533,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) { while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + tqInfo("vgId:%d start to set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); + taosHashCancelIterate(pTq->pPushMgr, pIter); break; }else{ SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; @@ -849,30 +868,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosWLockLatch(&pTq->lock); if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, - req.newConsumerId); + tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, since this is a redo of the wal log", req.vgId, req.newConsumerId); } else { - tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, - req.newConsumerId); + tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); + // atomic_add_fetch_32(&pHandle->epoch, 1); + + // kill executing task + // if(tqIsHandleExec(pHandle)) { + // qTaskInfo_t pTaskInfo =
pHandle->execHandle.task; + // if (pTaskInfo != NULL) { + // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + // } + + // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + // qStreamCloseTsdbReader(pTaskInfo); + // } + // } + // remove it if it has been registered in the push manager, and return one empty block to the consumer + tqUnregisterPushHandle(pTq, pHandle); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } - // atomic_add_fetch_32(&pHandle->epoch, 1); - - // kill executing task - // if(tqIsHandleExec(pHandle)) { - // qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - // if (pTaskInfo != NULL) { - // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - // } - - // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - // qStreamCloseTsdbReader(pTaskInfo); - // } - // } - // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushHandle(pTq, pHandle); taosWUnLockLatch(&pTq->lock); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); } end: @@ -1041,9 +1058,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", - pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, - streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + tqDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", @@ -1145,7 +1162,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 3. It's an fill history task, do nothing. wait for the main task to start it SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); if (p != NULL) { // reset the downstreamReady flag.
- p->status.downstreamReady = 0; streamTaskCheckDownstreamTasks(p); } @@ -1154,12 +1170,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { - int32_t code = TSDB_CODE_SUCCESS; - char* msg = pMsg->pCont; - + SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg; + int32_t code = TSDB_CODE_SUCCESS; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", @@ -1167,12 +1181,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return -1; } - // do recovery step 1 + // do recovery step1 const char* id = pTask->id.idStr; const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); + tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus); - int64_t st = taosGetTimestampMs(); + if (pTask->tsInfo.step1Start == 0) { + ASSERT(pTask->status.pauseAllowed == false); + pTask->tsInfo.step1Start = taosGetTimestampMs(); + if (pTask->info.fillHistory == 1) { + streamTaskEnablePause(pTask); + } + } else { + tqDebug("s-task:%s resumed from pause, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start); + } // we have to continue retrying to successfully execute the scan history task. int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, @@ -1185,31 +1207,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - ASSERT(pTask->status.pauseAllowed == false); - if (pTask->info.fillHistory == 1) { - streamTaskEnablePause(pTask); + ASSERT(pTask->status.pauseAllowed == true); } - if (!streamTaskRecoverScanStep1Finished(pTask)) { - streamSourceScanHistoryData(pTask); - } - - // disable the pause when handling the step2 scan of tsdb data. - // the whole next procedure cann't be stopped. - // todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode. - if (pTask->info.fillHistory == 1) { - streamTaskDisablePause(pTask); - } - - if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); + streamSourceScanHistoryData(pTask); + if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; + tqDebug("s-task:%s is paused in step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, + TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - streamMetaReleaseTask(pMeta, pTask); return 0; } - double el = (taosGetTimestampMs() - st) / 1000.0; + // the following procedure should be executed, no matter whether the status is stop/pause or not + double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { @@ -1217,77 +1229,71 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pStreamTask = NULL; bool done = false; - if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { - // 1. stop the related stream task, get the current scan wal version of stream task, ver.
- pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); - if (pStreamTask == NULL) { - qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", - pTask->streamTaskId.taskId, pTask->id.idStr); + // 1. get the related stream task + pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); + if (pStreamTask == NULL) { + // todo delete this task, if the related stream task is dropped + qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", + pTask->streamTaskId.taskId, pTask->id.idStr); - pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s fill-history task set status to be dropping", id); + tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaSaveTask(pMeta, pTask); - streamMetaReleaseTask(pMeta, pTask); - return -1; - } - - ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - - // stream task in TASK_STATUS__SCAN_HISTORY can not be paused. - // wait for the stream task get ready for scan history data - while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - tqDebug( - "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", - id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); - taosMsleep(100); - } - - // now we can stop the stream task execution - streamTaskHalt(pStreamTask); - tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); - - // if it's an source task, extract the last version in wal. - pRange = &pTask->dataRange.range; - int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); + streamMetaUnregisterTask(pMeta, pTask->id.taskId); + streamMetaReleaseTask(pMeta, pTask); + return -1; } + ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); + + // 2. it cannot be paused while the stream task is in TASK_STATUS__SCAN_HISTORY status. Let's wait for the + // stream task to get ready for scanning history data + while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + tqDebug( + "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", + id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); + taosMsleep(100); + } + + // now we can stop the stream task execution + streamTaskHalt(pStreamTask); + + tqDebug("s-task:%s level:%d sched-status:%d is halted by fill-history task:%s", pStreamTask->id.idStr, + pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); + + // if it's a source task, extract the last version in the wal.
+ pRange = &pTask->dataRange.range; + int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); + if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); streamTaskEndScanWAL(pTask); streamMetaReleaseTask(pMeta, pTask); } else { - if (!streamTaskRecoverScanStep1Finished(pTask)) { - STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history from WAL after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history from WAL after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, + pStreamTask->id.idStr); + ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - pTask->tsInfo.step2Start = taosGetTimestampMs(); - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - } + pTask->tsInfo.step2Start = taosGetTimestampMs(); + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - if (!streamTaskRecoverScanStep2Finished(pTask)) { - pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; - if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); - streamMetaReleaseTask(pMeta, pTask); - return 0; - } + int64_t dstVer = pTask->dataRange.range.minVer - 1; - int64_t dstVer = pTask->dataRange.range.minVer - 1; - - pTask->chkInfo.currentVer = dstVer; - walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); - tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer, - TASK_SCHED_STATUS__INACTIVE); - } + pTask->chkInfo.currentVer = dstVer; + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, + pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + // set the fill-history task to be normal + if (pTask->info.fillHistory == 1) { + streamSetStatusNormal(pTask); + } + // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. // 5. resume the related stream task. 
streamMetaReleaseTask(pMeta, pTask); @@ -1304,7 +1310,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time " + "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); qResetStreamInfoTimeWindow(pTask->exec.pExecutor); @@ -1500,7 +1506,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t st = pTask->status.taskStatus; - if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) { + if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); @@ -1637,7 +1643,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } - if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) { + if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartRecoverTask(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { tqStartStreamTasks(pTq); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 5ccf4c825b..a236b98614 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -78,12 +78,12 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, + tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } -int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { +int tqUnregisterPushHandle(STQ* pTq, void *handle) { STqHandle *pHandle = (STqHandle*)handle; int32_t vgId = TD_VID(pTq->pVnode); @@ -91,7 +91,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { return 0; } int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); - tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + tqInfo("vgId:%d remove pHandle:%p, ret:%d, consumerId:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); if(pHandle->msg != NULL) { // tqPushDataRsp(pHandle, vgId); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 67ae160d6d..c3e7d03e43 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -211,7 +211,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { if ((pTask->info.fillHistory == 1)
&& ver > pTask->dataRange.range.maxVer) { - qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64 + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag", pTask->id.idStr, ver, pTask->dataRange.range.maxVer); pTask->status.transferState = true; @@ -251,19 +251,19 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; // non-source or fill-history tasks don't need to response the WAL scan action. - if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } - if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) { + if (status != TASK_STATUS__NORMAL) { tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { - ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); + ASSERT(status == TASK_STATUS__NORMAL); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, pTask->dataRange.range.maxVer); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 9062084246..55a1cecafe 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -317,6 +317,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // the offset value can not be monotonious increase?? 
offset = reqOffset; } else { + uError("req offset type is 0"); return TSDB_CODE_TMQ_INVALID_MSG; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index df6cc1b5e7..32429ff8c6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache return code; } -/* -int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { - int32_t code = 0; - SLRUCache *pCache = pTsdb->lruCache; - SArray *pCidList = pr->pCidList; - int num_keys = TARRAY_SIZE(pCidList); - for (int i = 0; i < num_keys; ++i) { - SLastCol *pLastCol = NULL; - int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); - - SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; - LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); - if (!h) { - taosThreadMutexLock(&pTsdb->lruMutex); - h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); - if (!h) { - pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype); - - size_t charge = sizeof(*pLastCol); - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { - charge += pLastCol->colVal.value.nData; - } - - LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h, - TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - } - - taosThreadMutexUnlock(&pTsdb->lruMutex); - } - - pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - - SLastCol lastCol = *pLastCol; - reallocVarData(&lastCol.colVal); - - if (h) { - taosLRUCacheRelease(pCache, h, false); - } - - taosArrayPush(pLastArray, &lastCol); - } - - return code; -} -*/ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t code = 0; // fetch schema @@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + taosThreadMutexLock(&pTsdb->lruMutex); taosThreadMutexLock(&pTsdb->rCache.rMutex); rocksMayWrite(pTsdb, true, false, false); rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, @@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksdb_free(values_list[i]); rocksdb_free(values_list[i + num_keys]); - taosThreadMutexLock(&pTsdb->lruMutex); + // taosThreadMutexLock(&pTsdb->lruMutex); LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen); if (h) { @@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE } taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen); - taosThreadMutexUnlock(&pTsdb->lruMutex); + // taosThreadMutexUnlock(&pTsdb->lruMutex); } for (int i = 0; i < num_keys; ++i) { taosMemoryFree(keys_list[i]); @@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE rocksMayWrite(pTsdb, true, false, true); + taosThreadMutexUnlock(&pTsdb->lruMutex); + _exit: taosMemoryFree(pTSchema); @@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { return code; } -/* -int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { - int32_t 
code = 0; - char key[32] = {0}; - int keyLen = 0; - // getTableCacheKey(uid, "lr", key, &keyLen); - getTableCacheKey(uid, 0, key, &keyLen); - LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - bool invalidate = false; - int16_t nCol = taosArrayGetSize(pLast); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (eKey >= tTsVal->ts) { - invalidate = true; - break; - } - } - - if (invalidate) { - taosLRUCacheRelease(pCache, h, true); - } else { - taosLRUCacheRelease(pCache, h, false); - } - } - - // getTableCacheKey(uid, "l", key, &keyLen); - getTableCacheKey(uid, 1, key, &keyLen); - h = taosLRUCacheLookup(pCache, key, keyLen); - if (h) { - SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); - bool invalidate = false; - int16_t nCol = taosArrayGetSize(pLast); - - for (int16_t iCol = 0; iCol < nCol; ++iCol) { - SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); - if (eKey >= tTsVal->ts) { - invalidate = true; - break; - } - } - - if (invalidate) { - taosLRUCacheRelease(pCache, h, true); - } else { - taosLRUCacheRelease(pCache, h, false); - } - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); - } - - return code; -} -*/ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) { int32_t code = 0; STSRow *cacheRow = NULL; @@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea } if (record.version <= pReader->info.verRange.maxVer) { + /* + tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records", + TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid); + */ SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; taosArrayPush(pInfo->pTombData, &delData); } @@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie goto _err; } - loadDataTomb(state->pr, state->pr->pFileReader); - state->pr->pCurFileSet = state->pFileSet; + + loadDataTomb(state->pr, state->pr->pFileReader); } if (!state->pIndexList) { @@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie state->iBrinIndex = indexSize; } + if (state->pFileSet != state->pr->pCurFileSet) { + state->pr->pCurFileSet = state->pFileSet; + } + code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid, state->pr, state->lastTs, aCols, nCols); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 3b07ef9ea9..65fc552365 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -476,8 +476,8 @@ void vnodeClose(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); - walClose(pVnode->pWal); tqClose(pVnode->pTq); + walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); smaClose(pVnode->pSma); if (pVnode->pMeta) metaClose(&pVnode->pMeta); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b85305b32d..5a99c1ea9a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -341,6 +341,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v return NULL; } + 
qResetStreamInfoTimeWindow(pTaskInfo); return pTaskInfo; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6eee9df528..7434db61db 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1550,10 +1550,86 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock } } -static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) { +static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { + if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); + + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); + + ASSERT(pCol->pData != NULL); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } else if (pWindow->ekey != INT64_MAX) { + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts <= pWindow->ekey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + taosMemoryFree(p); + } +} + +// re-build the delete block, ONLY according to the split timestamp +static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { + if (skey == INT64_MIN) { + return; + } + + int32_t numOfRows = pBlock->info.rows; + + bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; + SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; + + for (int32_t i = 0; i < numOfRows; i++) { + if (tsStartCol[i] < skey) { + tsStartCol[i] = skey; + } + + if (tsEndCol[i] >= skey) { + p[i] = true; + } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); + taosMemoryFree(p); +} + +static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + const char* id = GET_TASKID(pTaskInfo); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); @@ -1593,7 +1669,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); + pBlockInfo->rows, id, &pTableScanInfo->base.metaCache); // ignore 
the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); return code; } } } if (filter) { doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); } + // filter the block extracted from WAL files according to the time window, i.e. apply the additional time window filter + doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id); pInfo->pRes->info.dataLoad = 1; + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + if (pInfo->pRes->info.rows == 0) { + return 0; + } calBlockTbName(pInfo, pInfo->pRes); return 0; @@ -1666,7 +1748,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); blockDataCleanup(pInfo->pRes); - setBlockIntoRes(pInfo, pRes, true); + STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; + setBlockIntoRes(pInfo, pRes, &defaultWindow, true); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } @@ -1775,80 +1858,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } } -static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { - if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { - bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; - - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); - - if (pWindow->skey != INT64_MIN) { - qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); - - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*)colDataGetData(pCol, i); - p[i] = (*ts >= pWindow->skey); - - if (!p[i]) { - hasUnqualified = true; - } - } - } else if (pWindow->ekey != INT64_MAX) { - qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*)colDataGetData(pCol, i); - p[i] = (*ts <= pWindow->ekey); - - if (!p[i]) { - hasUnqualified = true; - } - } - } - - if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); - } - - taosMemoryFree(p); - } -} - -// re-build the delete block, ONLY according to the split timestamp -static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { - if (skey == INT64_MIN) { - return; - } - - int32_t numOfRows = pBlock->info.rows; - - bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); - bool hasUnqualified = false; - - SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; - SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); - uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; - - for (int32_t i = 0; i < numOfRows; i++) { - if (tsStartCol[i] < skey) { - tsStartCol[i] = skey; - } - - if (tsEndCol[i] >= skey) { - p[i] = true; - } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] - hasUnqualified = true; - } - } - - if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); - } - - qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); -
taosMemoryFree(p); -} - static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2121,8 +2130,7 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } - SSDataBlock* pBlock = pInfo->pRes; - SDataBlockInfo* pBlockInfo = &pBlock->info; + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); NEXT_SUBMIT_BLK: @@ -2146,21 +2154,23 @@ FETCH_NEXT_BLOCK: } } - blockDataCleanup(pBlock); + blockDataCleanup(pInfo->pRes); while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) { SSDataBlock* pRes = NULL; int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); - qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, - id); + qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id); if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { qDebug("retrieve data failed, try next block in submit block, %s", id); continue; } - setBlockIntoRes(pInfo, pRes, false); + setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false); + if (pInfo->pRes->info.rows == 0) { + continue; + } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; @@ -2168,13 +2178,8 @@ FETCH_NEXT_BLOCK: return pInfo->pCreateTbRes; } - // apply additional time window filter - doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); - pBlock->info.dataLoad = 1; - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); @@ -2196,7 +2201,7 @@ FETCH_NEXT_BLOCK: qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { - return pBlock; + return pInfo->pRes; } if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -2587,7 +2592,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igExpired = pTableScanNode->igExpired; pInfo->twAggSup.maxTs = INT64_MIN; - pInfo->pState = NULL; + pInfo->pState = pTaskInfo->streamInfo.pState; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 16be9f6ca2..34b1262f09 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3736,7 +3736,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); if (winNum > 0) { - saveSessionOutputBuf(pAggSup, &winInfo); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(winInfo, pInfo->pStUpdated); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -3747,9 +3746,8 @@ void 
streamSessionReloadState(SOperatorInfo* pOperator) { getSessionHashKey(&winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } - } else { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)winInfo.pOutputBuf, &pAggSup->stateStore); } + saveSessionOutputBuf(pAggSup, &winInfo); } taosMemoryFree(pBuf); @@ -4398,7 +4396,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) { setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated); - saveSessionOutputBuf(pAggSup, &curInfo.winInfo); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(curInfo.winInfo, pInfo->pSeUpdated); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -4409,14 +4406,12 @@ void streamStateReloadState(SOperatorInfo* pOperator) { getSessionHashKey(&curInfo.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); } + } else if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); } if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); - } - - if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + saveSessionOutputBuf(pAggSup, &curInfo.winInfo); } } taosMemoryFree(pBuf); diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 1a4ee3e91a..d96bb9bba4 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -118,6 +118,12 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes); void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request); SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable); +/** + * @brief return a - b with overflow check + * @retval val range between [INT64_MIN, INT64_MAX] + */ +int64_t int64SafeSub(int64_t a, int64_t b); + #ifdef __cplusplus } #endif diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ef1bc8788c..8ce68a5c8c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3296,23 +3296,25 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* if (NULL == pInterval) { return TSDB_CODE_SUCCESS; } - - int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); + int64_t timeRange = 0; int64_t intervalRange = 0; - if (IS_CALENDAR_TIME_DURATION(pInterval->unit)) { - int64_t f = 1; - if (pInterval->unit == 'n') { - f = 30LL * MILLISECOND_PER_DAY; - } else if (pInterval->unit == 'y') { - f = 365LL * MILLISECOND_PER_DAY; + if (!pCxt->createStream) { + int64_t res = int64SafeSub(pFill->timeRange.skey, pFill->timeRange.ekey); + timeRange = res < 0 ? res == INT64_MIN ? 
INT64_MAX : -res : res; + if (IS_CALENDAR_TIME_DURATION(pInterval->unit)) { + int64_t f = 1; + if (pInterval->unit == 'n') { + f = 30LL * MILLISECOND_PER_DAY; + } else if (pInterval->unit == 'y') { + f = 365LL * MILLISECOND_PER_DAY; + } + intervalRange = pInterval->datum.i * f; + } else { + intervalRange = pInterval->datum.i; + } + if ((timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } - intervalRange = pInterval->datum.i * f; - } else { - intervalRange = pInterval->datum.i; - } - - if ((timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 42a0d1282a..1c292b1ec4 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -1142,3 +1142,18 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) { taosHashCleanup(pMetaCache->pTableIndex); taosHashCleanup(pMetaCache->pTableCfg); } + +int64_t int64SafeSub(int64_t a, int64_t b) { + int64_t res = (uint64_t)a - (uint64_t)b; + + if (a >= 0 && b < 0) { + if ((uint64_t)res > (uint64_t)INT64_MAX) { + // overflow + res = INT64_MAX; + } + } else if (a < 0 && b > 0 && res >= 0) { + // underflow + res = INT64_MIN; + } + return res; +} diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ba8e358f68..f85ade591c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -379,7 +379,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock((SStreamDataBlock*) pItem); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c546b36191..4ef7d6084d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -172,6 +172,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { bool finished = false; while (1) { + if (streamTaskShouldPause(&pTask->status)) { + double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; + qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); + return 0; + } + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -404,6 +410,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + // clear the link between fill-history task and stream task info + pStreamTask->historyTaskId.taskId = 0; streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); @@ -414,6 +422,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { // save to disk taosWLockLatch(&pMeta->lock); + streamMetaSaveTask(pMeta, pStreamTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk @@ -615,7 +624,7 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here if (taosQueueEmpty(pTask->inputQueue->queue)) 
{ // fill-history WAL scan has completed - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) { streamTaskRecoverSetAllStepFinished(pTask); streamTaskEndScanWAL(pTask); } else { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 4dc7b87664..df45ff2759 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -85,6 +85,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); } + streamTaskEnablePause(pTask); streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); @@ -839,7 +840,7 @@ void streamTaskPause(SStreamTask* pTask) { return; } - while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { + while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { status = pTask->status.taskStatus; if (status == TASK_STATUS__DROPPING) { qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); @@ -856,8 +857,19 @@ void streamTaskPause(SStreamTask* pTask) { taosMsleep(100); } + // todo: use the lock of the task. + taosWLockLatch(&pMeta->lock); + + status = pTask->status.taskStatus; + if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + taosWUnLockLatch(&pMeta->lock); + qDebug("vgId:%d s-task:%s task already dropped/stopped, do nothing", pMeta->vgId, pTask->id.idStr); + return; + } + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + taosWUnLockLatch(&pMeta->lock); int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 6c9aff046c..9c2ed190c1 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -5,6 +5,13 @@ if (DEFINED GRANT_CFG_INCLUDE_DIR) add_definitions(-DGRANTS_CFG) endif() +IF (${ASSERT_NOT_CORE}) + ADD_DEFINITIONS(-DASSERT_NOT_CORE) + MESSAGE(STATUS "disable assert core") +ELSE () + MESSAGE(STATUS "enable assert core") +ENDIF (${ASSERT_NOT_CORE}) + target_include_directories( util PUBLIC "${TD_SOURCE_DIR}/include/util" diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index c07bafa1ea..de7ad848ed 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -76,7 +76,11 @@ static int32_t tsDaylightActive; /* Currently in daylight saving time.
*/ bool tsLogEmbedded = 0; bool tsAsyncLog = true; +#ifdef ASSERT_NOT_CORE +bool tsAssert = false; +#else bool tsAssert = true; +#endif int32_t tsNumOfLogLines = 10000000; int32_t tsLogKeepDays = 0; LogFp tsLogFp = NULL; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index bb45ffdf72..24f968ee65 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -105,7 +105,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py -#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart.py @@ -450,7 +450,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 ,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3 -,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 +#,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py index 37946d0c22..a717c4966d 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py @@ -222,9 +222,9 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) - tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) + tdLog.info("and second consume rows should be between [0 and %d]"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10) diff --git a/tests/system-test/eco-system/manager/cmul.py b/tests/system-test/eco-system/manager/cmul.py new file mode 100644 index 0000000000..ac2fa5e4f2 --- /dev/null +++ b/tests/system-test/eco-system/manager/cmul.py @@ -0,0 +1,104 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +# +# The options wal_retention_period and wal_retention_size work well +# + +import taos +from taos.tmq import Consumer + +import os +import sys +import threading +import json +import time +import random +from datetime import date +from datetime import datetime +from datetime import timedelta +from os import path + + +topicName = "topic" +topicNum = 100 + +# consume topic +def consume_topic(topic_name, group, consume_cnt, index, wait): + consumer = Consumer( + { + "group.id": group, + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + } + ) + + print(f"start consumer topic:{topic_name} group={group} index={index} ...") + consumer.subscribe([topic_name]) + cnt = 0 + try: + while cnt < consume_cnt: + res = consumer.poll(1) + if not res: + if wait: + continue + else: + break + err = res.error() + if err is not None: + raise err + val = res.value() + cnt += 1 + print(f" consume {cnt} ") + for block in val: + datas = block.fetchall() + data = datas[0][:50] + + print(f" {topic_name}_{group}_{index} {cnt} {data}") + + finally: + consumer.unsubscribe() + consumer.close() + +def consumerThread(index): + global topicName, topicNum + print(f' thread {index} start...') + while True: + idx = random.randint(0, topicNum - 1) + name = f"{topicName}{idx}" + group = f"group_{index}_{idx}" + consume_topic(name, group, 100, index, True) + + + +if __name__ == "__main__": + print(sys.argv) + threadCnt = 10 + + if len(sys.argv) > 1: + threadCnt = int(sys.argv[1]) + + + threads = [] + print(f'consumer with {threadCnt} threads...') + for i in range(threadCnt): + x = threading.Thread(target=consumerThread, args=(i,)) + x.start() + threads.append(x) + + # wait + for i, thread in enumerate(threads): + thread.join() + print(f'join thread {i} end.') + diff --git a/tests/system-test/eco-system/manager/mul.py b/tests/system-test/eco-system/manager/mul.py new file mode 100644 index 0000000000..d78b63d386 --- /dev/null +++ b/tests/system-test/eco-system/manager/mul.py @@ -0,0 +1,114 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import os +import sys +import random +import time + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + + # prepareEnv + def prepareEnv(self): + self.dbName = "mullevel" + self.stbName = "meters" + self.topicName = "topic" + self.topicNum = 100 + self.loop = 50000 + + sql = f"use {self.dbName}" + tdSql.execute(sql) + + # generate topic sql + self.sqls = [ + f"select * from {self.stbName}", + f"select * from {self.stbName} where ui < 200", + f"select * from {self.stbName} where fc > 20.1", + f"select * from {self.stbName} where nch like '%%a%%'", + f"select * from {self.stbName} where fc > 20.1", + f"select lower(bin) from {self.stbName} where length(bin) < 10;", + f"select upper(bin) from {self.stbName} where length(nch) > 10;", + f"select upper(bin) from {self.stbName} where ti > 10 or ic < 40;", + f"select * from {self.stbName} where ic < 100 " + ] + + + + # create topics + def createTopics(self): + for i in range(self.topicNum): + topicName = f"{self.topicName}{i}" + sql = random.choice(self.sqls) + createSql = f"create topic if not exists {topicName} as {sql}" + try: + tdSql.execute(createSql, 3, True) + except: + tdLog.info(f" create topic {topicName} failed.") + + + # randomly drop and re-create topics + def managerTopics(self): + + for i in range(self.loop): + tdLog.info(f"start modify loop={i}") + idx = random.randint(0, self.topicNum - 1) + # delete + topicName = f"{self.topicName}{idx}" + sql = f"drop topic if exists {topicName}" + try: + tdSql.execute(sql, 3, True) + except: + tdLog.info(f" drop topic {topicName} failed.") + + + # create topic + sql = random.choice(self.sqls) + createSql = f"create topic if not exists {topicName} as {sql}" + try: + tdSql.execute(createSql, 3, True) + except: + tdLog.info(f" create topic {topicName} failed.") + + seconds = [0.1, 0.5, 3, 2.5, 1.5, 0.4, 5.2, 2.6, 0.4, 0.2] + time.sleep(random.choice(seconds)) + + + # run + def run(self): + # prepare env + self.prepareEnv() + + # create topic + self.createTopics() + + # modify topic + self.managerTopics() + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())
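A note on the int64SafeSub() helper added in parUtil.c above: it performs the subtraction in unsigned arithmetic, where wrap-around is well defined, then clamps the result to the representable range instead of relying on signed overflow. The patched checkFill() afterwards takes the absolute value of the result, mapping INT64_MIN to INT64_MAX, so timeRange is always non-negative. Below is a minimal standalone sketch of the same idea with a few illustrative spot checks; the main() harness and the test values are not part of the patch.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

// Overflow-safe signed subtraction: returns a - b, clamped to
// [INT64_MIN, INT64_MAX] instead of wrapping on signed overflow.
static int64_t int64SafeSub(int64_t a, int64_t b) {
  // subtract in unsigned arithmetic, where wrap-around is well defined;
  // the cast back to int64_t assumes the usual two's-complement target
  int64_t res = (int64_t)((uint64_t)a - (uint64_t)b);

  if (a >= 0 && b < 0 && res < 0) {
    res = INT64_MAX;  // a - b > INT64_MAX: clamp on positive overflow
  } else if (a < 0 && b > 0 && res >= 0) {
    res = INT64_MIN;  // a - b < INT64_MIN: clamp on negative overflow
  }
  return res;
}

int main(void) {
  assert(int64SafeSub(5, 3) == 2);                   // no overflow
  assert(int64SafeSub(INT64_MAX, -1) == INT64_MAX);  // clamped high
  assert(int64SafeSub(INT64_MIN, 1) == INT64_MIN);   // clamped low
  printf("all checks passed\n");
  return 0;
}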
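Also for reference: doBlockDataWindowFilter() and rebuildDeleteBlockData() in scanoperator.c broadly share one keep-mask pattern: mark every row as qualified or not against the fill-history time window, then compact the block via trimDataBlock() only when at least one row failed the check. A simplified sketch of that pattern over a bare timestamp array follows; filterByWindow() and the sample data are illustrative, not part of the patch.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

// Keep rows whose timestamp falls inside [skey, ekey]; returns the new row
// count. Mirrors the mask-then-trim flow of doBlockDataWindowFilter().
static int32_t filterByWindow(int64_t* ts, int32_t rows, int64_t skey, int64_t ekey) {
  bool  hasUnqualified = false;
  bool* p = calloc(rows, sizeof(bool));

  for (int32_t i = 0; i < rows; ++i) {
    p[i] = (ts[i] >= skey && ts[i] <= ekey);
    if (!p[i]) {
      hasUnqualified = true;
    }
  }

  int32_t kept = rows;
  if (hasUnqualified) {  // compact in place, analogous to trimDataBlock()
    kept = 0;
    for (int32_t i = 0; i < rows; ++i) {
      if (p[i]) {
        ts[kept++] = ts[i];
      }
    }
  }

  free(p);
  return kept;
}

int main(void) {
  int64_t ts[] = {1, 5, 9, 12};
  printf("%d rows kept\n", (int)filterByWindow(ts, 4, 5, 10));  // prints: 2 rows kept
  return 0;
}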