diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 2a22656d8c..d4bbc2cb26 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -54,8 +54,8 @@ typedef struct SDataDispatchHandle { // clang-format off // data format: // +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ -// |SDataCacheEntry | version | total length | numOfRows | group id | col1_schema | col2_schema | col3_schema... | column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | .... | | (4 bytes) |(8 bytes) -// | | sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | +// |SDataCacheEntry | version | total length | numOfRows | group id | col1_schema | col2_schema | col3_schema... 
| column#1 length, column#2 length...| col1 bitmap | col1 data | col2 bitmap | col2 data | +// | | sizeof(int32_t) |sizeof(int32) | sizeof(int32)| sizeof(uint64_t) | (sizeof(int8_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | | // +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ // The length of bitmap is decided by number of rows of this data block, and the length of each column data is // recorded in the first segment, next to the struct header diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index e2632ee25a..e0d40cafa0 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -53,6 +53,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); +int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId); int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4a7e571011..af1224b716 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -262,3 +262,35 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } + +// Record checkpointId as the kept checkpoint of every task listed in pMeta->pTaskList, save each task with +// streamMetaSaveTask() and then commit the stream meta, all while holding the write latch pMeta->lock. +// Returns TSDB_CODE_SUCCESS, or -1 when streamMetaCommit() fails (the latch is released on both paths). +int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { + taosWLockLatch(&pMeta->lock); + + for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { + uint32_t* 
pTaskId = taosArrayGet(pMeta->pTaskList, i); + SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId)); + + ASSERT(p->chkInfo.keptCheckpointId < p->checkpointingId && p->checkpointingId == checkpointId); + p->chkInfo.keptCheckpointId = p->checkpointingId; + + streamMetaSaveTask(pMeta, p); + qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 + ", ver:%" PRId64 " currentVer:%" PRId64, + pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.version, p->chkInfo.currentVer); + } + + if (streamMetaCommit(pMeta) < 0) { + taosWUnLockLatch(&pMeta->lock); + qError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64", since %s", + pMeta->vgId, checkpointId, terrstr()); + return -1; + } else { + taosWUnLockLatch(&pMeta->lock); + qInfo("vgId:%d commit stream meta after do checkpoint, checkpointId:%" PRId64 ". DONE", pMeta->vgId, checkpointId); + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index e258e93f8d..f8eb6ef069 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -120,6 +120,7 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { return NULL; } + pDataSubmit->ver = pData->ver; pDataSubmit->submit = *pData; *pDataSubmit->dataRef = 1; // initialize the reference count to be 1 pDataSubmit->type = type; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d4b6f0927d..0492c46902 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -532,7 +532,8 @@ int32_t streamTryExec(SStreamTask* pTask) { if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); - qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%" PRId64, pMeta->vgId, + streamSaveTasks(pMeta, pTask->checkpointingId); + 
qDebug("vgId:%d vnode wide checkpoint completed, save all tasks status, checkpointId:%" PRId64, pMeta->vgId, pTask->checkpointingId); } @@ -543,29 +544,10 @@ int32_t streamTryExec(SStreamTask* pTask) { code = streamTaskSendCheckpointRsp(pTask); } - if (code == TSDB_CODE_SUCCESS) { - taosWLockLatch(&pTask->pMeta->lock); - - ASSERT(pTask->chkInfo.keptCheckpointId < pTask->checkpointingId); - pTask->chkInfo.keptCheckpointId = pTask->checkpointingId; - - streamMetaSaveTask(pTask->pMeta, pTask); - if (streamMetaCommit(pTask->pMeta) < 0) { - taosWUnLockLatch(&pTask->pMeta->lock); - qError("s-task:%s failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", ver:%" PRId64 - ", since %s", - pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version, terrstr()); - return -1; - } else { - taosWUnLockLatch(&pTask->pMeta->lock); - } - - qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 - " currentVer:%" PRId64, - pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version, - pTask->chkInfo.currentVer); - } else { + if (code != TSDB_CODE_SUCCESS) { // todo: let's retry send rsp to upstream/mnode + qError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%"PRId64", code:%s", + pTask->id.idStr, pTask->checkpointingId, tstrerror(code)); } } else { if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&