diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f35ab42ee8..7dceb5c083 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -47,6 +47,7 @@ enum { TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused TASK_STATUS__PAUSE, + TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore }; enum { @@ -118,7 +119,7 @@ typedef struct { } SStreamMergedSubmit; typedef struct { - int8_t type; + int8_t type; int32_t srcVgId; int32_t childId; @@ -134,14 +135,10 @@ typedef struct { SSDataBlock* pBlock; } SStreamRefDataBlock; -typedef struct { - int8_t type; -} SStreamCheckpoint; - typedef struct { int8_t type; SSDataBlock* pBlock; -} SStreamTrigger; +} SStreamTrigger, SStreamCheckpoint; typedef struct SStreamQueueNode SStreamQueueNode; @@ -486,8 +483,8 @@ typedef struct { int64_t expireTime; } SStreamCheckpointSourceRsp; -int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); -int32_t tDecodeSStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); +int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); +int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp); @@ -501,9 +498,7 @@ typedef struct { int32_t upstreamTaskId; int32_t upstreamNodeId; int32_t childId; - int64_t expireTime; - int8_t taskLevel; -} SStreamCheckpointReq; +} SStreamTaskCheckpointReq; typedef struct { SMsgHead msgHead; @@ -514,15 +509,13 @@ typedef struct { int32_t upstreamTaskId; int32_t upstreamNodeId; int32_t childId; - int64_t expireTime; - int8_t taskLevel; } SStreamCheckpointRsp; -int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq); -int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq); +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); -int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); -int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); +int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); +int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); typedef struct { int64_t streamId; @@ -633,7 +626,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); -int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq); +int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq); int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp); int32_t streamTaskReleaseState(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 428bd12ee8..60b8734080 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -889,10 +889,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { pMsg->checkpointId = checkpointId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, - .pCont = pMsg, - .contLen = sizeof(SMStreamDoCheckpointMsg), - }; + .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } @@ -909,7 +906,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con int32_t code; int32_t blen; - tEncodeSize(tEncodeSStreamCheckpointSourceReq, &req, blen, code); + tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code); if (code < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -926,7 +923,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); - tEncodeSStreamCheckpointSourceReq(&encoder, &req); + tEncodeStreamCheckpointSourceReq(&encoder, &req); SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(tlen); @@ -950,7 +947,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in int32_t code; int32_t blen; - tEncodeSize(tEncodeSStreamCheckpointSourceReq, &req, blen, code); + tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code); if (code < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -967,7 +964,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); - tEncodeSStreamCheckpointSourceReq(&encoder, &req); + tEncodeStreamCheckpointSourceReq(&encoder, &req); SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(tlen); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a966fa2245..584b238d1b 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -174,7 +174,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq); // tq util int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 203ac10720..f420f75cf1 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -219,7 +219,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. -int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); +int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqCheckStreamStatus(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 75a3634f7f..fff3a9d80f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1278,59 +1278,6 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } -int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) { - SDecoder* pCoder = &(SDecoder){0}; - SDeleteRes* pRes = &(SDeleteRes){0}; - - *pRefBlock = NULL; - - pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); - if (pRes->uidList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - tDecoderInit(pCoder, (uint8_t*)pData, len); - tDecodeDeleteRes(pCoder, pRes); - tDecoderClear(pCoder); - - int32_t numOfTables = taosArrayGetSize(pRes->uidList); - if (numOfTables == 0 || pRes->affectedRows == 0) { - taosArrayDestroy(pRes->uidList); - return TSDB_CODE_SUCCESS; - } - - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); - blockDataEnsureCapacity(pDelBlock, numOfTables); - pDelBlock->info.rows = numOfTables; - pDelBlock->info.version = ver; - - for (int32_t i = 0; i < numOfTables; i++) { - // start key column - SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); - colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column - SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); - colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false); - // uid column - SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); - int64_t* pUid = taosArrayGet(pRes->uidList, i); - colDataSetVal(pUidCol, i, (const char*)pUid, false); - - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i); - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i); - colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i); - } - - taosArrayDestroy(pRes->uidList); - *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - if (pRefBlock == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; - (*pRefBlock)->pBlock = pDelBlock; - return TSDB_CODE_SUCCESS; -} - int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; @@ -1365,7 +1312,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { tqStartStreamTasks(pTq); return 0; } else { - tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId); + tqError("vgId:%d failed to found s-task, taskId:0x%x", vgId, taskId); return -1; } } @@ -1593,14 +1540,34 @@ FAIL: int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } -int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { +int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); int32_t len = msgLen - sizeof(SMsgHead); + int32_t code = 0; - streamDoCheckpoint(pMeta); - // taosWLockLatch(&pMeta->lock); - // taosWUnLockLatch(&pMeta->lock); - return 0; + SStreamCheckpointSourceReq req= {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + goto FAIL; + } + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); + if (pTask == NULL) { + tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.taskId); + goto FAIL; + } + + streamProcessCheckpointSourceReq(pMeta, pTask, &req); + streamMetaReleaseTask(pMeta, pTask); + return code; + + FAIL: + return code; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 5db3e735cc..2dd6524feb 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -29,7 +29,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { int32_t scan = pMeta->walScanCounter; tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan); - // check all restore tasks + // check all tasks bool shouldIdle = true; createStreamTaskRunReq(pTq->pStreamMeta, &shouldIdle); @@ -73,6 +73,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { pTaskList = taosArrayDup(pMeta->pTaskList, NULL); taosWUnLockLatch(&pMeta->lock); + // broadcast the check downstream tasks msg for (int32_t i = 0; i < numOfTasks; ++i) { int32_t* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId); @@ -83,8 +84,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { streamTaskCheckDownstreamTasks(pTask); streamMetaReleaseTask(pMeta, pTask); } - taosArrayDestroy(pTaskList); + taosArrayDestroy(pTaskList); return 0; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index c61d42d44e..16df54e95d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -20,21 +20,6 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) { - int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); - if (code < 0) { - tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr); - return -1; - } - - if (streamSchedExec(pTask) < 0) { - tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno)); - return -1; - } - - return TSDB_CODE_SUCCESS; -} - int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { pRsp->reqOffset = pReq->reqOffset; @@ -415,3 +400,60 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* tmsgSendRsp(&rsp); return 0; } + +int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) { + SDecoder* pCoder = &(SDecoder){0}; + SDeleteRes* pRes = &(SDeleteRes){0}; + + *pRefBlock = NULL; + + pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t)); + if (pRes->uidList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + tDecoderInit(pCoder, (uint8_t*)pData, len); + tDecodeDeleteRes(pCoder, pRes); + tDecoderClear(pCoder); + + int32_t numOfTables = taosArrayGetSize(pRes->uidList); + if (numOfTables == 0 || pRes->affectedRows == 0) { + taosArrayDestroy(pRes->uidList); + return TSDB_CODE_SUCCESS; + } + + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + blockDataEnsureCapacity(pDelBlock, numOfTables); + pDelBlock->info.rows = numOfTables; + pDelBlock->info.version = ver; + + for (int32_t i = 0; i < numOfTables; i++) { + // start key column + SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); + colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column + SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); + colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false); + // uid column + SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); + int64_t* pUid = taosArrayGet(pRes->uidList, i); + colDataSetVal(pUidCol, i, (const char*)pUid, false); + + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i); + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i); + colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i); + } + + taosArrayDestroy(pRes->uidList); + *pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); + if (pRefBlock == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK; + (*pRefBlock)->pBlock = pDelBlock; + return TSDB_CODE_SUCCESS; +} + +int32_t tqCreateCheckpointBlock(SStreamCheckpoint** pCheckpointBlock) { + +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ee036b1d10..d511d6c5b8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -499,7 +499,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_VND_STREAM_CHECK_POINT_SOURCE: { - if (tqProcessStreamCheckPointReq(pVnode->pTq, ver, pReq, len) < 0) goto _err; + if (tqProcessStreamCheckPointSourceReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { + goto _err; + } } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 2164b63caf..3b92e259de 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -19,7 +19,7 @@ #include "executor.h" #include "query.h" #include "tstream.h" - +#include "streamBackendRocksdb.h" #include "trpc.h" #ifdef __cplusplus @@ -48,6 +48,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); +int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ddbc8da3ec..b0a9bc44f2 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -385,8 +385,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); - } else if (type == STREAM_INPUT__GET_RES) { - // use the default memory limit, refactor later. + qDebug("s-task:%s checkpoint enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8e216f21c4..7f47126aa1 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -15,7 +15,7 @@ #include "streamInt.h" -int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { +int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; @@ -26,7 +26,7 @@ int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheck return pEncoder->pos; } -int32_t tDecodeSStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) { +int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; @@ -59,7 +59,7 @@ int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointS return 0; } -int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq) { +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; @@ -68,13 +68,11 @@ int32_t tEncodeSStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointR if (tEncodeI64(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; - if (tEncodeI8(pEncoder, pReq->taskLevel) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq) { +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; @@ -83,13 +81,11 @@ int32_t tDecodeSStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pR if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; - if (tDecodeI8(pDecoder, &pReq->taskLevel) < 0) return -1; tEndDecode(pDecoder); return 0; } -int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) { +int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; @@ -98,13 +94,11 @@ int32_t tEncodeSStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointR if (tEncodeI64(pEncoder, pRsp->upstreamTaskId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->taskLevel) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) { +int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; @@ -113,8 +107,6 @@ int32_t tDecodeSStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pR if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->taskLevel) < 0) return -1; tEndDecode(pDecoder); return 0; } @@ -130,24 +122,105 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); } -// static int32_t streamDoCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { -// // commit tdb state -// streamStateCommit(pTask->pState); -// // commit non-tdb state -// // copy and save new state -// // report to mnode -// // send checkpoint req to downstream -// return 0; -// } +static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) { + SStreamTaskCheckpointReq req = { + .streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, + .upstreamNodeId = pTask->info.nodeId, + .downstreamNodeId = pTask->info.nodeId, + .downstreamTaskId = pTask->id.taskId, + .childId = pTask->info.selfChildId, + .checkpointId = checkpointId, + }; + + // serialize + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; + req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; + + qDebug("s-task:%s dispatch checkpoint msg to task:0x%x(vgId:%d)", pTask->id.idStr, req.downstreamTaskId, + req.downstreamNodeId); + + streamDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + + int32_t numOfVgs = taosArrayGetSize(vgInfo); + pTask->notReadyTasks = numOfVgs; + pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); + + qDebug("s-task:%s dispatch %d checkpoint msg to downstream", pTask->id.idStr, numOfVgs); + + for (int32_t i = 0; i < numOfVgs; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + req.downstreamNodeId = pVgInfo->vgId; + req.downstreamTaskId = pVgInfo->taskId; + qDebug("s-task:%s (vgId:%d) checkpoint to task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, + pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i); + streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + } + } else { + qDebug("s-task:%s (vgId:%d) sink task set to be ready for checkpointing", pTask->id.idStr, pTask->info.nodeId); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SINK); + streamTaskLaunchScanHistory(pTask); + } -static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { - // ref wal - // set status checkpointing - // do checkpoint return 0; } + +// set status check pointing +// do checkpoint +static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, uint64_t checkpointId) { + int code = 0; + char buf[256] = {0}; + + int64_t ts = taosGetTimestampMs(); + + sprintf(buf, "%s/%s", pMeta->path, "checkpoints"); + code = taosMulModeMkDir(buf, 0755); + if (code != 0) { + qError("failed to prepare checkpoint %s, checkpointId:%" PRIu64 ", reason:%s", buf, checkpointId, tstrerror(code)); + return code; + } + + pMeta->checkpointTs = ts; + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + + // 1. set task status to be prepared for check point + pTask->status.taskStatus = TASK_STATUS__CK; + + // 2. put the checkpoint data block into the inputQ, to enable the local status to be flushed to storage backend + { + SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock)); + if (pChkpoint == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pChkpoint->type = STREAM_INPUT__CHECKPOINT; + pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pChkpoint->pBlock == NULL) { + taosFreeQitem(pChkpoint); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pChkpoint->pBlock->info.type = STREAM_CHECKPOINT; + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) { + taosFreeQitem(pChkpoint); + return TSDB_CODE_OUT_OF_MEMORY; + } + + streamSchedExec(pTask); + } + + // 2. dispatch checkpoint msg to downstream task + streamTaskDispatchCheckpointMsg(pTask, checkpointId); + +// code = streamBackendDoCheckpoint((void*)pMeta, buf); + return code; +} + int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { - int32_t code; + int32_t code = 0; int64_t checkpointId = pReq->checkpointId; code = streamDoSourceCheckpoint(pMeta, pTask, checkpointId); @@ -159,7 +232,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, return 0; } -int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq) { +int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq) { int32_t code; int64_t checkpointId = pReq->checkpointId; int32_t childId = pReq->childId; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9241df2e70..41d552e8d9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -15,6 +15,7 @@ #include "streamInt.h" #include "ttimer.h" +#include "trpc.h" #define MAX_BLOCK_NAME_NUM 1024 #define DISPATCH_RETRY_INTERVAL_MS 300 @@ -620,3 +621,43 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { // this block can not be deleted until it has been sent to downstream task successfully. return TSDB_CODE_SUCCESS; } + +int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet) { + void* buf = NULL; + int32_t code = -1; + SRpcMsg msg = {0}; + + int32_t tlen; + tEncodeSize(tEncodeStreamTaskCheckpointReq, pReq, tlen, code); + if (code < 0) { + return -1; + } + + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + return -1; + } + + ((SMsgHead*)buf)->vgId = htonl(nodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamTaskCheckpointReq(&encoder, pReq)) < 0) { + rpcFreeCont(buf); + return code; + } + + tEncoderClear(&encoder); + + msg.contLen = tlen + sizeof(SMsgHead); + msg.pCont = buf; + msg.msgType = TDMT_STREAM_TASK_CHECKPOINT; + + qDebug("s-task:%s (level:%d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, + pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); + + tmsgSendReq(pEpSet, &msg); + return 0; + +} diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 44352190e1..7d4e7e4615 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -462,24 +462,3 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return 0; } - -int32_t streamDoCheckpoint(SStreamMeta* pMeta) { - int code = -1; - char buf[256] = {0}; - - int64_t ts = taosGetTimestampMs(); - if (ts - pMeta->checkpointTs <= tsStreamCheckpointTickInterval * 1000) { - // avoid do checkpoint freq - return 0; - } - pMeta->checkpointTs = ts; - - sprintf(buf, "%s/%s", pMeta->path, "checkpoints"); - code = taosMulModeMkDir(buf, 0755); - if (code != 0) { - qError("failed to create chechpoint %s, reason:%s", buf, tstrerror(code)); - return code; - } - code = streamBackendDoCheckpoint((void*)pMeta, buf); - return code; -} diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a3fc3418aa..ffce0af79e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -16,6 +16,7 @@ #include "streamInt.h" #include "ttimer.h" #include "wal.h" +#include "trpc.h" int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { SStreamScanHistoryReq req; @@ -43,6 +44,7 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__SCAN_HISTORY: return "scan-history"; case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__PAUSE: return "paused"; + case TASK_STATUS__CK: return "check-point"; default:return ""; } }