diff --git a/include/common/tmsg.h b/include/common/tmsg.h index addb84046c..d9087f59c6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2589,18 +2589,6 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); } -typedef struct { - int64_t streamId; - int32_t taskId; - int32_t sourceVg; - int64_t sourceVer; - SArray* data; // SArray -} SStreamDispatchReq; - -typedef struct { - int8_t inputStatus; -} SStreamDispatchRsp; - #define TD_AUTO_CREATE_TABLE 0x1 typedef struct { int64_t suid; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 93b2e75360..455898585a 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -200,6 +200,10 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bca947b84c..1604749af8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) if (ref == 0) { taosMemoryFree(pDataSubmit->data); taosMemoryFree(pDataSubmit->dataRef); - taosFreeQitem(pDataSubmit); + // taosFreeQitem(pDataSubmit); } } @@ -286,6 +286,36 @@ typedef struct { int32_t taskId; } SStreamTaskRunReq; +typedef struct { + int64_t streamId; + int32_t taskId; + int32_t sourceTaskId; + int32_t sourceVg; +#if 0 + int64_t sourceVer; +#endif + SArray* data; // SArray +} SStreamDispatchReq; + +typedef struct { + int64_t streamId; + int32_t taskId; + int8_t inputStatus; +} SStreamDispatchRsp; + +typedef struct { + int64_t streamId; + int32_t taskId; + int32_t sourceTaskId; + int32_t sourceVg; +} SStreamTaskRecoverReq; + +typedef struct { + int64_t streamId; + int32_t taskId; + int8_t inputStatus; +} SStreamTaskRecoverRsp; + int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input); int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input); int32_t streamDequeueOutput(SStreamTask* pTask, void** output); @@ -296,6 +326,12 @@ int32_t streamTaskRun(SStreamTask* pTask); int32_t streamTaskHandleInput(SStreamTask* pTask, void* data); +int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb); +int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg); +int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp); +int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); +int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 3da3d90ae1..f28209f982 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -314,6 +314,10 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1c89649313..23825e6f4a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -121,10 +121,18 @@ int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); +#if 0 +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); +int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); +#endif +int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data); +int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); // sma int32_t smaOpen(SVnode* pVnode); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9361c0e6d2..a8f9e8db00 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -105,12 +105,11 @@ static void tdSRowDemo() { } int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { - void* pIter = NULL; - STqExec* pExec = NULL; + void* pIter = NULL; while (1) { pIter = taosHashIterate(pTq->execs, pIter); if (pIter == NULL) break; - pExec = (STqExec*)pIter; + STqExec* pExec = (STqExec*)pIter; if (pExec->subType == TOPIC_SUB_TYPE__DB) { if (!isAdd) { int32_t sz = taosArrayGetSize(tbUidList); @@ -275,6 +274,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) } memcpy(data, msg, msgLen); + tqProcessStreamTriggerNew(pTq, data); + +#if 0 SRpcMsg req = { .msgType = TDMT_VND_STREAM_TRIGGER, .pCont = data, @@ -282,6 +284,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) }; tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req); +#endif return 0; } @@ -980,12 +983,24 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { + pTask->status = TASK_STATUS__IDLE; + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + + pTask->inputQ = taosOpenQueue(); + pTask->outputQ = taosOpenQueue(); + pTask->inputQAll = taosAllocateQall(); + pTask->outputQAll = taosAllocateQall(); + + if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL) + goto FAIL; + if (pTask->execType != TASK_EXEC__NONE) { // expand runners pTask->exec.numOfRunners = parallel; pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); if (pTask->exec.runners == NULL) { - return -1; + goto FAIL; } for (int32_t i = 0; i < parallel; i++) { STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); @@ -1007,6 +1022,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { } return 0; +FAIL: + if (pTask->inputQ) taosCloseQueue(pTask->inputQ); + if (pTask->outputQ) taosCloseQueue(pTask->outputQ); + if (pTask->inputQAll) taosFreeQall(pTask->inputQAll); + if (pTask->outputQAll) taosFreeQall(pTask->outputQAll); + if (pTask) taosMemoryFree(pTask); + return -1; } int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { @@ -1058,6 +1080,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo return 0; } +#if 0 int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) { SStreamDataSubmit* pSubmit = NULL; @@ -1108,6 +1131,7 @@ FAIL: } return -1; } +#endif int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) { SStreamTaskExecReq req; @@ -1125,25 +1149,28 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) return 0; } -int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { +int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* pReq) { void* pIter = NULL; bool failed = false; SStreamDataSubmit* pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); if (pSubmit == NULL) { failed = true; + goto SET_TASK_FAIL; } pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t)); if (pSubmit->dataRef == NULL) { failed = true; + goto SET_TASK_FAIL; } - pSubmit->type = STREAM_DATA_TYPE_SUBMIT_BLOCK; - pSubmit->sourceVer = ver; - pSubmit->sourceVg = pTq->pVnode->config.vgId; + pSubmit->type = STREAM_INPUT__DATA_SUBMIT; + /*pSubmit->sourceVer = ver;*/ + /*pSubmit->sourceVg = pTq->pVnode->config.vgId;*/ pSubmit->data = pReq; *pSubmit->dataRef = 1; +SET_TASK_FAIL: while (1) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; @@ -1162,7 +1189,18 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { int8_t execStatus = atomic_load_8(&pTask->status); if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { - // TODO dispatch task launch msg to fetch queue + SStreamTaskRunReq* pRunReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) continue; + // TODO: do we need htonl? + pRunReq->head.vgId = pTq->pVnode->config.vgId; + pRunReq->streamId = pTask->streamId; + pRunReq->taskId = pTask->taskId; + SRpcMsg msg = { + .msgType = TDMT_VND_TASK_RUN, + .pCont = pRunReq, + .contLen = sizeof(SStreamTaskRunReq), + }; + tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg); } } else { @@ -1174,11 +1212,53 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) { streamDataSubmitRefDec(pSubmit); return 0; } else { + if (pSubmit) { + if (pSubmit->dataRef) { + taosMemoryFree(pSubmit->dataRef); + } + taosFreeQitem(pSubmit); + } return -1; } } -int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // + SStreamTaskRunReq* pReq = pMsg->pCont; + int32_t taskId = pReq->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessRunReq(pTask, &pTq->pVnode->msgCb); + return 0; +} + +int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { + SStreamDispatchReq* pReq = pMsg->pCont; + int32_t taskId = pReq->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + return 0; +} + +int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { + SStreamTaskRecoverReq* pReq = pMsg->pCont; + int32_t taskId = pReq->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + return 0; +} + +int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { + SStreamDispatchRsp* pRsp = pMsg->pCont; + int32_t taskId = pRsp->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); + return 0; +} + +int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { + SStreamTaskRecoverRsp* pRsp = pMsg->pCont; + int32_t taskId = pRsp->taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + streamTaskProcessRecoverRsp(pTask, pRsp); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index dfd78d9dca..b6600b5d0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -106,11 +106,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pMsg->contLen - sizeof(SMsgHead)) < 0) { } } break; +#if 0 case TDMT_VND_TASK_WRITE_EXEC: { if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead), 0) < 0) { } } break; +#endif case TDMT_VND_ALTER_VNODE: break; default: @@ -181,11 +183,25 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId); + + case TDMT_VND_TASK_RUN: + return tqProcessTaskRunReq(pVnode->pTq, pMsg); + case TDMT_VND_TASK_DISPATCH: + return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); + case TDMT_VND_TASK_RECOVER: + return tqProcessTaskRecoverReq(pVnode->pTq, pMsg); + case TDMT_VND_TASK_DISPATCH_RSP: + return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); + case TDMT_VND_TASK_RECOVER_RSP: + return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); + +#if 0 case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_MERGE_EXEC: return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); case TDMT_VND_STREAM_TRIGGER: return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); +#endif case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 38b6f2b0e2..66a661481e 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -68,7 +68,7 @@ static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMs // get groupId, compute hash value uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); - // + // get node // TODO: optimize search process SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -152,13 +152,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) // exec while (1) { - SSDataBlock* output; + SSDataBlock* output = NULL; uint64_t ts = 0; if (qExecTask(exec, &output, &ts) < 0) { ASSERT(false); } if (output == NULL) break; - taosArrayPush(pRes, &output); + taosArrayPush(pRes, output); } // destroy @@ -189,7 +189,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(data); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->blocks = pRes; taosWriteQitem(pTask->outputQ, resQ); @@ -209,7 +209,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(data); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->blocks = pRes; taosWriteQitem(pTask->outputQ, resQ); @@ -231,7 +231,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(data); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->blocks = pRes; taosWriteQitem(pTask->outputQ, resQ); @@ -253,7 +253,7 @@ int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(data); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM); + SStreamDataBlock* resQ = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); resQ->type = STREAM_INPUT__DATA_BLOCK; resQ->blocks = pRes; taosWriteQitem(pTask->outputQ, resQ); @@ -392,12 +392,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* // 1.2 enqueue pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; pBlock->sourceVg = pReq->sourceVg; - pBlock->sourceVer = pReq->sourceVer; + /*pBlock->sourceVer = pReq->sourceVer;*/ taosWriteQitem(pTask->inputQ, pBlock); // 1.3 rsp by input status SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp)); pCont->inputStatus = status; + pCont->streamId = pReq->streamId; + pCont->taskId = pReq->sourceTaskId; pRsp->pCont = pCont; pRsp->contLen = sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); @@ -439,12 +441,12 @@ int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { return 0; } -int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) { +int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) { // return 0; } -int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, char* msg) { +int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) { // return 0; }