fix(stream): send hb rsp explicitly.
This commit is contained in:
parent
b361eb4a7c
commit
8740c3cb4a
|
@ -29,7 +29,7 @@
|
||||||
|
|
||||||
#define MND_STREAM_MAX_NUM 60
|
#define MND_STREAM_MAX_NUM 60
|
||||||
|
|
||||||
typedef struct SMStreamNodeCheckMsg {
|
typedef struct {
|
||||||
int8_t placeHolder; // // to fix windows compile error, define place holder
|
int8_t placeHolder; // // to fix windows compile error, define place holder
|
||||||
} SMStreamNodeCheckMsg;
|
} SMStreamNodeCheckMsg;
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
#include "mndStream.h"
|
#include "mndStream.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t placeholder; // placeholder
|
||||||
|
} SMStreamHbRspMsg;
|
||||||
|
|
||||||
typedef struct SFailedCheckpointInfo {
|
typedef struct SFailedCheckpointInfo {
|
||||||
int64_t streamUid;
|
int64_t streamUid;
|
||||||
int64_t checkpointId;
|
int64_t checkpointId;
|
||||||
|
@ -222,11 +226,11 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
|
||||||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SStreamHbMsg req = {0};
|
SStreamHbMsg req = {0};
|
||||||
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
SArray *pFailedTasks = NULL;
|
||||||
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
SArray *pOrphanTasks = NULL;
|
||||||
|
|
||||||
if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){
|
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
|
||||||
if(suspendAllStreams(pMnode, &pReq->info) < 0){
|
if (suspendAllStreams(pMnode, &pReq->info) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -244,6 +248,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
||||||
|
|
||||||
|
pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||||
|
pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
||||||
|
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
// extract stream task list
|
// extract stream task list
|
||||||
|
@ -349,5 +356,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
taosArrayDestroy(pFailedTasks);
|
taosArrayDestroy(pFailedTasks);
|
||||||
taosArrayDestroy(pOrphanTasks);
|
taosArrayDestroy(pOrphanTasks);
|
||||||
|
|
||||||
|
{
|
||||||
|
pReq->info.handle = NULL; // disable auto rsp
|
||||||
|
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)};
|
||||||
|
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue