fix(stream): fix error in checkpointing.

This commit is contained in:
Haojun Liao 2023-07-10 19:20:05 +08:00
parent ab5b2fe0a8
commit 87bedef2cd
6 changed files with 59 additions and 47 deletions

View File

@ -258,7 +258,7 @@ typedef struct SStreamId {
} SStreamId;
typedef struct SCheckpointInfo {
int64_t id;
int64_t keptCheckpointId;
int64_t version; // offset in WAL
int64_t currentVer; // current offset in WAL, not serialize it
} SCheckpointInfo;
@ -311,7 +311,6 @@ struct SStreamTask {
SStreamId historyTaskId;
SStreamId streamTaskId;
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
SArray* pRpcMsgList; // SArray<SRpcMsg*>
// output
union {

View File

@ -777,11 +777,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
return -1;
}
SReadHandle handle = {.vnode = pTq->pVnode,
.initTqReader = 1,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window};
SReadHandle handle = {
.vnode = pTq->pVnode,
.initTqReader = 1,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
@ -804,13 +806,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamEpInfoList);
SReadHandle handle = {.vnode = NULL,
.numOfVgroups = numOfVgroups,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window};
initStorageAPI(&handle.api);
SReadHandle handle = {
.vnode = NULL,
.numOfVgroups = numOfVgroups,
.pStateBackend = pTask->pState,
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
if (pTask->exec.pExecutor == NULL) {
return -1;
@ -819,6 +823,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
}
if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) {
tqInfo("s-task:%s status is set to %s prev in meta:%s", pTask->id.idStr,
streamGetTaskStatusStr(TASK_STATUS__SCAN_HISTORY), streamGetTaskStatusStr(pTask->status.taskStatus));
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
}
pTask->pRpcMsgList = taosArrayInit(4, sizeof(SRpcMsg));
// sink

View File

@ -51,7 +51,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id);

View File

@ -475,7 +475,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
}
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) {
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num);

View File

@ -18,7 +18,7 @@
// maximum allowed processed block batches. One block may include several submit blocks
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
static int32_t updateCheckPointInfo(SStreamTask* pTask);
static int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId);
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
@ -32,12 +32,6 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) {
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
int32_t* totalBlocks) {
int32_t code = updateCheckPointInfo(pTask);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return code;
}
int32_t numOfBlocks = taosArrayGetSize(pRes);
if (numOfBlocks > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
@ -50,7 +44,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
size / 1048576.0);
code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
int32_t code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
destroyStreamDataBlock(pStreamBlocks);
return -1;
@ -301,31 +295,17 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
}
#endif
int32_t updateCheckPointInfo(SStreamTask* pTask) {
int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId) {
int64_t ckId = 0;
int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
SCheckpointInfo* pCkInfo = &pTask->chkInfo;
if (ckId > pCkInfo->id) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);
pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};
taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) {
taosWUnLockLatch(&pTask->pMeta->lock);
qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
return -1;
} else {
taosWUnLockLatch(&pTask->pMeta->lock);
qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
}
}
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64
" -> %" PRId64,
pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId);
pCkInfo->keptCheckpointId = checkpointId;
pCkInfo->version = dataVer;
return TSDB_CODE_SUCCESS;
}
@ -544,15 +524,38 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%"PRId64, pMeta->vgId);
}
code = updateCheckPointInfo(pTask, pTask->checkpointingId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamTaskSendCheckpointSourceRsp(pTask);
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
streamTaskSendCheckpointRsp(pTask, pMeta->vgId);
code = streamTaskSendCheckpointRsp(pTask);
}
if (code == TSDB_CODE_SUCCESS) {
taosWLockLatch(&pTask->pMeta->lock);
streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) {
taosWUnLockLatch(&pTask->pMeta->lock);
qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
return -1;
} else {
taosWUnLockLatch(&pTask->pMeta->lock);
qDebug("s-task:%s commit after checkpoint generating", pTask->id.idStr);
}
qInfo("vgId:%d s-task:%s commit task status after checkpoint completed", pMeta->vgId, pTask->id.idStr);
} else {
// todo: let's retry send rsp to upstream/mnode
}
} else {
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
(!streamTaskShouldPause(&pTask->status))) {

View File

@ -85,7 +85,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.id) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.keptCheckpointId) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->chkInfo.version) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
@ -148,7 +148,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.id) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.keptCheckpointId) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->chkInfo.version) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;