Merge branch 'enh/triggerCheckPoint2' of https://github.com/taosdata/TDengine into enh/triggerCheckPoint2

yihaoDeng 2023-07-11 16:31:07 +08:00
commit 503d7540f7
5 changed files with 79 additions and 75 deletions


@@ -259,7 +259,7 @@ typedef struct SStreamId {
 typedef struct SCheckpointInfo {
   int64_t keptCheckpointId;
-  int64_t version;     // offset in WAL
+  int64_t version;     // latest checkpointId version
   int64_t currentVer;  // current offset in WAL, not serialize it
 } SCheckpointInfo;
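
Taken together with the logging changes further down, the field semantics after this hunk are: keptCheckpointId records the last completed checkpoint, version now tracks the checkpoint version of the data already handed to the executor rather than a raw WAL offset, and currentVer remains the in-memory WAL read position that is not serialized. Below is a minimal standalone sketch of the struct as it reads after the change, with the field comments paraphrased from the diff; it is an illustration, not the full TDengine header.

#include <inttypes.h>
#include <stdio.h>

/* Simplified copy of SCheckpointInfo as it appears after this hunk. */
typedef struct SCheckpointInfo {
  int64_t keptCheckpointId;  /* id of the last completed checkpoint */
  int64_t version;           /* latest checkpointId version (previously: offset in WAL) */
  int64_t currentVer;        /* current offset in WAL, not serialized */
} SCheckpointInfo;

int main(void) {
  SCheckpointInfo info = {.keptCheckpointId = 1, .version = 100, .currentVer = 110};
  printf("checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 "\n",
         info.keptCheckpointId, info.version, info.currentVer);
  return 0;
}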


@@ -75,7 +75,6 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
   pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
   pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
   pTask->pMsgCb = &pSnode->msgCb;
-  pTask->chkInfo.version = ver;
   pTask->pMeta = pSnode->pMeta;

   pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);


@@ -757,7 +757,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
   pTask->pMsgCb = &pTq->pVnode->msgCb;
   pTask->pMeta = pTq->pStreamMeta;
-  pTask->chkInfo.version = ver;
   pTask->chkInfo.currentVer = ver;
   pTask->dataRange.range.maxVer = ver;
@@ -855,14 +854,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
   }

   streamSetupScheduleTrigger(pTask);

+  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
-  tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64
-         " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
-         vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel,
-         pTask->info.fillHistory, pTask->triggerParam);
+  tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
+         " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms",
+         vgId, pTask->id.idStr, pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer,
+         pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
-
-  // next valid version will add one
-  pTask->chkInfo.version += 1;

   return 0;
 }


@@ -118,10 +118,8 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i
   int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
   if (old == 0) {
     qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num);
-    pTask->checkpointingId = checkpointId;
   }

-  ASSERT(pTask->checkpointingId == checkpointId);
   return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
 }
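
For context, the counter manipulated above implements a small alignment barrier: the first upstream checkpoint trigger seeds checkpointAlignCnt with the number of upstream tasks via a compare-and-swap, every trigger decrements it, and the checkpoint proceeds once the result reaches zero; after this hunk the function only counts and no longer records checkpointingId. A minimal standalone sketch of that counting pattern, using C11 atomics as stand-ins for the taos atomic helpers (illustration only, not the TDengine implementation):

#include <stdatomic.h>
#include <stdio.h>

/* Returns the number of upstream triggers still outstanding after this one.
 * The first caller seeds the counter with the upstream count; all callers decrement. */
static int align_checkpoint(atomic_int *alignCnt, int numOfUpstream) {
  int expected = 0;
  /* Seed the counter only if it is still zero (the first trigger wins the CAS). */
  atomic_compare_exchange_strong(alignCnt, &expected, numOfUpstream);
  return atomic_fetch_sub(alignCnt, 1) - 1;  /* mirror sub_fetch: return the new value */
}

int main(void) {
  atomic_int cnt = 0;
  for (int i = 0; i < 3; ++i) {
    int remain = align_checkpoint(&cnt, 3);
    printf("trigger %d -> remain %d\n", i + 1, remain);  /* prints 2, 1, 0 */
  }
  return 0;
}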
@@ -204,11 +202,11 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
 }

 int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq) {
-  int32_t code;
   int64_t checkpointId = pReq->checkpointId;
   int32_t childId = pReq->childId;

   // set the task status
+  pTask->checkpointingId = checkpointId;
   pTask->status.taskStatus = TASK_STATUS__CK;

   ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);


@@ -295,21 +295,6 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
 }
 #endif

-int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId) {
-  int64_t ckId = 0;
-  int64_t dataVer = 0;
-  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
-
-  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
-  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64
-         " -> %" PRId64,
-         pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId);
-
-  pCkInfo->keptCheckpointId = checkpointId;
-  pCkInfo->version = dataVer;
-  return TSDB_CODE_SUCCESS;
-}
-
 static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
   // wait for the stream task to be idle
   int64_t st = taosGetTimestampMs();
@@ -378,6 +363,56 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
   return TSDB_CODE_SUCCESS;
 }

+// set input
+static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) {
+  void* pExecutor = pTask->exec.pExecutor;
+
+  const SStreamQueueItem* pItem = pInput;
+  if (pItem->type == STREAM_INPUT__GET_RES) {
+    const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
+    qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
+  } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
+    ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
+    const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
+    qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
+    qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
+           pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
+    ASSERT((*pVer) < pSubmit->submit.ver);
+    (*pVer) = pSubmit->submit.ver;
+  } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
+    const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
+    SArray* pBlockList = pBlock->blocks;
+    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
+    qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
+    qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
+  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
+    const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
+    SArray* pBlockList = pMerged->submits;
+    int32_t numOfBlocks = taosArrayGetSize(pBlockList);
+    qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask,
+           numOfBlocks, pMerged->ver);
+    qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
+    ASSERT((*pVer) < pMerged->ver);
+    (*pVer) = pMerged->ver;
+  } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
+    const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
+    qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
+  } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
+    const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput;
+    qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
+  } else {
+    ASSERT(0);
+  }
+}
+
 /**
  * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
  * appropriate batch of blocks should be handled in 5 to 10 sec.
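
The helper added above centralizes the per-input-type dispatch that previously sat inline in streamExecForAll (removed in the next hunk) and, for submit-type inputs, advances the caller-supplied version through the pVer out-parameter while asserting that versions only move forward. A minimal standalone sketch of that dispatch-and-advance shape, using a simplified tagged struct instead of the real SStreamQueueItem types (illustration only):

#include <assert.h>
#include <inttypes.h>
#include <stdio.h>

typedef enum { INPUT_GET_RES, INPUT_DATA_SUBMIT, INPUT_MERGED_SUBMIT } EInputType;

typedef struct {
  EInputType type;
  int64_t    ver;  /* meaningful for submit-type inputs only */
} SInputItem;

/* Feed one input to a (stubbed) executor and advance *pVer for submit inputs,
 * mirroring the shape of doSetStreamInputBlock. */
static void set_input_block(const SInputItem *pItem, int64_t *pVer) {
  switch (pItem->type) {
    case INPUT_DATA_SUBMIT:
    case INPUT_MERGED_SUBMIT:
      assert(*pVer < pItem->ver);  /* versions must move forward */
      *pVer = pItem->ver;
      break;
    default:
      break;  /* non-submit inputs do not touch the version */
  }
}

int main(void) {
  int64_t ver = 100;
  SInputItem item = {.type = INPUT_DATA_SUBMIT, .ver = 105};
  set_input_block(&item, &ver);
  printf("ver advanced to %" PRId64 "\n", ver);  /* 105 */
  return 0;
}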
@@ -418,46 +453,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     }

     int64_t st = taosGetTimestampMs();
-    qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize);
-
-    {
-      // set input
-      void* pExecutor = pTask->exec.pExecutor;
-
-      const SStreamQueueItem* pItem = pInput;
-      if (pItem->type == STREAM_INPUT__GET_RES) {
-        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
-        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
-      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
-        ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
-        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
-        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
-        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
-               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
-      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
-        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
-        SArray* pBlockList = pBlock->blocks;
-        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
-        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer);
-        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
-      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
-        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;
-        SArray* pBlockList = pMerged->submits;
-        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
-        qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
-        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
-      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
-        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
-        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
-      } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
-        const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput;
-        qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
-      } else {
-        ASSERT(0);
-      }
-    }
+    const SStreamQueueItem* pItem = pInput;
+    qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type);
+
+    int64_t ver = pTask->chkInfo.version;
+    doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.version, id);

     int64_t resSize = 0;
     int32_t totalBlocks = 0;
@@ -467,6 +468,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
            resSize / 1048576.0, totalBlocks);

+    // update the currentVer if processing the submit blocks.
+    ASSERT(pTask->chkInfo.version <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.version);
+
+    if(ver != pTask->chkInfo.version) {
+      qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
+             pTask->chkInfo.version);
+    }
+
     int32_t type = pInput->type;
     streamFreeQitem(pInput);
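
The block added above works together with the new helper: streamExecForAll snapshots ver before handing the batch to doSetStreamInputBlock, lets the helper advance pTask->chkInfo.version in place, and afterwards asserts ver <= chkInfo.version <= chkInfo.currentVer, i.e. data fed to the executor can never be newer than what has been read from the WAL. A small standalone check of that invariant, reusing the simplified fields from the earlier sketches (illustration only):

#include <assert.h>
#include <inttypes.h>
#include <stdio.h>

typedef struct {
  int64_t version;     /* checkpoint version of data fed to the executor */
  int64_t currentVer;  /* WAL read position */
} SChkInfo;

int main(void) {
  SChkInfo chk = {.version = 100, .currentVer = 110};
  int64_t ver = chk.version;  /* snapshot taken before feeding the batch */
  chk.version = 105;          /* advanced by the (simulated) input helper */

  /* Invariant checked after the batch, as in the added hunk. */
  assert(ver <= chk.version && chk.version <= chk.currentVer);
  if (ver != chk.version) {
    printf("checkpoint ver advanced from %" PRId64 " to %" PRId64 "\n", ver, chk.version);
  }
  return 0;
}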
@@ -510,7 +519,6 @@ int32_t streamTryExec(SStreamTask* pTask) {
       return -1;
     }

-    // todo the task should be commit here
     atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
     qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr,
            streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
@@ -524,16 +532,10 @@ int32_t streamTryExec(SStreamTask* pTask) {
     if (remain == 0) {  // all tasks are in TASK_STATUS__CK_READY state
       streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
-      qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%" PRId64 "", pMeta->vgId,
+      qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%" PRId64, pMeta->vgId,
             pTask->checkpointingId);
     }

-    if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
-      code = updateCheckPointInfo(pTask, pTask->checkpointingId);
-      if (code != TSDB_CODE_SUCCESS) {
-        return code;
-      }
-    }
-
     // send check point response to upstream task
     if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
       code = streamTaskSendCheckpointSourceRsp(pTask);
@@ -544,17 +546,24 @@ int32_t streamTryExec(SStreamTask* pTask) {
     if (code == TSDB_CODE_SUCCESS) {
       taosWLockLatch(&pTask->pMeta->lock);
+      ASSERT(pTask->chkInfo.keptCheckpointId < pTask->checkpointingId);
+      pTask->chkInfo.keptCheckpointId = pTask->checkpointingId;
+
       streamMetaSaveTask(pTask->pMeta, pTask);
       if (streamMetaCommit(pTask->pMeta) < 0) {
         taosWUnLockLatch(&pTask->pMeta->lock);
-        qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
+        qError("s-task:%s failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", ver:%" PRId64
+               ", since %s",
+               pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version, terrstr());
         return -1;
       } else {
         taosWUnLockLatch(&pTask->pMeta->lock);
-        qDebug("s-task:%s commit after checkpoint generating", pTask->id.idStr);
       }

-      qInfo("vgId:%d s-task:%s commit task status after checkpoint completed", pMeta->vgId, pTask->id.idStr);
+      qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64
+            " currentVer:%" PRId64,
+            pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version,
+            pTask->chkInfo.currentVer);
     } else {
       // todo: let's retry send rsp to upstream/mnode
     }
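
The final hunk adjusts the post-checkpoint commit order: under the stream-meta write lock, keptCheckpointId is promoted to the just-finished checkpointingId before the task is saved and the meta store is committed, and the logs now carry the checkpoint id and versions. A minimal standalone sketch of that ordering, with a pthread rwlock standing in for taosWLockLatch and stubbed save/commit functions (these stand-ins are assumptions for illustration, not the TDengine API):

#include <assert.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdio.h>

typedef struct {
  pthread_rwlock_t lock;     /* stand-in for the stream meta latch */
  int64_t keptCheckpointId;  /* last completed checkpoint */
  int64_t checkpointingId;   /* checkpoint currently being finalized */
} STaskSketch;

static int save_task(STaskSketch *t) { (void)t; return 0; }   /* stub for streamMetaSaveTask */
static int commit_meta(STaskSketch *t) { (void)t; return 0; } /* stub for streamMetaCommit */

static int finalize_checkpoint(STaskSketch *t) {
  pthread_rwlock_wrlock(&t->lock);
  assert(t->keptCheckpointId < t->checkpointingId);  /* checkpoint ids must increase */
  t->keptCheckpointId = t->checkpointingId;          /* promote before persisting */

  save_task(t);
  int code = commit_meta(t);
  pthread_rwlock_unlock(&t->lock);

  if (code < 0) {
    fprintf(stderr, "failed to commit meta, checkpointId:%" PRId64 "\n", t->keptCheckpointId);
    return -1;
  }
  printf("committed task status, checkpointId:%" PRId64 "\n", t->keptCheckpointId);
  return 0;
}

int main(void) {
  STaskSketch t = {.keptCheckpointId = 1, .checkpointingId = 2};
  pthread_rwlock_init(&t.lock, NULL);
  finalize_checkpoint(&t);
  pthread_rwlock_destroy(&t.lock);
  return 0;
}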