diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d083d20058..83e0bd33a6 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -53,8 +53,7 @@ typedef struct { void* sContext; // SSnapContext* - void* pStateBackend; - int64_t fillHistoryVer1; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2a957b0d16..534d86b1f1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -445,6 +445,15 @@ typedef struct { int32_t taskId; } SStreamRecoverStep1Req, SStreamRecoverStep2Req; +typedef struct { + int64_t streamId; + int32_t taskId; + int32_t childId; +} SStreamRecoverFinishReq; + +int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq); +int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq); + #if 0 typedef struct { int64_t streamId; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 47a260c147..aea689c0de 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4853,6 +4853,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pReq->sourceDB) < 0) return -1; if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; + if (tEncodeI8(&encoder, pReq->fillHistory) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1; @@ -4889,6 +4890,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tDecodeCStrTo(&decoder, pReq->sourceDB) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->fillHistory) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 81940f539d..8bf1522d6c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -905,7 +905,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState, - .fillHistoryVer1 = pTask->fillHistory ? ver : -1, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.executor); @@ -1090,9 +1089,24 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t code; // deserialize + int32_t len; + SStreamRecoverFinishReq req; + + SDecoder decoder; + tDecoderInit(&decoder, msg, sizeof(SStreamRecoverFinishReq)); + tDecodeSStreamRecoverFinishReq(&decoder, &req); + tDecoderClear(&decoder); + // find task + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId); + if (pTask == NULL) { + return -1; + } // do process request - // + if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) { + return -1; + } + return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 12e3582c85..a7af6ca96c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -988,8 +988,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, - destroyBlockDistScanOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL); return pOperator; _error: @@ -2235,7 +2235,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pGroupTags = pTableScanNode->pGroupTags; int32_t numOfCols = 0; - int32_t code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + int32_t code = + extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); @@ -2330,7 +2331,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } else { taosArrayDestroy(pColIds); } - pTaskInfo->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1; // create the pseduo columns info if (pTableScanNode->scan.pScanPseudoCols != NULL) { @@ -2361,8 +2361,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->pTaskInfo = pTaskInfo; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL); return pOperator; @@ -3930,8 +3929,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL); return pOperator; @@ -4043,7 +4041,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi int32_t num = 0; int32_t numOfExprs = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); - int32_t code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + int32_t code = + extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { @@ -4066,8 +4065,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL); return pOperator; @@ -4563,7 +4561,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, + &pInfo->matchInfo); code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a18419c0f7..ebfbacaa9e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1832,7 +1832,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2638,7 +2639,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); @@ -2708,8 +2710,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, - destroyStateWindowOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2782,8 +2784,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, - destroySWindowOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4233,13 +4235,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh .minTs = INT64_MAX, }; - if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) { - pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger; - pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark; - pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pInfo->twAggSup.deleteMark = INT64_MAX; - } - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; @@ -4264,9 +4259,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, - NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, + destroyStreamSessionAggOperatorInfo, NULL); if (downstream) { initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); @@ -4411,9 +4405,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pOperator->name = "StreamSessionSemiAggOperator"; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL, - destroyStreamSessionAggOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL, + destroyStreamSessionAggOperatorInfo, NULL); } pInfo->pGroupIdTbNameMap = @@ -4747,13 +4740,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys .minTs = INT64_MAX, }; - if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) { - pTaskInfo->streamInfo.triggerSaved = pInfo->twAggSup.calTrigger; - pTaskInfo->streamInfo.deleteMarkSaved = pInfo->twAggSup.deleteMark; - pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pInfo->twAggSup.deleteMark = INT64_MAX; - } - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); SExprSupp* pSup = &pOperator->exprSupp; @@ -4789,8 +4775,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->status = OP_NOT_OPENED; pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, - destroyStreamStateOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, destroyStreamStateOperatorInfo, NULL); initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, pInfo->primaryTsIndex); code = appendDownstream(pOperator, &downstream, 1); @@ -5066,8 +5052,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->pTaskInfo = pTaskInfo; pOperator->info = miaInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, - destroyMAIOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, destroyMAIOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5378,8 +5364,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge pOperator->pTaskInfo = pTaskInfo; pOperator->info = pMergeIntervalInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, - destroyMergeIntervalOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, destroyMergeIntervalOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5544,13 +5530,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); - if (pTaskInfo->streamInfo.fillHistoryVer1 != -1) { - pTaskInfo->streamInfo.triggerSaved = twAggSupp.calTrigger; - pTaskInfo->streamInfo.deleteMarkSaved = twAggSupp.deleteMark; - twAggSupp.calTrigger = STREAM_TRIGGER_AT_ONCE; - twAggSupp.deleteMark = INT64_MAX; - } - pOperator->pTaskInfo = pTaskInfo; pInfo->interval = interval; pInfo->twAggSup = twAggSupp; @@ -5618,9 +5597,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, - NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, + destroyStreamFinalIntervalOperatorInfo, NULL); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 45d9cfa50c..417ddfa80d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -210,6 +210,46 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis return 0; } +int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, + SEpSet* pEpSet) { + void* buf = NULL; + int32_t code = -1; + SRpcMsg msg = {0}; + + int32_t tlen; + tEncodeSize(tEncodeSStreamRecoverFinishReq, pReq, tlen, code); + if (code < 0) { + return -1; + } + + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + return -1; + } + + ((SMsgHead*)buf)->vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) { + goto FAIL; + } + tEncoderClear(&encoder); + + msg.contLen = tlen + sizeof(SMsgHead); + msg.pCont = buf; + msg.msgType = TDMT_VND_STREAM_RECOVER_FINISH; + + tmsgSendReq(pEpSet, &msg); + + code = 0; + return 0; +FAIL: + if (buf) rpcFreeCont(buf); + return code; +} + int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; @@ -244,9 +284,10 @@ int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, tmsgSendReq(pEpSet, &msg); code = 0; -FAIL: - if (code < 0 && buf) rpcFreeCont(buf); return 0; +FAIL: + if (buf) rpcFreeCont(buf); + return code; } int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ab6074f8d4..98e4e77cb0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -141,9 +141,10 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { return -1; } - SEncoder encoder; + SEncoder encoder = {0}; tEncoderInit(&encoder, buf, len); tEncodeSStreamTask(&encoder, pTask); + tEncoderClear(&encoder); if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) { ASSERT(0); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 7027e046a3..12de2fafc1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -62,8 +62,12 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { } int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { + SStreamRecoverFinishReq req = { + .streamId = pTask->streamId, + .taskId = pTask->taskId, + .childId = pTask->selfChildId, + }; if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - /*SStreamFillFinish*/ } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } return 0; @@ -87,6 +91,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { if (qStreamRecoverFinish(exec) < 0) { return -1; } + streamSetStatusNormal(pTask); return 0; } @@ -99,6 +104,22 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { return 0; } +int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} +int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} #if 0 int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -216,6 +237,7 @@ int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCh return 0; } +#if 0 int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq) { if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; @@ -258,6 +280,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea } return 0; } +#endif int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { #if 0 @@ -353,6 +376,7 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } +#if 0 int32_t streamFetchRecoverStatus(SStreamTask* pTask, const SVgroupInfo* pVgInfo) { int32_t taskId = pVgInfo->taskId; int32_t nodeId = pVgInfo->vgId; @@ -423,6 +447,7 @@ int32_t streamFetchDownstreamStatus(SStreamMeta* pMeta, SStreamTask* pTask) { } return 0; } +#endif #if 0 int32_t streamProcessFetchStatusRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {