Merge branch '3.0' into fix/TD-24872-3.0

kailixu 2023-07-31 08:42:52 +08:00
commit efa99b8733
29 changed files with 558 additions and 360 deletions

View File

@@ -31,7 +31,8 @@ Websocket connections are supported on all platforms that can run Go.
 | connector-rust version | TDengine version | major features |
 | :----------------: | :--------------: | :--------------------------------------------------: |
-| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. |
+| v0.9.2 | 3.0.7.0 or later | STMT: Get tag_fields and col_fields under ws. |
+| v0.8.12 | 3.0.5.0 | TMQ: Get consuming progress and seek offset to consume. |
 | v0.8.0 | 3.0.4.0 | Support schemaless insert. |
 | v0.7.6 | 3.0.3.0 | Support req_id in query. |
 | v0.6.0 | 3.0.0.0 | Base features. |

View File

@@ -30,7 +30,8 @@ Websocket connections are supported on all platforms that can run Rust.
 | Rust connector version | TDengine version | major features |
 | :----------------: | :--------------: | :--------------------------------------------------: |
-| v0.8.12 | 3.0.5.0 or later | TMQ: get consuming progress and seek to a specified offset to consume. |
+| v0.9.2 | 3.0.7.0 or later | STMT: get tag_fields and col_fields under ws. |
+| v0.8.12 | 3.0.5.0 | TMQ: get consuming progress and seek to a specified offset to consume. |
 | v0.8.0 | 3.0.4.0 | Support schemaless insert. |
 | v0.7.6 | 3.0.3.0 | Support req_id in requests. |
 | v0.6.0 | 3.0.0.0 | Base features. |

View File

@@ -45,7 +45,6 @@ enum {
   TASK_STATUS__FAIL,
   TASK_STATUS__STOP,
   TASK_STATUS__SCAN_HISTORY,  // stream task scan history data by using tsdbread in the stream scanner
-  TASK_STATUS__SCAN_HISTORY_WAL,  // scan history data in wal
   TASK_STATUS__HALT,  // pause, but not be manipulated by user command
   TASK_STATUS__PAUSE, // pause
 };

View File

@@ -1859,8 +1859,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
 static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
   if (!pVg->seekUpdated) {
     tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
-    if(reqOffset->type != 0) pVg->offsetInfo.beginOffset = *reqOffset;
-    if(rspOffset->type != 0) pVg->offsetInfo.endOffset = *rspOffset;
+    pVg->offsetInfo.beginOffset = *reqOffset;
+    pVg->offsetInfo.endOffset = *rspOffset;
   } else {
     tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
   }
@@ -1948,7 +1948,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
       }
       taosWUnLockLatch(&tmq->lock);
     } else {
-      tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
+      tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
               tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
       pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
       taosFreeQitem(pollRspWrapper);
@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
       taosWUnLockLatch(&tmq->lock);
       return pRsp;
     } else {
-      tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
+      tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
              tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
       pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
       taosFreeQitem(pollRspWrapper);
@@ -2036,7 +2036,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
       taosWUnLockLatch(&tmq->lock);
       return pRsp;
     } else {
-      tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
+      tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
              tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
       pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
       taosFreeQitem(pollRspWrapper);

View File

@@ -7227,6 +7227,9 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
       return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
     } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) {
       return pLeft->uid == pRight->uid;
+    } else {
+      uError("offset type:%d", pLeft->type);
+      ASSERT(0);
     }
   }
   return false;

View File

@@ -420,6 +420,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
     SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
     if(pSub == NULL){
+      ASSERT(0);
       continue;
     }
     taosWLockLatch(&pSub->lock);
@@ -515,7 +516,10 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
       char *topic = taosArrayGetP(pConsumer->currentTopics, i);
       SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
       // txn guarantees pSub is created
-      if(pSub == NULL) continue;
+      if(pSub == NULL) {
+        ASSERT(0);
+        continue;
+      }
       taosRLockLatch(&pSub->lock);

       SMqSubTopicEp topicEp = {0};
@@ -524,6 +528,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
       // 2.1 fetch topic schema
       SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
       if(pTopic == NULL) {
+        ASSERT(0);
         taosRUnLockLatch(&pSub->lock);
         mndReleaseSubscribe(pMnode, pSub);
         continue;

View File

@@ -1168,7 +1168,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons
     pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
     colDataSetVal(pColInfo, *numOfRows, (const char *)consumerIdHex, consumerId == -1);

-    mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
+    mInfo("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
           consumerId, varDataVal(cgroup), pVgEp->vgId);

     // offset

View File

@@ -346,9 +346,9 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
     rsp.status = streamTaskCheckStatus(pTask);
     streamMetaReleaseTask(pSnode->pMeta, pTask);

-    qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
-           pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
-           streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
+    const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
+    qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
+           pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
   } else {
     rsp.status = 0;
     qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64

View File

@@ -146,6 +146,20 @@ void tqClose(STQ* pTq) {
     return;
   }

+  void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
+  while (pIter) {
+    STqHandle* pHandle = *(STqHandle**)pIter;
+    int32_t    vgId = TD_VID(pTq->pVnode);
+
+    if(pHandle->msg != NULL) {
+      tqPushEmptyDataRsp(pHandle, vgId);
+      rpcFreeCont(pHandle->msg->pCont);
+      taosMemoryFree(pHandle->msg);
+      pHandle->msg = NULL;
+    }
+    pIter = taosHashIterate(pTq->pPushMgr, pIter);
+  }
+
   tqOffsetClose(pTq->pOffsetStore);
   taosHashCleanup(pTq->pHandle);
   taosHashCleanup(pTq->pPushMgr);
@@ -278,6 +292,10 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
   tqInitDataRsp(&dataRsp, &req);
   dataRsp.blockNum = 0;
   dataRsp.rspOffset = dataRsp.reqOffset;
+  char buf[TSDB_OFFSET_LEN] = {0};
+  tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
+  tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId);
+
   tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
   tDeleteMqDataRsp(&dataRsp);
   return 0;
@@ -515,10 +533,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
     while (pIter) {
       STqHandle* pHandle = *(STqHandle**)pIter;
-      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
+      tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

       if (ASSERT(pHandle->msg != NULL)) {
         tqError("pHandle->msg should not be null");
+        taosHashCancelIterate(pTq->pPushMgr, pIter);
         break;
       }else{
         SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
@@ -849,30 +868,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
     taosWLockLatch(&pTq->lock);

     if (pHandle->consumerId == req.newConsumerId) {  // do nothing
-      tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId,
-             req.newConsumerId);
+      tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
     } else {
-      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
-             req.newConsumerId);
+      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
       atomic_store_64(&pHandle->consumerId, req.newConsumerId);
+      // atomic_add_fetch_32(&pHandle->epoch, 1);
+      // kill executing task
+      // if(tqIsHandleExec(pHandle)) {
+      //   qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
+      //   if (pTaskInfo != NULL) {
+      //     qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
+      //   }
+      //   if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
+      //     qStreamCloseTsdbReader(pTaskInfo);
+      //   }
+      // }
+      // remove if it has been register in the push manager, and return one empty block to consumer
+      tqUnregisterPushHandle(pTq, pHandle);
+      ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
     }
-    // atomic_add_fetch_32(&pHandle->epoch, 1);
-    // kill executing task
-    // if(tqIsHandleExec(pHandle)) {
-    //   qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
-    //   if (pTaskInfo != NULL) {
-    //     qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
-    //   }
-    //   if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
-    //     qStreamCloseTsdbReader(pTaskInfo);
-    //   }
-    // }
-    // remove if it has been register in the push manager, and return one empty block to consumer
-    tqUnregisterPushHandle(pTq, pHandle);
     taosWUnLockLatch(&pTq->lock);
-    ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
   }

 end:
@@ -1041,9 +1058,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
     rsp.status = streamTaskCheckStatus(pTask);
     streamMetaReleaseTask(pTq->pStreamMeta, pTask);

-    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
-            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
-            streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
+    const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
+    tqDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
+            pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
   } else {
     rsp.status = 0;
     tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
@@ -1145,7 +1162,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
     // 3. It's an fill history task, do nothing. wait for the main task to start it
     SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
     if (p != NULL) {  // reset the downstreamReady flag.
-      p->status.downstreamReady = 0;
       streamTaskCheckDownstreamTasks(p);
     }
@@ -1154,12 +1170,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
 }

 int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
-  int32_t code = TSDB_CODE_SUCCESS;
-  char* msg = pMsg->pCont;
+  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
   SStreamMeta* pMeta = pTq->pStreamMeta;
-  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg;
+  int32_t code = TSDB_CODE_SUCCESS;

   SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
   if (pTask == NULL) {
     tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
@@ -1167,12 +1181,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     return -1;
   }

-  // do recovery step 1
+  // do recovery step1
   const char* id = pTask->id.idStr;
   const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
-  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus);
+  tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);

-  int64_t st = taosGetTimestampMs();
+  if (pTask->tsInfo.step1Start == 0) {
+    ASSERT(pTask->status.pauseAllowed == false);
+    pTask->tsInfo.step1Start = taosGetTimestampMs();
+    if (pTask->info.fillHistory == 1) {
+      streamTaskEnablePause(pTask);
+    }
+  } else {
+    tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start);
+  }

   // we have to continue retrying to successfully execute the scan history task.
   int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
@@ -1185,31 +1207,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     return 0;
   }

-  ASSERT(pTask->status.pauseAllowed == false);
   if (pTask->info.fillHistory == 1) {
-    streamTaskEnablePause(pTask);
+    ASSERT(pTask->status.pauseAllowed == true);
   }

-  if (!streamTaskRecoverScanStep1Finished(pTask)) {
-    streamSourceScanHistoryData(pTask);
-  }
-
-  // disable the pause when handling the step2 scan of tsdb data.
-  // the whole next procedure cann't be stopped.
-  // todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode.
-  if (pTask->info.fillHistory == 1) {
-    streamTaskDisablePause(pTask);
-  }
-
-  if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
-    tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
+  streamSourceScanHistoryData(pTask);
+
+  if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
+    double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
+    tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
+            TASK_SCHED_STATUS__INACTIVE);
     atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
-    streamMetaReleaseTask(pMeta, pTask);
     return 0;
   }

-  double el = (taosGetTimestampMs() - st) / 1000.0;
+  // the following procedure should be executed, no matter status is stop/pause or not
+  double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
   tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el);

   if (pTask->info.fillHistory) {
@@ -1217,77 +1229,71 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     SStreamTask* pStreamTask = NULL;
     bool done = false;

-    if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
-      // 1. stop the related stream task, get the current scan wal version of stream task, ver.
-      pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
-      if (pStreamTask == NULL) {
-        qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
-               pTask->streamTaskId.taskId, pTask->id.idStr);
-
-        pTask->status.taskStatus = TASK_STATUS__DROPPING;
-        tqDebug("s-task:%s fill-history task set status to be dropping", id);
-
-        streamMetaSaveTask(pMeta, pTask);
-        streamMetaReleaseTask(pMeta, pTask);
-        return -1;
-      }
-
-      ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
-
-      // stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
-      // wait for the stream task get ready for scan history data
-      while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
-        tqDebug(
-            "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
-            id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
-        taosMsleep(100);
-      }
-
-      // now we can stop the stream task execution
-      streamTaskHalt(pStreamTask);
-
-      tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
-              pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
-
-      // if it's an source task, extract the last version in wal.
-      pRange = &pTask->dataRange.range;
-      int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
-      done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
+    // 1. get the related stream task
+    pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
+    if (pStreamTask == NULL) {
+      // todo delete this task, if the related stream task is dropped
+      qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
+             pTask->streamTaskId.taskId, pTask->id.idStr);
+      tqDebug("s-task:%s fill-history task set status to be dropping", id);
+
+      streamMetaUnregisterTask(pMeta, pTask->id.taskId);
+      streamMetaReleaseTask(pMeta, pTask);
+      return -1;
     }

+    ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
+
+    // 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
+    // stream task get ready for scan history data
+    while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
+      tqDebug(
+          "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
+          id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus));
+      taosMsleep(100);
+    }
+
+    // now we can stop the stream task execution
+    streamTaskHalt(pStreamTask);
+
+    tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
+            pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
+
+    // if it's an source task, extract the last version in wal.
+    pRange = &pTask->dataRange.range;
+    int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
+    done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
+
     if (done) {
       pTask->tsInfo.step2Start = taosGetTimestampMs();
       streamTaskEndScanWAL(pTask);
       streamMetaReleaseTask(pMeta, pTask);
     } else {
-      if (!streamTaskRecoverScanStep1Finished(pTask)) {
-        STimeWindow* pWindow = &pTask->dataRange.window;
-        tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
-                ", do secondary scan-history from WAL after halt the related stream task:%s",
-                id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
-
-        ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
-
-        pTask->tsInfo.step2Start = taosGetTimestampMs();
-        streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
-      }
-
-      if (!streamTaskRecoverScanStep2Finished(pTask)) {
-        pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL;
-        if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
-          tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
-          streamMetaReleaseTask(pMeta, pTask);
-          return 0;
-        }
-
-        int64_t dstVer = pTask->dataRange.range.minVer - 1;
-
-        pTask->chkInfo.currentVer = dstVer;
-        walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
-        tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer,
-                TASK_SCHED_STATUS__INACTIVE);
-      }
+      STimeWindow* pWindow = &pTask->dataRange.window;
+      tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
+              ", do secondary scan-history from WAL after halt the related stream task:%s",
+              id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey,
+              pStreamTask->id.idStr);
+
+      ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
+
+      pTask->tsInfo.step2Start = taosGetTimestampMs();
+      streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
+
+      int64_t dstVer = pTask->dataRange.range.minVer - 1;
+      pTask->chkInfo.currentVer = dstVer;
+      walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
+      tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
+              pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);

       atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);

+      // set the fill-history task to be normal
+      if (pTask->info.fillHistory == 1) {
+        streamSetStatusNormal(pTask);
+      }
+
       // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
       // 5. resume the related stream task.
       streamMetaReleaseTask(pMeta, pTask);
@@ -1304,7 +1310,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     if (pTask->historyTaskId.taskId == 0) {
       *pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
       tqDebug(
-          "s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time "
+          "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time "
           "window:%" PRId64 " - %" PRId64,
           id, pWindow->skey, pWindow->ekey);
       qResetStreamInfoTimeWindow(pTask->exec.pExecutor);
@@ -1500,7 +1506,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
   if (pTask != NULL) {
     // even in halt status, the data in inputQ must be processed
     int8_t st = pTask->status.taskStatus;
-    if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) {
+    if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) {
       tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
              pTask->chkInfo.version);
       streamProcessRunReq(pTask);
@@ -1637,7 +1643,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
             vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
   }

-  if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) {
+  if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
     streamStartRecoverTask(pTask, igUntreated);
   } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
     tqStartStreamTasks(pTq);

View File

@@ -78,12 +78,12 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
   memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
   pHandle->msg->contLen = pMsg->contLen;
   int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
-  tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
-          pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
+  tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
+         pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
   return 0;
 }

-int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
+int tqUnregisterPushHandle(STQ* pTq, void *handle) {
   STqHandle *pHandle = (STqHandle*)handle;
   int32_t vgId = TD_VID(pTq->pVnode);
@@ -91,7 +91,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
     return 0;
   }
   int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
-  tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
+  tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);

   if(pHandle->msg != NULL) {
     // tqPushDataRsp(pHandle, vgId);

View File

@@ -211,7 +211,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
 static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
   if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
-    qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64
+    qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
           ", not scan wal anymore, set the transfer state flag",
           pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
     pTask->status.transferState = true;
@@ -251,19 +251,19 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
     int32_t status = pTask->status.taskStatus;

     // non-source or fill-history tasks don't need to response the WAL scan action.
-    if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
+    if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
       streamMetaReleaseTask(pStreamMeta, pTask);
       continue;
     }

-    if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) {
+    if (status != TASK_STATUS__NORMAL) {
       tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
       streamMetaReleaseTask(pStreamMeta, pTask);
       continue;
     }

     if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
-      ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
+      ASSERT(status == TASK_STATUS__NORMAL);
       // the maximum version of data in the WAL has reached already, the step2 is done
       tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
               pTask->dataRange.range.maxVer);

View File

@@ -317,6 +317,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
     // the offset value can not be monotonious increase??
     offset = reqOffset;
   } else {
+    uError("req offset type is 0");
     return TSDB_CODE_TMQ_INVALID_MSG;
   }

View File

@@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
   return code;
 }

-/*
-int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
-  int32_t    code = 0;
-  SLRUCache *pCache = pTsdb->lruCache;
-  SArray    *pCidList = pr->pCidList;
-  int        num_keys = TARRAY_SIZE(pCidList);
-
-  for (int i = 0; i < num_keys; ++i) {
-    SLastCol *pLastCol = NULL;
-    int16_t   cid = *(int16_t *)taosArrayGet(pCidList, i);
-
-    SLastKey  *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
-    LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
-    if (!h) {
-      taosThreadMutexLock(&pTsdb->lruMutex);
-      h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
-      if (!h) {
-        pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype);
-
-        size_t charge = sizeof(*pLastCol);
-        if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
-          charge += pLastCol->colVal.value.nData;
-        }
-
-        LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
-                                              TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
-        if (status != TAOS_LRU_STATUS_OK) {
-          code = -1;
-        }
-      }
-
-      taosThreadMutexUnlock(&pTsdb->lruMutex);
-    }
-
-    pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
-
-    SLastCol lastCol = *pLastCol;
-    reallocVarData(&lastCol.colVal);
-
-    if (h) {
-      taosLRUCacheRelease(pCache, h, false);
-    }
-
-    taosArrayPush(pLastArray, &lastCol);
-  }
-
-  return code;
-}
-*/
 int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
   int32_t code = 0;
   // fetch schema
@@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
   char  **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
   size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
   char  **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *));
+  taosThreadMutexLock(&pTsdb->lruMutex);
   taosThreadMutexLock(&pTsdb->rCache.rMutex);
   rocksMayWrite(pTsdb, true, false, false);
   rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
@@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
     rocksdb_free(values_list[i]);
     rocksdb_free(values_list[i + num_keys]);

-    taosThreadMutexLock(&pTsdb->lruMutex);
+    // taosThreadMutexLock(&pTsdb->lruMutex);

     LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
     if (h) {
@@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
     }
     taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);

-    taosThreadMutexUnlock(&pTsdb->lruMutex);
+    // taosThreadMutexUnlock(&pTsdb->lruMutex);
   }
   for (int i = 0; i < num_keys; ++i) {
     taosMemoryFree(keys_list[i]);
@@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
   rocksMayWrite(pTsdb, true, false, true);

+  taosThreadMutexUnlock(&pTsdb->lruMutex);
+
 _exit:
   taosMemoryFree(pTSchema);
@@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
   return code;
 }

-/*
-int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
-  int32_t code = 0;
-  char    key[32] = {0};
-  int     keyLen = 0;
-
-  // getTableCacheKey(uid, "lr", key, &keyLen);
-  getTableCacheKey(uid, 0, key, &keyLen);
-  LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
-  if (h) {
-    SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
-    bool    invalidate = false;
-    int16_t nCol = taosArrayGetSize(pLast);
-
-    for (int16_t iCol = 0; iCol < nCol; ++iCol) {
-      SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
-      if (eKey >= tTsVal->ts) {
-        invalidate = true;
-        break;
-      }
-    }
-
-    if (invalidate) {
-      taosLRUCacheRelease(pCache, h, true);
-    } else {
-      taosLRUCacheRelease(pCache, h, false);
-    }
-  }
-
-  // getTableCacheKey(uid, "l", key, &keyLen);
-  getTableCacheKey(uid, 1, key, &keyLen);
-  h = taosLRUCacheLookup(pCache, key, keyLen);
-  if (h) {
-    SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
-    bool    invalidate = false;
-    int16_t nCol = taosArrayGetSize(pLast);
-
-    for (int16_t iCol = 0; iCol < nCol; ++iCol) {
-      SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
-      if (eKey >= tTsVal->ts) {
-        invalidate = true;
-        break;
-      }
-    }
-
-    if (invalidate) {
-      taosLRUCacheRelease(pCache, h, true);
-    } else {
-      taosLRUCacheRelease(pCache, h, false);
-    }
-    // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
-  }
-
-  return code;
-}
-*/
 int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) {
   int32_t code = 0;
   STSRow *cacheRow = NULL;
@@ -1767,6 +1667,10 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
       }

       if (record.version <= pReader->info.verRange.maxVer) {
+        /*
+        tsdbError("tomb xx load/cache: vgId:%d fid:%d commit %" PRId64 "~%" PRId64 "~%" PRId64 " tomb records",
+                  TD_VID(pReader->pTsdb->pVnode), pReader->pCurFileSet->fid, record.skey, record.ekey, uid);
+        */
         SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
         taosArrayPush(pInfo->pTombData, &delData);
       }
@@ -1977,9 +1881,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
         goto _err;
       }

-      loadDataTomb(state->pr, state->pr->pFileReader);
-
       state->pr->pCurFileSet = state->pFileSet;
+
+      loadDataTomb(state->pr, state->pr->pFileReader);
     }

     if (!state->pIndexList) {
@@ -2017,6 +1921,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
         state->iBrinIndex = indexSize;
       }

+      if (state->pFileSet != state->pr->pCurFileSet) {
+        state->pr->pCurFileSet = state->pFileSet;
+      }
+
       code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
                           state->pr, state->lastTs, aCols, nCols);
       if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@@ -476,8 +476,8 @@ void vnodeClose(SVnode *pVnode) {
     tsem_wait(&pVnode->canCommit);
     vnodeSyncClose(pVnode);
     vnodeQueryClose(pVnode);
-    walClose(pVnode->pWal);
     tqClose(pVnode->pTq);
+    walClose(pVnode->pWal);
     if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
     smaClose(pVnode->pSma);
     if (pVnode->pMeta) metaClose(&pVnode->pMeta);

View File

@@ -341,6 +341,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
     return NULL;
   }

+  qResetStreamInfoTimeWindow(pTaskInfo);
   return pTaskInfo;
 }

View File

@@ -1550,10 +1550,86 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
   }
 }

-static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
+static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
+  if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
+    bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
+    bool hasUnqualified = false;
+
+    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
+
+    if (pWindow->skey != INT64_MIN) {
+      qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
+
+      ASSERT(pCol->pData != NULL);
+      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
+        int64_t* ts = (int64_t*)colDataGetData(pCol, i);
+        p[i] = (*ts >= pWindow->skey);
+
+        if (!p[i]) {
+          hasUnqualified = true;
+        }
+      }
+    } else if (pWindow->ekey != INT64_MAX) {
+      qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
+      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
+        int64_t* ts = (int64_t*)colDataGetData(pCol, i);
+        p[i] = (*ts <= pWindow->ekey);
+
+        if (!p[i]) {
+          hasUnqualified = true;
+        }
+      }
+    }
+
+    if (hasUnqualified) {
+      trimDataBlock(pBlock, pBlock->info.rows, p);
+    }
+
+    taosMemoryFree(p);
+  }
+}
+
+// re-build the delete block, ONLY according to the split timestamp
+static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
+  if (skey == INT64_MIN) {
+    return;
+  }
+
+  int32_t numOfRows = pBlock->info.rows;
+
+  bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
+  bool hasUnqualified = false;
+
+  SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
+  uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
+  SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
+  uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
+
+  for (int32_t i = 0; i < numOfRows; i++) {
+    if (tsStartCol[i] < skey) {
+      tsStartCol[i] = skey;
+    }
+
+    if (tsEndCol[i] >= skey) {
+      p[i] = true;
+    } else {  // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
+      hasUnqualified = true;
+    }
+  }
+
+  if (hasUnqualified) {
+    trimDataBlock(pBlock, pBlock->info.rows, p);
+  }
+
+  qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
+  taosMemoryFree(p);
+}
+
+static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) {
   SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
   SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
   SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
+  const char*     id = GET_TASKID(pTaskInfo);

   blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
@@ -1593,7 +1669,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
   // currently only the tbname pseudo column
   if (pInfo->numOfPseudoExpr > 0) {
     int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
-                                          pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
+                                          pBlockInfo->rows, id, &pTableScanInfo->base.metaCache);
     // ignore the table not exists error, since this table may have been dropped during the scan procedure.
     if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
       blockDataFreeRes((SSDataBlock*)pBlock);
@@ -1608,8 +1684,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
     doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
   }

-  // filter the block extracted from WAL files, according to the time window
+  // apply additional time window filter
+  doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id);
+
   pInfo->pRes->info.dataLoad = 1;
   blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
+
+  if (pInfo->pRes->info.rows == 0) {
+    return 0;
+  }

   calBlockTbName(pInfo, pInfo->pRes);
   return 0;
@@ -1666,7 +1748,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
       qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
              pTaskInfo->streamInfo.currentOffset.version);
       blockDataCleanup(pInfo->pRes);
-      setBlockIntoRes(pInfo, pRes, true);
+      STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
+      setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
       if (pInfo->pRes->info.rows > 0) {
         return pInfo->pRes;
       }
@@ -1775,80 +1858,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
   }
 }

-static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
-  if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
-    bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
-    bool hasUnqualified = false;
-
-    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
-
-    if (pWindow->skey != INT64_MIN) {
-      qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
-      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
-        int64_t* ts = (int64_t*)colDataGetData(pCol, i);
-        p[i] = (*ts >= pWindow->skey);
-
-        if (!p[i]) {
-          hasUnqualified = true;
-        }
-      }
-    } else if (pWindow->ekey != INT64_MAX) {
-      qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
-      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
-        int64_t* ts = (int64_t*)colDataGetData(pCol, i);
-        p[i] = (*ts <= pWindow->ekey);
-
-        if (!p[i]) {
-          hasUnqualified = true;
-        }
-      }
-    }
-
-    if (hasUnqualified) {
-      trimDataBlock(pBlock, pBlock->info.rows, p);
-    }
-
-    taosMemoryFree(p);
-  }
-}
-
-// re-build the delete block, ONLY according to the split timestamp
-static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
-  if (skey == INT64_MIN) {
-    return;
-  }
-
-  int32_t numOfRows = pBlock->info.rows;
-
-  bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
-  bool hasUnqualified = false;
-
-  SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
-  uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
-  SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
-  uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
-
-  for (int32_t i = 0; i < numOfRows; i++) {
-    if (tsStartCol[i] < skey) {
-      tsStartCol[i] = skey;
-    }
-
-    if (tsEndCol[i] >= skey) {
-      p[i] = true;
-    } else {  // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
-      hasUnqualified = true;
-    }
-  }
-
-  if (hasUnqualified) {
-    trimDataBlock(pBlock, pBlock->info.rows, p);
-  }
-
-  qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
-  taosMemoryFree(p);
-}
-
 static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
   // NOTE: this operator does never check if current status is done or not
   SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@@ -2121,8 +2130,7 @@ FETCH_NEXT_BLOCK:
         return pInfo->pUpdateRes;
       }

-      SSDataBlock*    pBlock = pInfo->pRes;
-      SDataBlockInfo* pBlockInfo = &pBlock->info;
+      SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
       int32_t         totalBlocks = taosArrayGetSize(pInfo->pBlockLists);

     NEXT_SUBMIT_BLK:
@@ -2146,21 +2154,23 @@ FETCH_NEXT_BLOCK:
         }
       }

-      blockDataCleanup(pBlock);
+      blockDataCleanup(pInfo->pRes);

       while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
         SSDataBlock* pRes = NULL;

         int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
-        qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows,
-               id);
+        qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id);

         if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
           qDebug("retrieve data failed, try next block in submit block, %s", id);
           continue;
         }

-        setBlockIntoRes(pInfo, pRes, false);
+        setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
+        if (pInfo->pRes->info.rows == 0) {
+          continue;
+        }

         if (pInfo->pCreateTbRes->info.rows > 0) {
           pInfo->scanMode = STREAM_SCAN_FROM_RES;
@@ -2168,13 +2178,8 @@ FETCH_NEXT_BLOCK:
           return pInfo->pCreateTbRes;
         }

-        // apply additional time window filter
-        doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
-        pBlock->info.dataLoad = 1;
-        blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
-
-        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
-        doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
+        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
+        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);

         int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
         qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
@@ -2196,7 +2201,7 @@ FETCH_NEXT_BLOCK:
       qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
       if (pBlockInfo->rows > 0) {
-        return pBlock;
+        return pInfo->pRes;
       }

       if (pInfo->pUpdateDataRes->info.rows > 0) {
@@ -2587,7 +2592,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
   pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
   pInfo->igExpired = pTableScanNode->igExpired;
   pInfo->twAggSup.maxTs = INT64_MIN;
-  pInfo->pState = NULL;
+  pInfo->pState = pTaskInfo->streamInfo.pState;
   pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
   pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;

View File

@@ -3736,7 +3736,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
     setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
     int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
     if (winNum > 0) {
-      saveSessionOutputBuf(pAggSup, &winInfo);
       if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
         saveResult(winInfo, pInfo->pStUpdated);
       } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@@ -3747,9 +3746,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
         getSessionHashKey(&winInfo.sessionWin, &key);
         tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
       }
-    } else {
-      releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)winInfo.pOutputBuf, &pAggSup->stateStore);
     }
+    saveSessionOutputBuf(pAggSup, &winInfo);
   }

   taosMemoryFree(pBuf);
@@ -4398,7 +4396,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
     setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
     if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
       compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
-      saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
       if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
         saveResult(curInfo.winInfo, pInfo->pSeUpdated);
       } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@@ -4409,14 +4406,12 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
         getSessionHashKey(&curInfo.winInfo.sessionWin, &key);
         tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));
       }
-    } else if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {
-      releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
     }

     if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
-      releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
-    }
-
-    if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {
-      releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
+      saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
     }
   }
   taosMemoryFree(pBuf);

View File

@@ -118,6 +118,12 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
 void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request);
 SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable);

+/**
+ * @brief return a - b with overflow check
+ * @retval val range between [INT64_MIN, INT64_MAX]
+ */
+int64_t int64SafeSub(int64_t a, int64_t b);
+
 #ifdef __cplusplus
 }
 #endif

View File

@ -3296,23 +3296,25 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode*
if (NULL == pInterval) { if (NULL == pInterval) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t timeRange = 0;
int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey);
int64_t intervalRange = 0; int64_t intervalRange = 0;
if (IS_CALENDAR_TIME_DURATION(pInterval->unit)) { if (!pCxt->createStream) {
int64_t f = 1; int64_t res = int64SafeSub(pFill->timeRange.skey, pFill->timeRange.ekey);
if (pInterval->unit == 'n') { timeRange = res < 0 ? res == INT64_MIN ? INT64_MAX : -res : res;
f = 30LL * MILLISECOND_PER_DAY; if (IS_CALENDAR_TIME_DURATION(pInterval->unit)) {
} else if (pInterval->unit == 'y') { int64_t f = 1;
f = 365LL * MILLISECOND_PER_DAY; if (pInterval->unit == 'n') {
f = 30LL * MILLISECOND_PER_DAY;
} else if (pInterval->unit == 'y') {
f = 365LL * MILLISECOND_PER_DAY;
}
intervalRange = pInterval->datum.i * f;
} else {
intervalRange = pInterval->datum.i;
}
if ((timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
} }
intervalRange = pInterval->datum.i * f;
} else {
intervalRange = pInterval->datum.i;
}
if ((timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
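
Note on the hunk above: the fill-range validation is now skipped when translating a CREATE STREAM (pCxt->createStream), and the raw TABS(skey - ekey) is replaced by int64SafeSub plus an explicit INT64_MIN guard, so the subtraction cannot overflow and the negation is always defined. A condensed sketch of the new range computation (int64SafeSub is the helper added by this commit; the wrapper function is illustrative):

#include <stdint.h>

int64_t int64SafeSub(int64_t a, int64_t b);  // defined in the parser hunk below

static int64_t fillTimeRange(int64_t skey, int64_t ekey) {
  int64_t res = int64SafeSub(skey, ekey);
  // take |res| without UB: -INT64_MIN is undefined, so the clamped minimum
  // maps to INT64_MAX instead of being negated
  return res < 0 ? (res == INT64_MIN ? INT64_MAX : -res) : res;
}

For example, skey = INT64_MIN and ekey = 1 would overflow inside the old TABS argument; the new path clamps to INT64_MIN and maps it to INT64_MAX, which still trips the MAX_INTERVAL_TIME_WINDOW check.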

View File

@@ -1142,3 +1142,18 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
taosHashCleanup(pMetaCache->pTableIndex); taosHashCleanup(pMetaCache->pTableIndex);
taosHashCleanup(pMetaCache->pTableCfg); taosHashCleanup(pMetaCache->pTableCfg);
} }
int64_t int64SafeSub(int64_t a, int64_t b) {
  // subtract in unsigned arithmetic: wrap-around is well defined there, so the
  // signed-overflow UB of a plain a - b is avoided
  int64_t res = (uint64_t)a - (uint64_t)b;
  if (a >= 0 && b < 0) {
    if ((uint64_t)res > (uint64_t)INT64_MAX) {
      // overflow: the true difference exceeds INT64_MAX, clamp
      res = INT64_MAX;
    }
  } else if (a < 0 && b > 0 && res >= 0) {
    // underflow: the true difference is below INT64_MIN, clamp
    res = INT64_MIN;
  }
  return res;
}
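
The helper subtracts in unsigned arithmetic (well-defined wrap-around) and clamps whenever the operand signs prove the true result cannot fit in int64_t. A quick sanity check of those semantics; this is a hypothetical test, not part of the patch, linked against the definition above:

#include <assert.h>
#include <stdint.h>

int64_t int64SafeSub(int64_t a, int64_t b);  // as defined above

int main(void) {
  assert(int64SafeSub(10, 3) == 7);                 // representable: exact result
  assert(int64SafeSub(INT64_MAX, -1) == INT64_MAX); // would overflow: clamped
  assert(int64SafeSub(INT64_MIN, 1) == INT64_MIN);  // would underflow: clamped
  return 0;
}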

View File

@@ -379,7 +379,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return -1; return -1;
} }
qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock((SStreamDataBlock*) pItem); destroyStreamDataBlock((SStreamDataBlock*) pItem);

View File

@@ -172,6 +172,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
bool finished = false; bool finished = false;
while (1) { while (1) {
if (streamTaskShouldPause(&pTask->status)) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
return 0;
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) { if (pRes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -404,6 +410,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
@@ -414,6 +422,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// save to disk // save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask); streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
@@ -615,7 +624,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be commit here // todo the task should be commit here
if (taosQueueEmpty(pTask->inputQueue->queue)) { if (taosQueueEmpty(pTask->inputQueue->queue)) {
// fill-history WAL scan has completed // fill-history WAL scan has completed
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) {
streamTaskRecoverSetAllStepFinished(pTask); streamTaskRecoverSetAllStepFinished(pTask);
streamTaskEndScanWAL(pTask); streamTaskEndScanWAL(pTask);
} else { } else {
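
Note on the hunks above: streamScanExec now polls streamTaskShouldPause at the top of every scan iteration and returns early; streamTransferStateToStreamTask clears historyTaskId.taskId to break the link to the finished fill-history task; and the end-of-scan branch in streamTryExec keys off TASK_LEVEL__SOURCE plus the transferState flag instead of the old TASK_STATUS__SCAN_HISTORY_WAL check. A toy model of the cooperative pause check (the loop and names are illustrative):

#include <stdbool.h>
#include <stdio.h>

static volatile bool paused = false;  // stand-in for streamTaskShouldPause()

static int scanBatches(int total) {
  for (int done = 0; done < total; ++done) {
    if (paused) {
      printf("paused after %d batches\n", done);
      return done;   // leave the loop early; the task can be resumed later
    }
    /* ... scan one batch of history data here ... */
  }
  return total;
}

int main(void) { return scanBatches(10) == 10 ? 0 : 1; }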

View File

@@ -85,6 +85,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask); streamSetParamForScanHistory(pTask);
} }
streamTaskEnablePause(pTask);
streamTaskScanHistoryPrepare(pTask); streamTaskScanHistoryPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
@@ -839,7 +840,7 @@ void streamTaskPause(SStreamTask* pTask) {
return; return;
} }
while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
status = pTask->status.taskStatus; status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) { if (status == TASK_STATUS__DROPPING) {
qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
@@ -856,8 +857,19 @@ void streamTaskPause(SStreamTask* pTask) {
taosMsleep(100); taosMsleep(100);
} }
// todo: use the lock of the task.
taosWLockLatch(&pMeta->lock);
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
taosWUnLockLatch(&pMeta->lock);
qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
taosWUnLockLatch(&pMeta->lock);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,

View File

@@ -5,6 +5,13 @@ if (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions(-DGRANTS_CFG) add_definitions(-DGRANTS_CFG)
endif() endif()
IF (${ASSERT_NOT_CORE})
ADD_DEFINITIONS(-DASSERT_NOT_CORE)
MESSAGE(STATUS "disable assert core")
ELSE ()
MESSAGE(STATUS "enable assert core")
ENDIF (${ASSERT_NOT_CORE})
target_include_directories( target_include_directories(
util util
PUBLIC "${TD_SOURCE_DIR}/include/util" PUBLIC "${TD_SOURCE_DIR}/include/util"

View File

@@ -76,7 +76,11 @@ static int32_t tsDaylightActive; /* Currently in daylight saving time. */
bool tsLogEmbedded = 0; bool tsLogEmbedded = 0;
bool tsAsyncLog = true; bool tsAsyncLog = true;
#ifdef ASSERT_NOT_CORE
bool tsAssert = false;
#else
bool tsAssert = true; bool tsAssert = true;
#endif
int32_t tsNumOfLogLines = 10000000; int32_t tsNumOfLogLines = 10000000;
int32_t tsLogKeepDays = 0; int32_t tsLogKeepDays = 0;
LogFp tsLogFp = NULL; LogFp tsLogFp = NULL;
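
Note on this hunk and the CMake change above: when the build defines ASSERT_NOT_CORE, tsAssert defaults to false, so failed assertions are expected to be logged without aborting into a core dump. A hypothetical runtime-switchable assert built on such a flag; TDengine's actual assert macro may differ:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

// default follows the build switch, but stays changeable at runtime
static bool tsAssert =
#ifdef ASSERT_NOT_CORE
    false;   // log only, keep the process alive
#else
    true;    // abort and produce a core file
#endif

#define TASSERT(cond)                                                   \
  do {                                                                  \
    if (!(cond)) {                                                      \
      fprintf(stderr, "assert failed: %s @ %s:%d\n", #cond, __FILE__,   \
              __LINE__);                                                \
      if (tsAssert) abort();                                            \
    }                                                                   \
  } while (0)

int main(void) {
  TASSERT(1 + 1 == 2);   // passes silently either way
  return 0;
}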

View File

@@ -105,7 +105,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
@@ -450,7 +450,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3 ,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 #,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5

View File

@@ -222,9 +222,9 @@ class TDTestCase:
actConsumeTotalRows = resultList[0] actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.info("and second consume rows should be between [0 and %d]"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10) time.sleep(10)

View File

@@ -0,0 +1,104 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# Verify that the wal_retention_period and wal_retention_size options work well
#
import taos
from taos.tmq import Consumer
import os
import sys
import threading
import json
import time
import random
from datetime import date
from datetime import datetime
from datetime import timedelta
from os import path
topicName = "topic"
topicNum = 100
# consume topic
def consume_topic(topic_name, group, consume_cnt, index, wait):
    consumer = Consumer(
        {
            "group.id": group,
            "td.connect.user": "root",
            "td.connect.pass": "taosdata",
            "enable.auto.commit": "true",
        }
    )

    print(f"start consumer topic:{topic_name} group={group} index={index} ...")
    consumer.subscribe([topic_name])

    cnt = 0
    try:
        while cnt < consume_cnt:
            res = consumer.poll(1)
            if not res:
                if wait:
                    continue
                else:
                    break
            err = res.error()
            if err is not None:
                raise err
            val = res.value()
            cnt += 1
            print(f" consume {cnt} ")
            for block in val:
                datas = block.fetchall()
                data = datas[0][:50]
                print(f" {topic_name}_{group}_{index} {cnt} {data}")
    finally:
        consumer.unsubscribe()
        consumer.close()


def consumerThread(index):
    global topicName, topicNum
    print(f' thread {index} start...')
    while True:
        idx = random.randint(0, topicNum - 1)
        name = f"{topicName}{idx}"
        group = f"group_{index}_{idx}"
        consume_topic(name, group, 100, index, True)


if __name__ == "__main__":
    print(sys.argv)
    threadCnt = 10

    # optional first argument overrides the number of consumer threads
    if len(sys.argv) > 1:
        threadCnt = int(sys.argv[1])

    threads = []
    print(f'consumer with {threadCnt} threads...')
    for i in range(threadCnt):
        x = threading.Thread(target=consumerThread, args=(i,))
        x.start()
        threads.append(x)

    # wait for all consumer threads to finish
    for i, thread in enumerate(threads):
        thread.join()
        print(f'join thread {i} end.')

View File

@@ -0,0 +1,114 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import sys
import random
import time
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
class TDTestCase:
    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        self.setsql = TDSetSql()

    # prepare environment
    def prepareEnv(self):
        self.dbName = "mullevel"
        self.stbName = "meters"
        self.topicName = "topic"
        self.topicNum = 100
        self.loop = 50000

        sql = f"use {self.dbName}"
        tdSql.execute(sql)

        # candidate queries used to generate topics
        self.sqls = [
            f"select * from {self.stbName}",
            f"select * from {self.stbName} where ui < 200",
            f"select * from {self.stbName} where fc > 20.1",
            f"select * from {self.stbName} where nch like '%%a%%'",
            f"select * from {self.stbName} where fc > 20.1",
            f"select lower(bin) from {self.stbName} where length(bin) < 10;",
            f"select upper(bin) from {self.stbName} where length(nch) > 10;",
            f"select upper(bin) from {self.stbName} where ti > 10 or ic < 40;",
            f"select * from {self.stbName} where ic < 100 "
        ]

    # create the initial set of topics
    def createTopics(self):
        for i in range(self.topicNum):
            topicName = f"{self.topicName}{i}"
            sql = random.choice(self.sqls)
            createSql = f"create topic if not exists {topicName} as {sql}"
            try:
                tdSql.execute(createSql, 3, True)
            except:
                tdLog.info(f" create topic {topicName} failed.")

    # randomly drop and re-create topics
    def managerTopics(self):
        for i in range(self.loop):
            tdLog.info(f"start modify loop={i}")
            idx = random.randint(0, self.topicNum - 1)

            # drop
            topicName = f"{self.topicName}{idx}"
            sql = f"drop topic if exists {topicName}"
            try:
                tdSql.execute(sql, 3, True)
            except:
                tdLog.info(f" drop topic {topicName} failed.")

            # re-create
            sql = random.choice(self.sqls)
            createSql = f"create topic if not exists {topicName} as {sql}"
            try:
                tdSql.execute(createSql, 3, True)
            except:
                tdLog.info(f" create topic {topicName} failed.")

            seconds = [0.1, 0.5, 3, 2.5, 1.5, 0.4, 5.2, 2.6, 0.4, 0.2]
            time.sleep(random.choice(seconds))

    # run
    def run(self):
        # prepare env
        self.prepareEnv()
        # create topics
        self.createTopics()
        # randomly modify topics
        self.managerTopics()

    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())