Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0
This commit is contained in:
commit
a9e3ca41a9
|
@ -26,6 +26,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
|||
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
|
||||
bool isLeader, bool restored);
|
||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||
|
|
|
@ -88,6 +88,7 @@ SArray *smGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
|
|
|
@ -837,6 +837,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
#define MND_STREAM_MAX_NUM 60
|
||||
|
||||
typedef struct SMStreamNodeCheckMsg {
|
||||
typedef struct {
|
||||
int8_t placeHolder; // // to fix windows compile error, define place holder
|
||||
} SMStreamNodeCheckMsg;
|
||||
|
||||
|
|
|
@ -16,6 +16,10 @@
|
|||
#include "mndStream.h"
|
||||
#include "mndTrans.h"
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
} SMStreamHbRspMsg;
|
||||
|
||||
typedef struct SFailedCheckpointInfo {
|
||||
int64_t streamUid;
|
||||
int64_t checkpointId;
|
||||
|
@ -222,11 +226,11 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
|
|||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SStreamHbMsg req = {0};
|
||||
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
||||
SArray *pFailedTasks = NULL;
|
||||
SArray *pOrphanTasks = NULL;
|
||||
|
||||
if(grantCheckExpire(TSDB_GRANT_STREAMS) < 0){
|
||||
if(suspendAllStreams(pMnode, &pReq->info) < 0){
|
||||
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) {
|
||||
if (suspendAllStreams(pMnode, &pReq->info) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -244,6 +248,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
|
||||
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
||||
|
||||
pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||
pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
|
||||
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
|
||||
// extract stream task list
|
||||
|
@ -349,5 +356,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
taosArrayDestroy(pFailedTasks);
|
||||
taosArrayDestroy(pOrphanTasks);
|
||||
|
||||
{
|
||||
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)};
|
||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||
SMsgHead* pHead = rsp.pCont;
|
||||
pHead->vgId = htonl(req.vgId);
|
||||
|
||||
tmsgSendRsp(&rsp);
|
||||
|
||||
pReq->info.handle = NULL; // disable auto rsp
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -178,6 +178,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
||||
return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
|
||||
case TDMT_MND_STREAM_HEARTBEAT_RSP:
|
||||
return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
|
||||
default:
|
||||
sndError("invalid snode msg:%d", pMsg->msgType);
|
||||
ASSERT(0);
|
||||
|
|
|
@ -242,6 +242,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||
int32_t tqScanWal(STQ* pTq);
|
||||
|
|
|
@ -1220,3 +1220,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
||||
// this function is needed, do not try to remove it.
|
||||
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
|
|
@ -936,4 +936,11 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
|||
|
||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
|
||||
return taosArrayGetSize(pMeta->pTaskList);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -796,6 +796,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY:
|
||||
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_HEARTBEAT_RSP:
|
||||
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
|
|
@ -164,7 +164,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt
|
|||
} else {
|
||||
pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey);
|
||||
}
|
||||
SSessionKey tmpKey = {0};
|
||||
SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN};
|
||||
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pAggSup->stateStore.streamStateFreeCur(pCur);
|
||||
|
|
|
@ -1160,7 +1160,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
|||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg msg = {.info.noResp = 1};
|
||||
SRpcMsg msg = {0};
|
||||
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
|
||||
|
||||
pMeta->pHbInfo->hbCount += 1;
|
||||
|
|
|
@ -961,7 +961,7 @@ static void cliSendCb(uv_write_t* req, int status) {
|
|||
tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
|
||||
}
|
||||
}
|
||||
if (pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) {
|
||||
if (pMsg != NULL && pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) {
|
||||
rpcFreeCont(pMsg->msg.pCont);
|
||||
pMsg->msg.pCont = 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue