fix(stream): update the step2 scan wal files.

This commit is contained in:
Haojun Liao 2023-07-27 16:10:55 +08:00
parent c54bd55c68
commit 6ff50d4eaf
6 changed files with 29 additions and 36 deletions

View File

@ -310,7 +310,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
int64_t ver = pReader->pHead->head.version; int64_t ver = pReader->pHead->head.version;
if (ver > maxVer) { if (ver > maxVer) {
tqDebug("maxVer in WAL:%"PRId64" reached, do not scan wal anymore, %s", maxVer, id); tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -209,6 +209,17 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64
", not scan wal anymore, set the transfer state flag",
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
pTask->status.transferState = true;
/*int32_t code = */streamSchedExec(pTask);
}
}
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true; *pScanIdle = true;
bool noDataInWal = true; bool noDataInWal = true;
@ -251,6 +262,15 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
if (tInputQueueIsFull(pTask)) { if (tInputQueueIsFull(pTask)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
@ -259,17 +279,6 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = false; *pScanIdle = false;
if (pTask->info.fillHistory == 1) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL);
// the maximum version of data in the WAL has reached already, the step2 is done
if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) {
qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore",
pTask->id.idStr, pTask->chkInfo.currentVer);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
}
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int32_t code = doSetOffsetForWalReader(pTask, vgId); int32_t code = doSetOffsetForWalReader(pTask, vgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -284,6 +293,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr); code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr);
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
} }
@ -294,18 +304,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->chkInfo.currentVer = ver; pTask->chkInfo.currentVer = ver;
checkForFillHistoryVerRange(pTask, ver);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
{
if (pTask->info.fillHistory == 1) {
// the maximum version of data in the WAL has reached already, the step2 is done
if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) {
qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag",
pTask->id.idStr, pTask->chkInfo.currentVer);
pTask->status.transferState = true;
}
}
}
} else { } else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
pTask->chkInfo.currentVer); pTask->chkInfo.currentVer);

View File

@ -892,7 +892,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->recoverStep1Finished = false; pStreamInfo->recoverStep1Finished = false;
pStreamInfo->recoverStep2Finished = false; pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 1. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
" - %" PRId64, " - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey); pWindow->ekey);
@ -911,7 +911,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->recoverStep1Finished = true; pStreamInfo->recoverStep1Finished = true;
pStreamInfo->recoverStep2Finished = false; pStreamInfo->recoverStep2Finished = false;
qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
", window:%" PRId64 " - %" PRId64, ", window:%" PRId64 " - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey); pWindow->ekey);

View File

@ -1793,7 +1793,7 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW
} }
} }
} else if (pWindow->ekey != INT64_MAX) { } else if (pWindow->ekey != INT64_MAX) {
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->skey); qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i); int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts <= pWindow->ekey); p[i] = (*ts <= pWindow->ekey);

View File

@ -355,8 +355,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
pTask->status.transferState = false; // reset this value, to avoid transfer state again // todo: destroy this task here
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
pTask->streamTaskId.taskId); pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
@ -510,13 +509,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
if (pInput == NULL) { if (pInput == NULL) {
ASSERT(batchSize == 0); ASSERT(batchSize == 0);
// if (pTask->info.fillHistory && pTask->status.transferState) {
// int32_t code = streamTransferStateToStreamTask(pTask);
// if (code != TSDB_CODE_SUCCESS) { // todo handle this
// return 0;
// }
// }
break; break;
} }

View File

@ -844,7 +844,8 @@ void streamTaskPause(SStreamTask* pTask) {
return; return;
} }
qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); const char* pStatus = streamGetTaskStatusStr(status);
qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);
taosMsleep(100); taosMsleep(100);
} }