fix(stream): limit the already processed data.

This commit is contained in:
Haojun Liao 2023-09-10 00:01:25 +08:00
parent 9a30573b8b
commit 85a5c45098
10 changed files with 49 additions and 46 deletions

View File

@ -260,7 +260,7 @@ typedef struct SStreamTaskId {
typedef struct SCheckpointInfo {
int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version
int64_t currentVer; // current offset in WAL, not serialize it
int64_t nextProcessVer; // current offset in WAL, not serialize it
} SCheckpointInfo;
typedef struct SStreamStatus {

View File

@ -86,18 +86,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
} else {
if (pTask->chkInfo.currentVer == -1) {
pTask->chkInfo.currentVer = 0;
if (pTask->chkInfo.nextProcessVer == -1) {
pTask->chkInfo.nextProcessVer = 0;
}
}
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);

View File

@ -834,14 +834,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
// checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) {
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);
@ -1121,7 +1121,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
int64_t dstVer = pTask->dataRange.range.minVer - 1;
pTask->chkInfo.currentVer = dstVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE);
@ -1154,7 +1154,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug(
"s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start "
"ver:%" PRId64 ", window:%" PRId64 " - %" PRId64,
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
id, pTask->chkInfo.nextProcessVer, pWindow->skey, pWindow->ekey);
}
code = streamTaskScanHistoryDataComplete(pTask);
@ -1289,8 +1289,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus;
if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.currentVer);
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamProcessRunReq(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
@ -1429,10 +1429,10 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
}
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory &&

View File

@ -187,25 +187,26 @@ end:
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
int32_t code = -1;
int32_t vgId = TD_VID(pTq->pVnode);
int64_t id = pHandle->pWalReader->readerId;
int64_t offset = *fetchOffset;
int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64,
vgId, offset, lastVer, committedVer, appliedVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64", 0x%"PRIx64,
vgId, offset, lastVer, committedVer, appliedVer, id);
while (offset <= appliedVer) {
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
", no more log to return, reqId:0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
", no more log to return, reqId:0x%" PRIx64 " 0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
goto END;
}
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId);
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64" 0x%"PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader);

View File

@ -256,36 +256,36 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
// seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (pTask->chkInfo.currentVer < firstVer) {
if (pTask->chkInfo.nextProcessVer < firstVer) {
tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64,
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer);
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, firstVer, firstVer);
pTask->chkInfo.currentVer = firstVer;
pTask->chkInfo.nextProcessVer = firstVer;
// todo need retry if failed
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer);
} else {
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
if (currentVer == -1) { // we only seek the read for the first time
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
return code;
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.currentVer);
pTask->chkInfo.nextProcessVer);
}
}
int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
if (skipToVer != 0 && skipToVer > pTask->chkInfo.currentVer) {
if (skipToVer != 0 && skipToVer > pTask->chkInfo.nextProcessVer) {
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
return code;
@ -304,7 +304,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
if (!pTask->status.appendTranstateBlock) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64
", not scan wal anymore, add transfer-state block into inputQ",
id, ver, maxVer);
@ -313,7 +313,7 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) {
/*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask);
/*int32_t code = */ streamSchedExec(pTask);
} else {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal",
id, ver, maxVer);
}
}
@ -421,12 +421,12 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
code = streamTaskPutDataIntoInputQ(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
pTask->chkInfo.currentVer = ver;
pTask->chkInfo.nextProcessVer = ver;
handleFillhistoryScanComplete(pTask, ver);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver);
} else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
pTask->chkInfo.currentVer);
pTask->chkInfo.nextProcessVer);
}
}

View File

@ -287,7 +287,7 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks
qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer,
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
streamGetTaskStatusStr(prev));
}

View File

@ -563,11 +563,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SIZE_IN_MB(resSize), totalBlocks);
// update the currentVer if processing the submit blocks.
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver >= pTask->chkInfo.checkpointVer);
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
if (ver != pTask->chkInfo.checkpointVer) {
qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 " , currentVer:%" PRId64,
pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.currentVer);
pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer);
pTask->chkInfo.checkpointVer = ver;
}

View File

@ -504,7 +504,8 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
pHTask->dataRange.range.minVer = 0;
pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
// the query version range should be limited to the already processed data
pHTask->dataRange.range.maxVer = pTask->chkInfo.nextProcessVer - 1;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64

View File

@ -380,7 +380,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;
pTask->chkInfo.currentVer = ver;
pTask->chkInfo.nextProcessVer = ver;
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb;

View File

@ -75,6 +75,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
while (fetchVer <= appliedVer) {
if (walFetchHead(pReader, fetchVer) < 0) {
return -1;
@ -257,7 +258,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
bool seeked = false;
wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64", %"PRIx64,
", applied ver:%" PRId64", 0x%"PRIx64,
pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
pRead->pWal->vers.appliedVer, pRead->readerId);
@ -297,7 +298,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
code = walValidHeadCksum(pRead->pHead);
if (code != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, %"PRIx64, pRead->pWal->cfg.vgId, ver,
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed, 0x%"PRIx64, pRead->pWal->cfg.vgId, ver,
pRead->readerId);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
@ -347,11 +348,11 @@ int32_t walFetchBody(SWalReader *pRead) {
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, %"PRIx64,
vgId, pReadHead->version, ver, tstrerror(terrno), pRead->readerId);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s, 0x%"PRIx64,
vgId, pReadHead->version, ver, tstrerror(terrno), id);
} else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, %"PRIx64,
vgId, pReadHead->version, ver, pRead->readerId);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted, 0x%"PRIx64,
vgId, pReadHead->version, ver, id);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
return -1;