fix(stream): add some logs and set the wal offset to be the ended version when finishing the step2 recover stage.

This commit is contained in:
Haojun Liao 2023-05-17 10:43:23 +08:00
parent e35d145734
commit f3f52c0232
4 changed files with 17 additions and 7 deletions

View File

@ -853,7 +853,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
memcpy(serializedReq, &req, len);
// dispatch msg
tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
tqDebug("s-task:%s start to recover blocking stage", pTask->id.idStr);
SRpcMsg rpcMsg = {
.code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
@ -877,6 +877,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1;
}
qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
walReaderSeekVer(pTask->exec.pWalReader, sversion);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;

View File

@ -1792,7 +1792,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan called");
qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
@ -1801,13 +1801,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
pTSInfo->base.cond.startVersion = 0;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
} else {
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
}

View File

@ -60,7 +60,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data;

View File

@ -224,12 +224,19 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req*
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.pExecutor;
const char* id = pTask->id.idStr;
qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr);
int64_t st = taosGetTimestampMs();
qDebug("s-task:%s recover step2(blocking stage) started", id);
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
}
return streamScanExec(pTask, 100);
int32_t code = streamScanExec(pTask, 100);
double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s recover step2(blocking stage) ended, elapsed time:%.2fs", id, el);
return code;
}
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {