fix(stream): fix msg lost bug.

This commit is contained in:
Haojun Liao 2023-09-09 15:08:36 +08:00
parent 47fd144447
commit 6f5c855a4b
3 changed files with 13 additions and 15 deletions

View File

@ -329,7 +329,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, void* data) {
code = doBuildSubmitFromResBlock(pVnode, i, stbFullName, suid, pDataBlock, pTask, &tbData);
taosArrayPush(submitReq.aSubmitTbData, &tbData);
code = doBuildSubmitAndSendMsg(pVnode, pTask, numOfBlocks, &submitReq);
code = doBuildSubmitAndSendMsg(pVnode, pTask, 1, &submitReq);
}
}
} else {

View File

@ -396,25 +396,23 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t numOfItems = streamTaskGetInputQItems(pTask);
int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX;
taosThreadMutexLock(&pTask->lock);
pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
SStreamQueueItem* pItem = NULL;
code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr);
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue
handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader));
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
taosThreadMutexLock(&pTask->lock);
pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask);
if (pItem != NULL) {
streamFreeQitem(pItem);
}
continue;
}

View File

@ -566,8 +566,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver >= pTask->chkInfo.checkpointVer);
if (ver != pTask->chkInfo.checkpointVer) {
qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr,
pTask->chkInfo.checkpointVer, ver);
qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 " , currentVer:%" PRId64,
pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.currentVer);
pTask->chkInfo.checkpointVer = ver;
}