diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1f2d597496..11756c47ba 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -253,6 +253,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9e2705461d..e1af12389b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -270,6 +270,7 @@ typedef struct SStreamStatus { int8_t taskStatus; int8_t schedStatus; int8_t keepTaskStatus; + bool transferState; } SStreamStatus; typedef struct SHistDataRange { @@ -454,10 +455,10 @@ typedef struct { int64_t streamId; int32_t taskId; int32_t childId; -} SStreamRecoverFinishReq; +} SStreamRecoverFinishReq, SStreamTransferReq; -int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq); -int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq); +int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq); +int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq); typedef struct { int64_t streamId; @@ -583,6 +584,9 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask); int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq); int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask); + +int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); + // agg level int32_t streamAggRecoverPrepare(SStreamTask* pTask); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 814a155cfb..d9081e6e43 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -728,8 +728,9 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index cf1481a113..41fb50a0df 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -277,7 +277,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); - tDecodeSStreamRecoverFinishReq(&decoder, &req); + tDecodeStreamRecoverFinishReq(&decoder, &req); tDecoderClear(&decoder); // find task diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 635893fee2..57e7ad8abf 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -245,7 +245,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a7adf247c8..71e28c3bad 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1102,29 +1102,35 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); - if (pTask->info.fillHistory) {/* - // 1. stop the related stream task, get the current scan wal version of stream task, ver1. + if (pTask->info.fillHistory) { + // 1. stop the related stream task, get the current scan wal version of stream task, ver. SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo handle error } + // todo here we should use another flag to avoid user resume the task pStreamTask->status.taskStatus = TASK_STATUS__PAUSE; - // if it's an source task, extract the last version in wal. + int64_t ver = pTask->dataRange.range.maxVer; + int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + ASSERT(latestVer >= ver); + ver = latestVer; - // 2. wait for downstream tasks to completed + // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1] - // 3. do secondary scan of the history data scan, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1] - + // 3. notify the downstream tasks to transfer executor state after handle all history blocks. + code = streamDispatchTransferStateMsg(pTask); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. - - // 5. resume the related stream task. -*/ + + streamMetaReleaseTask(pMeta, pTask); } else { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of @@ -1138,53 +1144,23 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { return code; } -#if 0 - // build msg to launch next step - SStreamRecoverStep2Req req; - code = streamBuildSourceRecover2Req(pTask, &req); - if (code < 0) { - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return -1; - } - - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { - return 0; - } - - // serialize msg - int32_t len = sizeof(SStreamRecoverStep1Req); - - void* serializedReq = rpcMallocCont(len); - if (serializedReq == NULL) { - tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr); - return -1; - } - - memcpy(serializedReq, &req, len); - - // dispatch msg - tqDebug("s-task:%s start recover block stage", pTask->id.idStr); - - SRpcMsg rpcMsg = { - .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; - tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg); -#endif - return 0; } -int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - int32_t code = 0; - - SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; +// notify the downstream tasks to transfer executor state after handle all history blocks. +int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + SStreamTransferReq* pReq = (SStreamTransferReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask == NULL) { return -1; } - // do recovery step 2 + ASSERT(pTask->streamTaskId.taskId != 0); + pTask->status.transferState = true; // persistent data? + +#if 0 + // do check if current task handle all data in the input queue int64_t st = taosGetTimestampMs(); tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st); @@ -1231,8 +1207,11 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t atomic_store_8(&pTask->info.fillHistory, 0); streamMetaSaveTask(pTq->pStreamMeta, pTask); +#endif + streamSchedExec(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); + return 0; } @@ -1245,7 +1224,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeSStreamRecoverFinishReq(&decoder, &req); + tDecodeStreamRecoverFinishReq(&decoder, &req); tDecoderClear(&decoder); // find task diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3a806ff0de..47b899aa3d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -326,10 +326,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg if (!syncUtilUserCommit(pMsg->msgType)) goto _exit; - if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) { - if (tqCheckLogInWal(pVnode->pTq, ver)) return 0; - } - // skip header pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); @@ -425,11 +421,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; - case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: { - if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { - goto _err; - } - } break; case TDMT_STREAM_TASK_CHECK_RSP: { if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) { goto _err; @@ -593,6 +584,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE: return tqProcessTaskRecover1Req(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: + return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen); case TDMT_STREAM_RECOVER_FINISH: return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_RECOVER_FINISH_RSP: diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c01a95b288..c866e2cc21 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -267,8 +267,8 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR msg.pCont = buf; msg.msgType = TDMT_STREAM_TASK_CHECK; - qDebug("s-task:%s dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, - pReq->streamId, pReq->downstreamTaskId, nodeId); + qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, + pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); return 0; @@ -281,7 +281,7 @@ int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecove SRpcMsg msg = {0}; int32_t tlen; - tEncodeSize(tEncodeSStreamRecoverFinishReq, pReq, tlen, code); + tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code); if (code < 0) { return -1; } @@ -297,7 +297,7 @@ int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecove SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) { + if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) { if (buf) { rpcFreeCont(buf); } @@ -385,8 +385,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); } - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo; hashValue = diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e4d28e7dd7..5264a5f043 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -386,6 +386,23 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (pInput == NULL) { + if (pTask->info.fillHistory && pTask->status.transferState) { + // todo transfer task state here + SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); + ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); + + ASSERT(pStreamTask->status.taskStatus == STREAM_STATUS__PAUSE); + + // update the scan data range for source task. + + + streamSetStatusNormal(pStreamTask); + streamSchedExec(pStreamTask); + + streamMetaReleaseTask(pTask->pMeta, pStreamTask); + + // todo set the task with specified status, to avoid execute this process again + } break; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f1e43df230..9efe6fec42 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -285,6 +285,72 @@ int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) { return 0; } +static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) { + void* buf = NULL; + int32_t code = -1; + SRpcMsg msg = {0}; + + int32_t tlen; + tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code); + if (code < 0) { + return -1; + } + + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + ((SMsgHead*)buf)->vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) { + if (buf) { + rpcFreeCont(buf); + } + return code; + } + + tEncoderClear(&encoder); + + msg.contLen = tlen + sizeof(SMsgHead); + msg.pCont = buf; + msg.msgType = TDMT_STREAM_TRANSFER_STATE; + msg.info.noResp = 1; + + tmsgSendReq(pEpSet, &msg); + qDebug("s-task:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId); + + return 0; +} + +int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { + SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; + + // serialize + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr, + pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus); + + req.taskId = pTask->fixedEpDispatcher.taskId; + doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + + int32_t numOfVgs = taosArrayGetSize(vgInfo); + for (int32_t i = 0; i < numOfVgs; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + req.taskId = pVgInfo->taskId; + doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + } + } + + return 0; +} + // agg int32_t streamAggRecoverPrepare(SStreamTask* pTask) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); @@ -465,7 +531,7 @@ int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp return 0; } -int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) { +int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; @@ -473,7 +539,7 @@ int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverF tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) { +int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;