fix(stream): commit task meta after do vnode-wide checkpoint.

This commit is contained in:
Haojun Liao 2023-07-12 10:16:14 +08:00
parent 09b764494d
commit 6f2fc4fab3
5 changed files with 38 additions and 25 deletions

View File

@ -54,8 +54,8 @@ typedef struct SDataDispatchHandle {
// clang-format off
// data format:
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | version | total length | numOfRows | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes)
// | | sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | |
// |SDataCacheEntry | version | total length | numOfRows | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data |
// | | sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | |
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header

View File

@ -53,6 +53,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId);
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);

View File

@ -262,3 +262,32 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) {
taosWLockLatch(&pMeta->lock);
for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) {
uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId));
ASSERT(p->chkInfo.keptCheckpointId < p->checkpointingId && p->checkpointingId == checkpointId);
p->chkInfo.keptCheckpointId = p->checkpointingId;
streamMetaSaveTask(pMeta, p);
qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64
", ver:%" PRId64 " currentVer:%" PRId64,
pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.version, p->chkInfo.currentVer);
}
if (streamMetaCommit(pMeta) < 0) {
taosWUnLockLatch(&pMeta->lock);
qError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64", since %s",
pMeta->vgId, checkpointId, terrstr());
return -1;
} else {
taosWUnLockLatch(&pMeta->lock);
qInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%. DONE" PRId64, pMeta->vgId, checkpointId);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -120,6 +120,7 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
return NULL;
}
pDataSubmit->ver = pData->ver;
pDataSubmit->submit = *pData;
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1
pDataSubmit->type = type;

View File

@ -532,7 +532,8 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%" PRId64, pMeta->vgId,
streamSaveTasks(pMeta, pTask->checkpointingId);
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId,
pTask->checkpointingId);
}
@ -543,29 +544,10 @@ int32_t streamTryExec(SStreamTask* pTask) {
code = streamTaskSendCheckpointRsp(pTask);
}
if (code == TSDB_CODE_SUCCESS) {
taosWLockLatch(&pTask->pMeta->lock);
ASSERT(pTask->chkInfo.keptCheckpointId < pTask->checkpointingId);
pTask->chkInfo.keptCheckpointId = pTask->checkpointingId;
streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) {
taosWUnLockLatch(&pTask->pMeta->lock);
qError("s-task:%s failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", ver:%" PRId64
", since %s",
pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version, terrstr());
return -1;
} else {
taosWUnLockLatch(&pTask->pMeta->lock);
}
qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64
" currentVer:%" PRId64,
pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version,
pTask->chkInfo.currentVer);
} else {
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%"PRId64", code:%s",
pTask->id.idStr, pTask->checkpointingId, tstrerror(code));
}
} else {
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&