diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4e0c76f6de..e9a8820c0c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -29,7 +29,7 @@ #define MND_STREAM_MAX_NUM 60 -typedef struct SMStreamNodeCheckMsg { +typedef struct { int8_t placeHolder; // // to fix windows compile error, define place holder } SMStreamNodeCheckMsg; diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 5de442951c..e94daf6c93 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -16,6 +16,10 @@ #include "mndStream.h" #include "mndTrans.h" +typedef struct { + int8_t placeholder; // placeholder +} SMStreamHbRspMsg; + typedef struct SFailedCheckpointInfo { int64_t streamUid; int64_t checkpointId; @@ -222,11 +226,11 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; - SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); - SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); + SArray *pFailedTasks = NULL; + SArray *pOrphanTasks = NULL; - if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){ - if(suspendAllStreams(pMnode, &pReq->info) < 0){ + if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) { + if (suspendAllStreams(pMnode, &pReq->info) < 0) { return -1; } } @@ -244,6 +248,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); + taosThreadMutexLock(&execInfo.lock); // extract stream task list @@ -349,5 +356,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosArrayDestroy(pFailedTasks); taosArrayDestroy(pOrphanTasks); + { + pReq->info.handle = NULL; // disable auto rsp + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + tmsgSendRsp(&rsp); + } + return TSDB_CODE_SUCCESS; }