From 47877898d04f9aaea6f961cfebb58c0dfab66ffe Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jul 2023 16:07:52 +0800 Subject: [PATCH] enh(stream): generate the checkpoint framework. --- include/libs/stream/tstream.h | 31 +- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 3 +- source/dnode/vnode/src/inc/vnodeInt.h | 2 + source/dnode/vnode/src/tq/tq.c | 164 ++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 11 + source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/inc/streamInt.h | 13 +- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 19 +- source/libs/stream/src/streamCheckpoint.c | 159 +++-- source/libs/stream/src/streamData.c | 2 +- source/libs/stream/src/streamDispatch.c | 584 ++++++++++-------- source/libs/stream/src/streamExec.c | 112 ++-- source/libs/stream/src/streamQueue.c | 60 +- source/libs/stream/src/streamRecover.c | 23 - 16 files changed, 669 insertions(+), 519 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7dceb5c083..678ec7503f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -48,6 +48,7 @@ enum { TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused TASK_STATUS__PAUSE, TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore + TASK_STATUS__CK_READY, }; enum { @@ -310,9 +311,8 @@ struct SStreamTask { SStreamId historyTaskId; SStreamId streamTaskId; SArray* pUpstreamEpInfoList; // SArray, // children info - int32_t nextCheckId; SArray* checkpointInfo; // SArray - + SArray* pRpcMsgList; // SArray // output union { STaskDispatcherFixedEp fixedEpDispatcher; @@ -343,6 +343,7 @@ struct SStreamTask { int32_t refCnt; int64_t checkpointingId; int32_t checkpointAlignCnt; + int32_t checkpointNotReadyTasks; struct SStreamMeta* pMeta; SSHashObj* pNameMap; }; @@ -366,7 +367,8 @@ typedef struct 
SStreamMeta { SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; - int64_t checkpointTs; +// uint64_t checkpointId; + int32_t notCkptReadyTasks; SArray* checkpointSaved; SArray* checkpointInUse; int32_t checkpointCap; @@ -486,8 +488,8 @@ typedef struct { int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq); int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq); -int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); -int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp); +int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp); +int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp); typedef struct { SMsgHead msgHead; @@ -509,13 +511,13 @@ typedef struct { int32_t upstreamTaskId; int32_t upstreamNodeId; int32_t childId; -} SStreamCheckpointRsp; +} SStreamTaskCheckpointRsp; -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); +int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); +int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); -int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp); -int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp); +int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpointRsp* pRsp); +int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamTaskCheckpointRsp* pRsp); typedef struct { int64_t streamId; @@ -536,12 +538,6 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const 
SStreamTaskCheckRsp* pRsp); int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); -int32_t tEncodeSStreamTaskScanHistoryReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq); -int32_t tDecodeSStreamTaskScanHistoryReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq); - -int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp); -int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp); - int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); @@ -557,7 +553,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); -// int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); @@ -627,7 +622,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq); -int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp); +int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointRsp* pRsp); int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 311cbdc43b..47ff491c3f 100644 --- 
a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -80,6 +80,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a0b3300a10..b2654b7c39 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -750,7 +750,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f420f75cf1..8aabaac0bb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -220,6 +220,8 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, 
SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); +int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); +int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqCheckStreamStatus(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fff3a9d80f..431aee9c5e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -217,55 +217,6 @@ void tqNotifyClose(STQ* pTq) { } } -//static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, -// int64_t consumerId, int32_t type) { -// int32_t len = 0; -// int32_t code = 0; -// -// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { -// tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); -// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { -// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); -// } -// -// if (code < 0) { -// return -1; -// } -// -// int32_t tlen = sizeof(SMqRspHead) + len; -// void* buf = rpcMallocCont(tlen); -// if (buf == NULL) { -// return -1; -// } -// -// ((SMqRspHead*)buf)->mqMsgType = type; -// ((SMqRspHead*)buf)->epoch = epoch; -// ((SMqRspHead*)buf)->consumerId = consumerId; -// -// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); -// -// SEncoder encoder = {0}; -// tEncoderInit(&encoder, abuf, len); -// -// if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) { -// tEncodeMqDataRsp(&encoder, pRsp); -// } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { -// tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); -// } -// -// tEncoderClear(&encoder); -// -// SRpcMsg rsp = { -// .info = *pRpcHandleInfo, -// .pCont = buf, -// .contLen = tlen, -// .code = 0, -// }; -// -// tmsgSendRsp(&rsp); -// 
return 0; -//} - int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { SMqDataRsp dataRsp = {0}; dataRsp.head.consumerId = pHandle->consumerId; @@ -948,6 +899,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } + // todo extract method SEncoder encoder; int32_t code; int32_t len; @@ -973,7 +925,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code; @@ -982,7 +934,6 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)pReq, len); code = tDecodeStreamTaskCheckRsp(&decoder, &rsp); - if (code < 0) { tDecoderClear(&decoder); return -1; @@ -1295,26 +1246,27 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask != NULL) { - // even in halt status, the data in inputQ must be processed - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) { - tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.version); - streamProcessRunReq(pTask); - } else { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, - pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); - } - - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqStartStreamTasks(pTq); - return 0; - } else { + if (pTask == NULL) { tqError("vgId:%d failed to found s-task, taskId:0x%x", vgId, 
taskId); return -1; } + + // even in halt status, the data in inputQ must be processed + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) { + tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.version); + streamProcessRunReq(pTask); + } else { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, + pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); + } + + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + tqStartStreamTasks(pTq); + + return 0; } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { @@ -1554,16 +1506,23 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMs if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); + tqError("vgId:%d failed to decode checkpoint source msg, code:%s", vgId, tstrerror(code)); goto FAIL; } tDecoderClear(&decoder); + // todo handle this bug: task not in ready state. SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); if (pTask == NULL) { - tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.taskId); + tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. 
it may have been destroyed already", vgId, req.taskId); goto FAIL; } + // backup the rpchandle for rsp + SRpcMsg* pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + memcpy(pRpcMsg, (SRpcMsg*)pMsg, sizeof(SRpcMsg)); + taosArrayPush(pTask->pRpcMsgList, &pRpcMsg); + streamProcessCheckpointSourceReq(pMeta, pTask, &req); streamMetaReleaseTask(pMeta, pTask); return code; @@ -1571,3 +1530,70 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMs FAIL: return code; } + +int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { + int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); + int32_t len = msgLen - sizeof(SMsgHead); + int32_t code = 0; + + SStreamTaskCheckpointReq req= {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamCheckpointReq(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + goto FAIL; + } + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId); + goto FAIL; + } + + streamProcessCheckpointReq(pMeta, pTask, &req); + streamMetaReleaseTask(pMeta, pTask); + return code; + + FAIL: + return code; +} + +// downstream task has complete the stream task checkpoint procedure +int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { + // if this task is an agg task, rsp this message to upstream directly. 
+ // if this task is an source task, send source rsp to mnode + int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); + int32_t len = msgLen - sizeof(SMsgHead); + int32_t code = 0; + + SStreamTaskCheckpointRsp req= {0}; + + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if (tDecodeStreamCheckpointRsp(&decoder, &req) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + tDecoderClear(&decoder); + goto FAIL; + } + tDecoderClear(&decoder); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId); + if (pTask == NULL) { + tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId); + goto FAIL; + } + + streamProcessCheckpointRsp(pMeta, pTask, &req); + streamMetaReleaseTask(pMeta, pTask); + return code; + + FAIL: + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index d511d6c5b8..28eb495c70 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -503,6 +503,17 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; + + case TDMT_STREAM_TASK_CHECKPOINT: { + if (tqProcessStreamCheckPointReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { + goto _err; + } + } break; + case TDMT_STREAM_TASK_CHECKPOINT_RSP: { + if (tqProcessStreamCheckPointRsp(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { + goto _err; + } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 28b7c10117..aa48f5cb29 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -47,7 +47,7 @@ typedef struct { void* streamBackendInit(const 
char* path); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); -int32_t streamBackendDoCheckpoint(void* pMeta, const char* path); +int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 3b92e259de..47b03914b8 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -32,11 +32,13 @@ typedef struct { } SStreamGlobalEnv; extern SStreamGlobalEnv streamEnv; +extern int32_t streamBackendId; +extern int32_t streamBackendCfWrapperId; void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); -SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); +SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); void destroyStreamDataBlock(SStreamDataBlock* pBlock); @@ -49,15 +51,12 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); +int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId); +int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask, int32_t vgId); -int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet); - 
+int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); -extern int32_t streamBackendId; -extern int32_t streamBackendCfWrapperId; - #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b0a9bc44f2..f772237635 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -146,7 +146,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int8_t status = 0; - SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); + SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); if (pBlock == NULL) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 99bcfd744f..e278cf28e7 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -401,20 +401,29 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { taosArrayDestroy(checkpointDel); return 0; } -int32_t streamBackendDoCheckpoint(void* arg, const char* path) { + +int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { SStreamMeta* pMeta = arg; int64_t backendRid = pMeta->streamBackendRid; - int64_t checkpointId = pMeta->checkpointTs; int64_t st = taosGetTimestampMs(); int32_t code = -1; - SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); + + char path[256] = {0}; + sprintf(path, "%s/%s", pMeta->path, "checkpoints"); + code = taosMulModeMkDir(path, 0755); + if (code != 0) { + qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); + return 
code; + } char checkpointDir[256] = {0}; - sprintf(checkpointDir, "%s/checkpoint_%" PRId64 "", path, checkpointId); + snprintf(checkpointDir, tListLen(checkpointDir),"%s/checkpoint_%" PRIu64, path, checkpointId); + SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); if (pHandle == NULL) { return -1; } + qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, path); if (pHandle->db != NULL) { char* err = NULL; @@ -442,10 +451,12 @@ int32_t streamBackendDoCheckpoint(void* arg, const char* path) { taosWUnLockLatch(&pMeta->checkpointDirLock); delObsoleteCheckpoint(arg, path); + _ERROR: taosReleaseRef(streamBackendId, backendRid); return code; } + SListNode* streamBackendAddCompare(void* backend, void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)backend; SListNode* node = NULL; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7f47126aa1..eb7081b998 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -37,7 +37,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo return 0; } -int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { +int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; @@ -48,7 +48,7 @@ int32_t tEncodeSStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheck return pEncoder->pos; } -int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp) { +int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, 
&pRsp->checkpointId) < 0) return -1; @@ -59,7 +59,7 @@ int32_t tDecodeSStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointS return 0; } -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { +int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; @@ -72,7 +72,7 @@ int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskChec return pEncoder->pos; } -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { +int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; @@ -85,7 +85,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint return 0; } -int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) { +int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpointRsp* pRsp) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; @@ -98,7 +98,7 @@ int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRs return pEncoder->pos; } -int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) { +int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamTaskCheckpointRsp* pRsp) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; @@ -112,13 +112,14 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs 
} static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) { - if (pTask->checkpointingId == 0) { + int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int64_t old = atomic_val_compare_exchange_64(&pTask->checkpointingId, 0, num); + if (old == 0) { + qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num); pTask->checkpointingId = checkpointId; - pTask->checkpointAlignCnt = taosArrayGetSize(pTask->pUpstreamEpInfoList); } ASSERT(pTask->checkpointingId == checkpointId); - return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); } @@ -159,77 +160,49 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i); streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } - } else { - qDebug("s-task:%s (vgId:%d) sink task set to be ready for checkpointing", pTask->id.idStr, pTask->info.nodeId); - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SINK); - streamTaskLaunchScanHistory(pTask); } return 0; } -// set status check pointing -// do checkpoint -static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, uint64_t checkpointId) { - int code = 0; - char buf[256] = {0}; - - int64_t ts = taosGetTimestampMs(); - - sprintf(buf, "%s/%s", pMeta->path, "checkpoints"); - code = taosMulModeMkDir(buf, 0755); - if (code != 0) { - qError("failed to prepare checkpoint %s, checkpointId:%" PRIu64 ", reason:%s", buf, checkpointId, tstrerror(code)); - return code; +static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask) { + SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock)); + if (pChkpoint == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } - pMeta->checkpointTs = ts; - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - - // 1. set task status to be prepared for check point - pTask->status.taskStatus = TASK_STATUS__CK; - - // 2. 
put the checkpoint data block into the inputQ, to enable the local status to be flushed to storage backend - { - SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock)); - if (pChkpoint == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pChkpoint->type = STREAM_INPUT__CHECKPOINT; - pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (pChkpoint->pBlock == NULL) { - taosFreeQitem(pChkpoint); - return TSDB_CODE_OUT_OF_MEMORY; - } - - pChkpoint->pBlock->info.type = STREAM_CHECKPOINT; - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) { - taosFreeQitem(pChkpoint); - return TSDB_CODE_OUT_OF_MEMORY; - } - - streamSchedExec(pTask); + pChkpoint->type = STREAM_INPUT__CHECKPOINT; + pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pChkpoint->pBlock == NULL) { + taosFreeQitem(pChkpoint); + return TSDB_CODE_OUT_OF_MEMORY; } - // 2. dispatch checkpoint msg to downstream task - streamTaskDispatchCheckpointMsg(pTask, checkpointId); + pChkpoint->pBlock->info.type = STREAM_CHECKPOINT; + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) { + taosFreeQitem(pChkpoint); + return TSDB_CODE_OUT_OF_MEMORY; + } -// code = streamBackendDoCheckpoint((void*)pMeta, buf); - return code; + streamSchedExec(pTask); + return TSDB_CODE_SUCCESS; } int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) { int32_t code = 0; int64_t checkpointId = pReq->checkpointId; - code = streamDoSourceCheckpoint(pMeta, pTask, checkpointId); - if (code < 0) { - // rsp error - return -1; - } + qDebug("s-task:%s level:%d receive the checkpoint msg id:%" PRId64 " from mnode", pTask->id.idStr, + pTask->info.taskLevel, checkpointId); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - return 0; + // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. 
+ pTask->status.taskStatus = TASK_STATUS__CK; + pTask->checkpointNotReadyTasks = 1; + + // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. + streamTaskDispatchCheckpointMsg(pTask, checkpointId); + return code; } int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq) { @@ -237,31 +210,55 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre int64_t checkpointId = pReq->checkpointId; int32_t childId = pReq->childId; - if (taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0) { - code = streamAlignCheckpoint(pTask, checkpointId, childId); - if (code > 0) { + // set the task status + pTask->status.taskStatus = TASK_STATUS__CK; + ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); + + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + qDebug("s-task:%s sink task set to checkpoint ready", pTask->id.idStr); + appendCheckpointIntoInputQ(pTask); + streamSchedExec(pTask); + } else { + // todo close the inputQ for data from childId, which means data from childId are not allowed to put into intpuQ + // anymore + ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) > 0); + + // there are still some upstream tasks not send checkpoint request + int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId); + if (notReady > 0) { + int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); + qDebug("s-task:%s %d upstream tasks not send checkpoint info yet, total:%d", pTask->id.idStr, notReady, num); return 0; } - if (code < 0) { - ASSERT(0); - return -1; - } - } - // code = streamDoCheckpoint(pMeta, pTask, checkpointId); - if (code < 0) { - // rsp error - return -1; - } + qDebug("s-task:%s all upstream send checkpoint msg now, dispatch checkpoint msg to downstream", pTask->id.idStr); + pTask->checkpointNotReadyTasks = (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) + ? 
1 + : taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); - // send rsp to all children + // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY + // 2. dispatch check point msg to all downstream tasks + streamTaskDispatchCheckpointMsg(pTask, checkpointId); + } return 0; } -int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp) { - // recover step2, scan from wal - // unref wal - // set status normal +/** + * All down stream tasks have successfully completed the check point task. + * Current stream task is allowed to start to do checkpoint things in ASYNC model. + */ +int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointRsp* pRsp) { + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); + + // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task + int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1); + if (notReady == 0) { + qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for this task", + pTask->id.idStr); + appendCheckpointIntoInputQ(pTask); + streamSchedExec(pTask); + } + return 0; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index bad104bc8e..e258e93f8d 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,7 +15,7 @@ #include "streamInt.h" -SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { +SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); if (pData == NULL) { return NULL; diff --git a/source/libs/stream/src/streamDispatch.c 
b/source/libs/stream/src/streamDispatch.c index 41d552e8d9..dbbb6b0db3 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -14,8 +14,8 @@ */ #include "streamInt.h" -#include "ttimer.h" #include "trpc.h" +#include "ttimer.h" #define MAX_BLOCK_NAME_NUM 1024 #define DISPATCH_RETRY_INTERVAL_MS 300 @@ -26,57 +26,19 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; -static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; - ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); - ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); - for (int32_t i = 0; i < pReq->blockNum; i++) { - int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); - void* data = taosArrayGetP(pReq->data, i); - if (tEncodeI32(pEncoder, len) < 0) return -1; - if (tEncodeBinary(pEncoder, data, len) < 0) return -1; - } - tEndEncode(pEncoder); - return pEncoder->pos; -} +static void doRetryDispatchData(void* param, void* tmrId); +static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq); +static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); +static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, + SEpSet* pEpSet); +static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t 
vgId, SEpSet* pEpSet); +static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, + int32_t vgSz, int64_t groupId); -static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); - void* buf = taosMemoryCalloc(1, dataStrLen); - if (buf == NULL) return -1; - - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; - pRetrieve->useconds = 0; - pRetrieve->precision = TSDB_DEFAULT_PRECISION; - pRetrieve->compressed = 0; - pRetrieve->completed = 1; - pRetrieve->streamBlockType = pBlock->info.type; - pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); - pRetrieve->skey = htobe64(pBlock->info.window.skey); - pRetrieve->ekey = htobe64(pBlock->info.window.ekey); - pRetrieve->version = htobe64(pBlock->info.version); - pRetrieve->watermark = htobe64(pBlock->info.watermark); - memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); - - int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); - pRetrieve->numOfCols = htonl(numOfCols); - - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); - actualLen += sizeof(SRetrieveTableRsp); - ASSERT(actualLen <= dataStrLen); - taosArrayPush(pReq->dataLen, &actualLen); - taosArrayPush(pReq->data, &buf); - - pReq->totalLen += dataStrLen; - return 0; +static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { + pMsg->msgType = msgType; + pMsg->pCont = pCont; + pMsg->contLen = contLen; } int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { @@ -103,6 +65,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { taosArrayPush(pReq->dataLen, &len1); taosArrayPush(pReq->data, &data); } + tEndDecode(pDecoder); return 0; } @@ -220,7 +183,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) 
tEncodeStreamRetrieveReq(&encoder, &req); tEncoderClear(&encoder); - SRpcMsg rpcMsg = {.code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len}; + SRpcMsg rpcMsg = {0}; + initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead)); + if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { ASSERT(0); goto CLEAR; @@ -263,13 +228,9 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR rpcFreeCont(buf); return code; } - tEncoderClear(&encoder); - msg.contLen = tlen + sizeof(SMsgHead); - msg.pCont = buf; - msg.msgType = TDMT_STREAM_TASK_CHECK; - + initRpcMsg(&msg, TDMT_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead)); qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); @@ -277,161 +238,6 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR return 0; } -int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - int32_t tlen; - tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code); - if (code < 0) { - return -1; - } - - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) { - if (buf) { - rpcFreeCont(buf); - } - return code; - } - - tEncoderClear(&encoder); - - msg.contLen = tlen + sizeof(SMsgHead); - msg.pCont = buf; - msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH; - msg.info.noResp = 1; - - tmsgSendReq(pEpSet, &msg); - - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - 
qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, - pReq->taskId, vgId); - return 0; -} - -static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - // serialize - int32_t tlen; - tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code); - if (code < 0) { - goto FAIL; - } - - code = -1; - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - goto FAIL; - } - - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) { - goto FAIL; - } - tEncoderClear(&encoder); - - msg.contLen = tlen + sizeof(SMsgHead); - msg.pCont = buf; - msg.msgType = pTask->msgInfo.msgType; - - qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); - return tmsgSendReq(pEpSet, &msg); - -FAIL: - if (buf) { - rpcFreeCont(buf); - } - - return code; -} - -int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, - int64_t groupId) { - uint32_t hashValue = 0; - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - if (pTask->pNameMap == NULL) { - pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - } - - void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); - if (pVal) { - SBlockName* pBln = (SBlockName*)pVal; - hashValue = pBln->hashValue; - if (!pDataBlock->info.parTbName[0]) { - memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName)); - } - } else { - char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); - if (ctbName == NULL) { - return -1; - } - - if 
(pDataBlock->info.parTbName[0]) { - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); - } else { - buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); - } - - /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ - SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo; - hashValue = - taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); - taosMemoryFree(ctbName); - SBlockName bln = {0}; - bln.hashValue = hashValue; - memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); - if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { - tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); - } - } - - bool found = false; - // TODO: optimize search - int32_t j; - for (j = 0; j < vgSz; j++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - ASSERT(pVgInfo->vgId > 0); - - if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { - return -1; - } - - if (pReqs[j].blockNum == 0) { - atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - } - - pReqs[j].blockNum++; - found = true; - break; - } - } - ASSERT(found); - return 0; -} - int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; @@ -494,7 +300,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat // TODO: do not use broadcast if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - for (int32_t j = 0; j < vgSz; j++) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; @@ -514,14 +319,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const 
SStreamDataBlock* pDat } } - qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId, - numOfBlocks, vgSz); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, + pTask->info.selfChildId, numOfBlocks, vgSz); for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId, - pReqs[i].blockNum, pVgInfo->vgId); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, + pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId); code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet); if (code < 0) { @@ -544,20 +349,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } -static void doRetryDispatchData(void* param, void* tmrId) { - SStreamTask* pTask = param; - ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); - - int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); - if (code != TSDB_CODE_SUCCESS) { - qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); - atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); - streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); - } -} - void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { - qError("s-task:%s dispatch data in %"PRId64"ms", pTask->id.idStr, waitDuration); + qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration); taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); } @@ -604,15 +397,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { // todo deal with only partially success dispatch case atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); - if 
(terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore + if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore destroyStreamDataBlock(pTask->msgInfo.pData); pTask->msgInfo.pData = NULL; return code; } - if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", pTask->id.idStr, - retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); + if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", + pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } @@ -622,13 +415,14 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet) { +int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, + SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; int32_t tlen; - tEncodeSize(tEncodeStreamTaskCheckpointReq, pReq, tlen, code); + tEncodeSize(tEncodeStreamCheckpointReq, pReq, tlen, code); if (code < 0) { return -1; } @@ -643,21 +437,325 @@ int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpo SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamTaskCheckpointReq(&encoder, pReq)) < 0) { + if ((code = tEncodeStreamCheckpointReq(&encoder, pReq)) < 0) { rpcFreeCont(buf); return code; } - tEncoderClear(&encoder); - msg.contLen = tlen + sizeof(SMsgHead); - msg.pCont = buf; - msg.msgType = TDMT_STREAM_TASK_CHECKPOINT; - + initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); 
qDebug("s-task:%s (level:%d) dispatch checkpoint msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); return 0; - +} + +int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { + SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId}; + + // serialize + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + req.taskId = pTask->fixedEpDispatcher.taskId; + doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgs = taosArrayGetSize(vgInfo); + + qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", + pTask->id.idStr, numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus)); + for (int32_t i = 0; i < numOfVgs; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + req.taskId = pVgInfo->taskId; + doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + } + } + + return 0; +} + +// this function is usually invoked by sink/agg task +int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) { +// int32_t code = 0; +// int32_t len; +// +// // todo set upstreamTaskId Info +// const SStreamTaskCheckpointRsp rsp = { +// .streamId = pTask->id.streamId, .downstreamTaskId = pTask->id.taskId, .downstreamNodeId = vgId}; +// +// SEncoder encoder; +// tEncodeSize(tEncodeStreamCheckpointRsp, &rsp, len, code); +// if (code < 0) { +// qError("vgId:%d failed to encode checkpoint rsp, task:0x%x", vgId, rsp.downstreamTaskId); +// return -1; +// } +// +// void* buf = rpcMallocCont(sizeof(SMsgHead) + len); +// ((SMsgHead*)buf)->vgId = htonl(rsp.upstreamNodeId); +// +// void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); +// 
tEncoderInit(&encoder, (uint8_t*)abuf, len);
+//  tEncodeStreamCheckpointRsp(&encoder, &rsp);
+//  tEncoderClear(&encoder);
+
+  int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
+  ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num);
+
+  qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel,
+         num);
+  for(int32_t i = 0; i < num; ++i) {
+    SRpcMsg* pMsg = taosArrayGetP(pTask->pRpcMsgList, i);  // answer the i-th saved request, not the first one num times
+    tmsgSendRsp(pMsg);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// this function is only invoked by source task, and send rsp to mnode
+int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask, int32_t vgId) {
+  ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
+//  int32_t      code = 0;
+//  int32_t      len;
+//  SEncoder     encoder;
+//  SStreamMeta* pMeta = pTask->pMeta;
+//  const SStreamCheckpointSourceRsp rsp = {
+//      .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId, .checkpointId = pMeta->checkpointId};
+//
+//  tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code);
+//  if (code < 0) {
+//    qError("vgId:%d failed to encode source checkpoint rsp, task:0x%x", vgId, pTask->id.taskId);
+//    return -1;
+//  }
+//
+//  void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
+//  ((SMsgHead*)buf)->vgId = htonl(rsp.nodeId);
+//
+//  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
+//  tEncoderInit(&encoder, (uint8_t*)abuf, len);
+//  tEncodeStreamCheckpointSourceRsp(&encoder, &rsp);
+//  tEncoderClear(&encoder);
+//  SRpcMsg rspMsg = *pTask->rpcMsg;
+
+  ASSERT(taosArrayGetSize(pTask->pRpcMsgList) == 1);  // element 0 is sent below, so the saved request must be present
+
+  qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
+  tmsgSendRsp(taosArrayGetP(pTask->pRpcMsgList, 0));
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
+  if (tStartEncode(pEncoder) < 0) return -1;
+  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
+  if
(tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; + ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); + ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); + for (int32_t i = 0; i < pReq->blockNum; i++) { + int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); + void* data = taosArrayGetP(pReq->data, i); + if (tEncodeI32(pEncoder, len) < 0) return -1; + if (tEncodeBinary(pEncoder, data, len) < 0) return -1; + } + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + void* buf = taosMemoryCalloc(1, dataStrLen); + if (buf == NULL) return -1; + + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + pRetrieve->useconds = 0; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->streamBlockType = pBlock->info.type; + pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); + pRetrieve->skey = htobe64(pBlock->info.window.skey); + pRetrieve->ekey = htobe64(pBlock->info.window.ekey); + pRetrieve->version = htobe64(pBlock->info.version); + pRetrieve->watermark = htobe64(pBlock->info.watermark); + memcpy(pRetrieve->parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + + int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); + pRetrieve->numOfCols = htonl(numOfCols); + + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + 
taosArrayPush(pReq->dataLen, &actualLen); + taosArrayPush(pReq->data, &buf); + + pReq->totalLen += dataStrLen; + return 0; +} + +int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, + SEpSet* pEpSet) { + void* buf = NULL; + int32_t code = -1; + SRpcMsg msg = {0}; + + int32_t tlen; + tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code); + if (code < 0) { + return -1; + } + + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + ((SMsgHead*)buf)->vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) { + if (buf) { + rpcFreeCont(buf); + } + return code; + } + + tEncoderClear(&encoder); + + msg.info.noResp = 1; + initRpcMsg(&msg,TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); + + tmsgSendReq(pEpSet, &msg); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, + pReq->taskId, vgId); + + return 0; +} + +int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { + void* buf = NULL; + int32_t code = -1; + SRpcMsg msg = {0}; + + // serialize + int32_t tlen; + tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code); + if (code < 0) { + goto FAIL; + } + + code = -1; + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + goto FAIL; + } + + ((SMsgHead*)buf)->vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) { + goto FAIL; + } + tEncoderClear(&encoder); + + initRpcMsg(&msg, pTask->msgInfo.msgType, buf, tlen + sizeof(SMsgHead)); + 
qDebug("s-task:%s dispatch msg to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); + + return tmsgSendReq(pEpSet, &msg); + +FAIL: + if (buf) { + rpcFreeCont(buf); + } + + return code; +} + +int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, + int64_t groupId) { + uint32_t hashValue = 0; + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + if (pTask->pNameMap == NULL) { + pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + } + + void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); + if (pVal) { + SBlockName* pBln = (SBlockName*)pVal; + hashValue = pBln->hashValue; + if (!pDataBlock->info.parTbName[0]) { + memset(pDataBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); + memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName)); + } + } else { + char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); + if (ctbName == NULL) { + return -1; + } + + if (pDataBlock->info.parTbName[0]) { + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + } else { + buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + } + + /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ + SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo; + hashValue = + taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); + taosMemoryFree(ctbName); + SBlockName bln = {0}; + bln.hashValue = hashValue; + memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); + if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { + tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); + 
} + } + + bool found = false; + // TODO: optimize search + int32_t j; + for (j = 0; j < vgSz; j++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + ASSERT(pVgInfo->vgId > 0); + + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + return -1; + } + + if (pReqs[j].blockNum == 0) { + atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + } + + pReqs[j].blockNum++; + found = true; + break; + } + } + ASSERT(found); + return 0; +} + +void doRetryDispatchData(void* param, void* tmrId) { + SStreamTask* pTask = param; + ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); + + int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); + if (code != TSDB_CODE_SUCCESS) { + qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); + atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bcb479e71e..3d8c14d1f2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -16,8 +16,6 @@ #include "streamInt.h" // maximum allowed processed block batches. 
One block may include several submit blocks -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 4 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 static int32_t updateCheckPointInfo(SStreamTask* pTask); @@ -399,61 +397,6 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, - const char* id) { - int32_t retryTimes = 0; - int32_t MAX_RETRY_TIMES = 5; - - while (1) { - if (streamTaskShouldPause(&pTask->status)) { - qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks); - return TSDB_CODE_SUCCESS; - } - - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); - if (qItem == NULL) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { - taosMsleep(10); - qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes); - continue; - } - - qDebug("===stream===break batchSize:%d", *numOfBlocks); - return TSDB_CODE_SUCCESS; - } - - // do not merge blocks for sink node - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - *numOfBlocks = 1; - *pInput = qItem; - return TSDB_CODE_SUCCESS; - } - - if (*pInput == NULL) { - ASSERT((*numOfBlocks) == 0); - *pInput = qItem; - } else { - // todo we need to sort the data block, instead of just appending into the array list. 
-      void* newRet = streamMergeQueueItem(*pInput, qItem);
-      if (newRet == NULL) {
-        qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
-        streamQueueProcessFail(pTask->inputQueue);
-        return TSDB_CODE_SUCCESS;
-      }
-
-      *pInput = newRet;
-    }
-
-    *numOfBlocks += 1;
-    streamQueueProcessSuccess(pTask->inputQueue);
-
-    if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
-      qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
-      return TSDB_CODE_SUCCESS;
-    }
-  }
-}
-
 /**
  * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
  * appropriate batch of blocks should be handled in 5 to 10 sec.
@@ -468,21 +411,28 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     // merge multiple input data if possible in the input queue.
     qDebug("s-task:%s start to extract data block from inputQ", id);
-    /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
+    /*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize, id);
     if (pInput == NULL) {
       ASSERT(batchSize == 0);
       if (pTask->info.fillHistory && pTask->status.transferState) {
         int32_t code = streamTransferStateToStreamTask(pTask);
       }
-      break;
+      // no data in the inputQ, return now
+      return 0;
     }
     if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
-      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
-      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
-      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
-      continue;
+      if (pInput->type == STREAM_INPUT__DATA_BLOCK) {
+        qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
+        streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
+        continue;
+      } else {  // pairs with the type test: only a sink stops here, other task levels go on to the executor below
+        ASSERT(pInput->type == STREAM_INPUT__CHECKPOINT);
+        ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
+        pTask->status.taskStatus = TASK_STATUS__CK_READY;
+        return 0;
+      }
     }
     int64_t st = taosGetTimestampMs();
@@ -519,6 +469,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
       } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
         const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
         qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
+      } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {  // queue-item type, same constant family as the other branches
+        const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*) pInput;
+        qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
       } else {
         ASSERT(0);
       }
@@ -533,9 +486,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
            id, el, resSize / 1048576.0, totalBlocks);
+    const bool isCheckpoint = (pInput->type == STREAM_INPUT__CHECKPOINT);  // read before streamFreeQitem() releases pInput
     streamFreeQitem(pInput);
-  }
-  return 0;
+    // do nothing after sync executor state to storage backend, until the vnode-level checkpoint is completed.
+    if (isCheckpoint) {
+      ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
+      pTask->status.taskStatus = TASK_STATUS__CK_READY;
+      return 0;
+    }
+  }
 }
 bool streamTaskIsIdle(const SStreamTask* pTask) {
@@ -574,9 +532,27 @@ int32_t streamTryExec(SStreamTask* pTask) {
       qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr,
              streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
-      if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
-          (!streamTaskShouldPause(&pTask->status))) {
-        streamSchedExec(pTask);
+      if (pTask->status.taskStatus == TASK_STATUS__CK_READY) {
+        // check for all tasks, and do generate the vnode-wide checkpoint data.
+ // todo extract method + SStreamMeta* pMeta = pTask->pMeta; + int32_t remain = atomic_sub_fetch_32(&pMeta->notCkptReadyTasks, 1); + if (remain <= 0) { // all tasks are in TASK_STATUS__CK_READY state + streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); + } + + // send check point response to upstream task + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamTaskSendCheckpointSourceRsp(pTask, pMeta->vgId); + } else { + streamTaskSendCheckpointRsp(pTask, pMeta->vgId); + } + + } else { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && + (!streamTaskShouldPause(&pTask->status))) { + streamSchedExec(pTask); + } } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index aaf9fdec72..18d2d6b7a5 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -107,8 +107,8 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { } #endif -#define MAX_STREAM_EXEC_BATCH_NUM 128 -#define MIN_STREAM_EXEC_BATCH_NUM 16 +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 4 // todo refactor: // read data from input queue @@ -119,6 +119,7 @@ typedef struct SQueueReader { int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms } SQueueReader; +#if 0 SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) { int32_t numOfBlocks = 0; int32_t tryCount = 0; @@ -164,3 +165,58 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* return pRet; } +#endif + +int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id) { + int32_t retryTimes = 0; + int32_t MAX_RETRY_TIMES = 5; + + while (1) { + if (streamTaskShouldPause(&pTask->status)) { + qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + SStreamQueueItem* 
qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { + taosMsleep(10); + qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes); + continue; + } + + qDebug("===stream===break batchSize:%d", *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + // do not merge blocks for sink node and checkpoint data block + if ((pTask->info.taskLevel == TASK_LEVEL__SINK) || (qItem->type == STREAM_INPUT__CHECKPOINT)) { + *numOfBlocks = 1; + *pInput = qItem; + return TSDB_CODE_SUCCESS; + } + + if (*pInput == NULL) { + ASSERT((*numOfBlocks) == 0); + *pInput = qItem; + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = streamMergeQueueItem(*pInput, qItem); + if (newRet == NULL) { + qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + streamQueueProcessFail(pTask->inputQueue); + return TSDB_CODE_SUCCESS; + } + + *pInput = newRet; + } + + *numOfBlocks += 1; + streamQueueProcessSuccess(pTask->inputQueue); + + if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); + return TSDB_CODE_SUCCESS; + } + } +} diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ffce0af79e..1cf1be4ba2 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -286,29 +286,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) { return streamScanExec(pTask, 100); } -int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { - SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; - - // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - req.taskId = pTask->fixedEpDispatcher.taskId; - 
streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgs = taosArrayGetSize(vgInfo); - - qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr, - numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus)); - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.taskId = pVgInfo->taskId; - streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } - - return 0; -} - static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1;