fix(stream): set correct checkpoint version when starting stream tasks.
This commit is contained in:
parent
571cccd737
commit
09b764494d
|
@ -259,7 +259,7 @@ typedef struct SStreamId {
|
||||||
|
|
||||||
typedef struct SCheckpointInfo {
|
typedef struct SCheckpointInfo {
|
||||||
int64_t keptCheckpointId;
|
int64_t keptCheckpointId;
|
||||||
int64_t version; // offset in WAL
|
int64_t version; // latest checkpointId version
|
||||||
int64_t currentVer; // current offset in WAL, not serialize it
|
int64_t currentVer; // current offset in WAL, not serialize it
|
||||||
} SCheckpointInfo;
|
} SCheckpointInfo;
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,6 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
pTask->pMsgCb = &pSnode->msgCb;
|
pTask->pMsgCb = &pSnode->msgCb;
|
||||||
pTask->chkInfo.version = ver;
|
|
||||||
pTask->pMeta = pSnode->pMeta;
|
pTask->pMeta = pSnode->pMeta;
|
||||||
|
|
||||||
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
||||||
|
|
|
@ -757,7 +757,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||||
pTask->pMeta = pTq->pStreamMeta;
|
pTask->pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
pTask->chkInfo.version = ver;
|
|
||||||
pTask->chkInfo.currentVer = ver;
|
pTask->chkInfo.currentVer = ver;
|
||||||
|
|
||||||
pTask->dataRange.range.maxVer = ver;
|
pTask->dataRange.range.maxVer = ver;
|
||||||
|
@ -855,14 +854,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
|
|
||||||
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
|
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
|
||||||
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
|
" child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
|
||||||
vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
|
vgId, pTask->id.idStr, pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer,
|
||||||
pTask->info.fillHistory, pTask->triggerParam);
|
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
|
||||||
|
|
||||||
// next valid version will add one
|
|
||||||
pTask->chkInfo.version += 1;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -118,7 +118,6 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i
|
||||||
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
|
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
|
||||||
if (old == 0) {
|
if (old == 0) {
|
||||||
qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
|
qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
|
||||||
pTask->checkpointingId = checkpointId;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
|
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
|
||||||
|
@ -207,6 +206,7 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
|
||||||
int32_t childId = pReq->childId;
|
int32_t childId = pReq->childId;
|
||||||
|
|
||||||
// set the task status
|
// set the task status
|
||||||
|
pTask->checkpointingId = checkpointId;
|
||||||
pTask->status.taskStatus = TASK_STATUS__CK;
|
pTask->status.taskStatus = TASK_STATUS__CK;
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
|
||||||
|
|
||||||
|
|
|
@ -295,21 +295,6 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId) {
|
|
||||||
int64_t ckId = 0;
|
|
||||||
int64_t dataVer = 0;
|
|
||||||
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
|
|
||||||
|
|
||||||
SCheckpointInfo* pCkInfo = &pTask->chkInfo;
|
|
||||||
// qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64
|
|
||||||
// " -> %" PRId64,
|
|
||||||
// pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId);
|
|
||||||
pCkInfo->keptCheckpointId = checkpointId;
|
|
||||||
// pCkInfo->version = dataVer;
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
||||||
// wait for the stream task to be idle
|
// wait for the stream task to be idle
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
@ -379,7 +364,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set input
|
// set input
|
||||||
static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pCurrentVer, const char* id) {
|
static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) {
|
||||||
void* pExecutor = pTask->exec.pExecutor;
|
void* pExecutor = pTask->exec.pExecutor;
|
||||||
|
|
||||||
const SStreamQueueItem* pItem = pInput;
|
const SStreamQueueItem* pItem = pInput;
|
||||||
|
@ -393,8 +378,8 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
|
||||||
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
|
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
|
||||||
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
|
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
|
||||||
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
|
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
|
||||||
ASSERT((*pCurrentVer) < pSubmit->submit.ver);
|
ASSERT((*pVer) < pSubmit->submit.ver);
|
||||||
(*pCurrentVer) = pSubmit->submit.ver;
|
(*pVer) = pSubmit->submit.ver;
|
||||||
|
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
|
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
|
||||||
|
@ -412,8 +397,8 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
|
||||||
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask,
|
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask,
|
||||||
numOfBlocks, pMerged->ver);
|
numOfBlocks, pMerged->ver);
|
||||||
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
|
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
|
||||||
ASSERT((*pCurrentVer) < pMerged->ver);
|
ASSERT((*pVer) < pMerged->ver);
|
||||||
(*pCurrentVer) = pMerged->ver;
|
(*pVer) = pMerged->ver;
|
||||||
|
|
||||||
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
|
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
|
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
|
||||||
|
@ -472,8 +457,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
const SStreamQueueItem* pItem = pInput;
|
const SStreamQueueItem* pItem = pInput;
|
||||||
qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type);
|
qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type);
|
||||||
|
|
||||||
int64_t currentVer = pTask->chkInfo.currentVer;
|
int64_t ver = pTask->chkInfo.version;
|
||||||
doSetStreamInputBlock(pTask, pInput, ¤tVer, id);
|
doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.version, id);
|
||||||
|
|
||||||
int64_t resSize = 0;
|
int64_t resSize = 0;
|
||||||
int32_t totalBlocks = 0;
|
int32_t totalBlocks = 0;
|
||||||
|
@ -484,10 +469,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
resSize / 1048576.0, totalBlocks);
|
resSize / 1048576.0, totalBlocks);
|
||||||
|
|
||||||
// update the currentVer if processing the submit blocks.
|
// update the currentVer if processing the submit blocks.
|
||||||
if(currentVer > pTask->chkInfo.currentVer) {
|
ASSERT(pTask->chkInfo.version <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.version);
|
||||||
qDebug("s-task:%s update currentVer from %" PRId64 " to %" PRId64, pTask->id.idStr,
|
|
||||||
pTask->chkInfo.currentVer, currentVer);
|
if(ver != pTask->chkInfo.version) {
|
||||||
pTask->chkInfo.currentVer = currentVer;
|
qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
|
||||||
|
pTask->chkInfo.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t type = pInput->type;
|
int32_t type = pInput->type;
|
||||||
|
@ -546,7 +532,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
|
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
|
||||||
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
||||||
qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%" PRId64 "", pMeta->vgId,
|
qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%" PRId64, pMeta->vgId,
|
||||||
pTask->checkpointingId);
|
pTask->checkpointingId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -574,8 +560,10 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
taosWUnLockLatch(&pTask->pMeta->lock);
|
taosWUnLockLatch(&pTask->pMeta->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64,
|
qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64
|
||||||
pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version);
|
" currentVer:%" PRId64,
|
||||||
|
pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version,
|
||||||
|
pTask->chkInfo.currentVer);
|
||||||
} else {
|
} else {
|
||||||
// todo: let's retry send rsp to upstream/mnode
|
// todo: let's retry send rsp to upstream/mnode
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue