refactor: do some internal refactor.
This commit is contained in:
parent
f8d0c52483
commit
719d1d1b90
|
@ -886,7 +886,8 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
||||||
pTask->execInfo.step2Start = taosGetTimestampMs();
|
pTask->execInfo.step2Start = taosGetTimestampMs();
|
||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
|
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer,
|
||||||
|
pRange->maxVer, 0.0);
|
||||||
streamTaskPutTranstateIntoInputQ(pTask);
|
streamTaskPutTranstateIntoInputQ(pTask);
|
||||||
streamExecTask(pTask); // exec directly
|
streamExecTask(pTask); // exec directly
|
||||||
} else {
|
} else {
|
||||||
|
@ -1141,8 +1142,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
|
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed", vgId, req.taskId);
|
||||||
req.taskId);
|
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
|
|
|
@ -465,7 +465,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||||
while (pReader->nextBlk < numOfBlocks) {
|
while (pReader->nextBlk < numOfBlocks) {
|
||||||
tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
|
tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
|
||||||
pReader->nextBlk, numOfBlocks, idstr);
|
(pReader->nextBlk + 1), numOfBlocks, idstr);
|
||||||
|
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
if (pReader->tbIdHash == NULL) {
|
if (pReader->tbIdHash == NULL) {
|
||||||
|
|
|
@ -2155,7 +2155,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
|
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
|
||||||
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
|
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
|
||||||
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
|
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
|
||||||
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
|
qDebug("stream scan step2 (scan wal), verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
|
||||||
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
|
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
|
||||||
pTSInfo->base.cond.twindows.ekey, id);
|
pTSInfo->base.cond.twindows.ekey, id);
|
||||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
|
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||||
|
|
|
@ -340,7 +340,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
} else {
|
} else {
|
||||||
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
|
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s fill-history task end, scal wal elapsed time:%.2fSec,update related stream task:%s info, transfer "
|
"s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer "
|
||||||
"exec state",
|
"exec state",
|
||||||
id, el, pStreamTask->id.idStr);
|
id, el, pStreamTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
@ -380,22 +380,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 1. expand the query time window for stream task of WAL scanner
|
||||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
// update the scan data range for source task.
|
// update the scan data range for source task.
|
||||||
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
||||||
", status:%s, sched-status:%d",
|
", status:%s, sched-status:%d",
|
||||||
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||||
pTimeWindow->ekey, p, pStreamTask->status.schedStatus);
|
pTimeWindow->ekey, p, pStreamTask->status.schedStatus);
|
||||||
} else {
|
|
||||||
stDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. expand the query time window for stream task of WAL scanner
|
|
||||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
pTimeWindow->skey = INT64_MIN;
|
pTimeWindow->skey = INT64_MIN;
|
||||||
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
|
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s non-source task no need to reset filter window", pStreamTask->id.idStr);
|
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. transfer the ownership of executor state
|
// 2. transfer the ownership of executor state
|
||||||
|
|
|
@ -305,7 +305,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead) {
|
int32_t walSkipFetchBody(SWalReader *pRead) {
|
||||||
wDebug("vgId:%d, skip fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64
|
wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64
|
||||||
", applied:%" PRId64 ", 0x%" PRIx64,
|
", applied:%" PRId64 ", 0x%" PRIx64,
|
||||||
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||||
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);
|
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);
|
||||||
|
|
Loading…
Reference in New Issue