fix(stream): refactor the step2 wal scan.
This commit is contained in:
parent
e89f530db2
commit
40fe3ef4f6
|
@ -588,6 +588,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
||||
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
||||
bool streamTaskIsIdle(const SStreamTask* pTask);
|
||||
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
|
||||
|
||||
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||
|
@ -605,7 +606,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
|
||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
|
||||
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
|
||||
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
|
||||
|
||||
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
|
||||
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
|
||||
|
|
|
@ -241,7 +241,7 @@ bool tqNextBlockImpl(STqReader *pReader, const char *idstr);
|
|||
SWalReader* tqGetWalReader(STqReader* pReader);
|
||||
SSDataBlock* tqGetResultBlock (STqReader* pReader);
|
||||
|
||||
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id);
|
||||
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id);
|
||||
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
|
||||
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
|
||||
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock** pRes, const char* idstr);
|
||||
|
|
|
@ -1128,6 +1128,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (pTask->info.fillHistory) {
|
||||
SVersionRange* pRange = NULL;
|
||||
SStreamTask* pStreamTask = NULL;
|
||||
bool done = false;
|
||||
|
||||
if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
|
||||
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
|
||||
|
@ -1157,58 +1158,55 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
// now we can stop the stream task execution
|
||||
streamTaskHalt(pStreamTask);
|
||||
tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel,
|
||||
id);
|
||||
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
|
||||
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
|
||||
|
||||
// if it's a source task, extract the last version in wal.
|
||||
pRange = &pTask->dataRange.range;
|
||||
int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
|
||||
streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
|
||||
done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer);
|
||||
}
|
||||
|
||||
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||
", do secondary scan-history data after halt the related stream task:%s",
|
||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
|
||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||
|
||||
if (done) {
|
||||
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
||||
}
|
||||
streamTaskEndScanWAL(pTask);
|
||||
} else {
|
||||
if (!streamTaskRecoverScanStep1Finished(pTask)) {
|
||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||
", do secondary scan-history data after halt the related stream task:%s",
|
||||
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
|
||||
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
|
||||
|
||||
if (!streamTaskRecoverScanStep2Finished(pTask)) {
|
||||
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL;
|
||||
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return 0;
|
||||
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
|
||||
}
|
||||
|
||||
int64_t dstVer = pTask->dataRange.range.minVer - 1;
|
||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
||||
tqDebug("s-task:%s seek wal reader to ver:%"PRId64, id, dstVer);
|
||||
if (!streamTaskRecoverScanStep2Finished(pTask)) {
|
||||
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL;
|
||||
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
|
||||
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int64_t dstVer = pTask->dataRange.range.minVer - 1;
|
||||
|
||||
pTask->chkInfo.currentVer = dstVer;
|
||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
|
||||
tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer,
|
||||
TASK_SCHED_STATUS__INACTIVE);
|
||||
}
|
||||
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
|
||||
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
||||
// 5. resume the related stream task.
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
|
||||
tqStartStreamTasks(pTq);
|
||||
}
|
||||
|
||||
// int64_t el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
||||
// tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||
//
|
||||
// // 3. notify downstream tasks to transfer executor state after handle all history blocks.
|
||||
// if (!pTask->status.transferState) {
|
||||
// code = streamDispatchTransferStateMsg(pTask);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// // todo handle error
|
||||
// }
|
||||
//
|
||||
// pTask->status.transferState = true;
|
||||
// }
|
||||
|
||||
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
|
||||
// 5. resume the related stream task.
|
||||
streamTryExec(pTask);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
} else {
|
||||
// todo update the chkInfo version for current task.
|
||||
// this task has an associated history stream task, so we need to scan wal from the end version of
|
||||
|
@ -1218,7 +1216,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (pTask->historyTaskId.taskId == 0) {
|
||||
*pWindow = (STimeWindow){INT64_MIN, INT64_MAX};
|
||||
tqDebug(
|
||||
"s-task:%s scan history in stream time window completed, no related fill history task, reset the time "
|
||||
"s-task:%s scan history in stream time window completed, no related fill-history task, reset the time "
|
||||
"window:%" PRId64 " - %" PRId64,
|
||||
id, pWindow->skey, pWindow->ekey);
|
||||
} else {
|
||||
|
|
|
@ -302,13 +302,17 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
|
||||
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
|
||||
int32_t code = walNextValidMsg(pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int64_t ver = pReader->pHead->head.version;
|
||||
if (ver > maxVer) {
|
||||
tqDebug("maxVer in WAL:%"PRId64" reached, do not scan wal anymore, %s", maxVer, id);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
|
|
|
@ -38,9 +38,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
|
|||
if (shouldIdle) {
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
|
||||
pMeta->walScanCounter -= 1;
|
||||
times = pMeta->walScanCounter;
|
||||
|
||||
times = (--pMeta->walScanCounter);
|
||||
ASSERT(pMeta->walScanCounter >= 0);
|
||||
|
||||
if (pMeta->walScanCounter <= 0) {
|
||||
|
@ -242,7 +240,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
int32_t status = pTask->status.taskStatus;
|
||||
|
||||
// non-source or fill-history tasks don't need to respond to the WAL scan action.
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE || pTask->info.fillHistory == 1) {
|
||||
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
@ -280,10 +278,10 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
}
|
||||
|
||||
int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue);
|
||||
int64_t maxVer = (pTask->info.fillHistory == 1)? pTask->dataRange.range.maxVer:INT64_MAX;
|
||||
|
||||
// append the data for the stream
|
||||
SStreamQueueItem* pItem = NULL;
|
||||
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
|
||||
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr);
|
||||
|
||||
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
|
@ -297,6 +295,17 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||
pTask->chkInfo.currentVer = ver;
|
||||
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
|
||||
|
||||
{
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
// the maximum version of data in the WAL has been reached already, so step2 is done
|
||||
if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) {
|
||||
qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag",
|
||||
pTask->id.idStr, pTask->chkInfo.currentVer);
|
||||
pTask->status.transferState = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
|
||||
pTask->chkInfo.currentVer);
|
||||
|
|
|
@ -1775,19 +1775,32 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
|||
}
|
||||
|
||||
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
|
||||
if (pWindow->skey != INT64_MIN) {
|
||||
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);
|
||||
|
||||
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
|
||||
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
||||
bool hasUnqualified = false;
|
||||
bool hasUnqualified = false;
|
||||
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
int64_t* ts = (int64_t*) colDataGetData(pCol, i);
|
||||
p[i] = (*ts >= pWindow->skey);
|
||||
|
||||
if (!p[i]) {
|
||||
hasUnqualified = true;
|
||||
if (pWindow->skey != INT64_MIN) {
|
||||
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
|
||||
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
||||
p[i] = (*ts >= pWindow->skey);
|
||||
|
||||
if (!p[i]) {
|
||||
hasUnqualified = true;
|
||||
}
|
||||
}
|
||||
} else if (pWindow->ekey != INT64_MAX) {
|
||||
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->skey);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
||||
p[i] = (*ts <= pWindow->ekey);
|
||||
|
||||
if (!p[i]) {
|
||||
hasUnqualified = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -510,12 +510,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
|
||||
if (pInput == NULL) {
|
||||
ASSERT(batchSize == 0);
|
||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||
int32_t code = streamTransferStateToStreamTask(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
// if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||
// int32_t code = streamTransferStateToStreamTask(pTask);
|
||||
// if (code != TSDB_CODE_SUCCESS) { // todo handle this
|
||||
// return 0;
|
||||
// }
|
||||
// }
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -584,6 +584,28 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
|
|||
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
|
||||
}
|
||||
|
||||
/**
 * Finish the WAL scan stage (step 2) of a fill-history task.
 *
 * Logs the elapsed time since pTask->tsInfo.step2Start, marks the task with
 * status.transferState = true, dispatches the transfer-state message via
 * streamDispatchTransferStateMsg(), and finally calls
 * streamTransferStateToStreamTask().
 *
 * NOTE(review): the elapsed time is only meaningful if the caller has set
 * pTask->tsInfo.step2Start beforehand -- confirm at every call site.
 *
 * @param pTask the fill-history task whose step 2 has ended
 * @return TSDB_CODE_SUCCESS, or the error code returned by
 *         streamTransferStateToStreamTask(). A failure of the dispatch step is
 *         logged but does not change the return value.
 */
int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
  const char* id = pTask->id.idStr;

  double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
  qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);

  // 3. notify downstream tasks to transfer executor state after handling all history blocks.
  pTask->status.transferState = true;

  int32_t code = streamDispatchTransferStateMsg(pTask);
  if (code != TSDB_CODE_SUCCESS) {
    // todo handle error
    // Keep going as before, but leave a trace instead of silently dropping the failure.
    qError("s-task:%s failed to dispatch transfer-state msg to downstream tasks, code:%d", id, code);
  }

  // the last execution of fill-history task, in order to transfer task operator states.
  code = streamTransferStateToStreamTask(pTask);
  if (code != TSDB_CODE_SUCCESS) {  // todo handle this
    qError("s-task:%s failed to transfer state to the related stream task, code:%d", id, code);
  }

  return code;
}
|
||||
|
||||
int32_t streamTryExec(SStreamTask* pTask) {
|
||||
// this function may be executed by multiple threads, so a status check is required.
|
||||
int8_t schedStatus =
|
||||
|
@ -600,27 +622,11 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
|
||||
// todo the task should be commit here
|
||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL &&
|
||||
pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) {
|
||||
// fill-history WAL scan has completed
|
||||
// fill-history WAL scan has completed
|
||||
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) {
|
||||
streamTaskRecoverSetAllStepFinished(pTask);
|
||||
|
||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||
|
||||
// 3. notify downstream tasks to transfer executor state after handle all history blocks.
|
||||
if (!pTask->status.transferState) {
|
||||
code = streamDispatchTransferStateMsg(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo handle error
|
||||
}
|
||||
|
||||
pTask->status.transferState = true;
|
||||
}
|
||||
|
||||
// the last execution of fill-history task, in order to transfer task operator states.
|
||||
code = streamExecForAll(pTask);
|
||||
|
||||
streamTaskEndScanWAL(pTask);
|
||||
} else {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||
pTask->status.schedStatus);
|
||||
|
|
|
@ -284,7 +284,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
|
|||
|
||||
// common
|
||||
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
|
||||
qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr);
|
||||
qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);
|
||||
return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor);
|
||||
}
|
||||
|
||||
|
@ -507,7 +507,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
|
|||
pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
|
||||
qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
|
||||
" ver range:%" PRId64 " - %" PRId64,
|
||||
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
|
||||
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
|
||||
|
@ -654,7 +654,7 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) {
|
|||
return qStreamRecoverSetAllStepFinished(exec);
|
||||
}
|
||||
|
||||
void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
||||
SVersionRange* pRange = &pTask->dataRange.range;
|
||||
ASSERT(latestVer >= pRange->maxVer);
|
||||
|
||||
|
@ -663,13 +663,16 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
|
|||
// no input data yet. no need to execute the secondary scan while the stream task is halted
|
||||
streamTaskRecoverSetAllStepFinished(pTask);
|
||||
qDebug(
|
||||
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during secondary scan",
|
||||
pTask->id.idStr);
|
||||
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
|
||||
"related stream task currentVer:%" PRId64,
|
||||
pTask->id.idStr, latestVer);
|
||||
return true;
|
||||
} else {
|
||||
// 2. do secondary scan of the history data, the time window remains, and the version range is updated to
|
||||
// [pTask->dataRange.range.maxVer, ver1]
|
||||
pRange->minVer = nextStartVer;
|
||||
pRange->maxVer = latestVer - 1;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue