Merge pull request #13565 from taosdata/feature/stream
feat(stream): snode process stream msg
This commit is contained in:
commit
3ce93a18e6
|
@ -172,10 +172,6 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
|
||||||
|
@ -186,12 +182,8 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
|
||||||
|
@ -205,11 +197,12 @@ enum {
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_QND_MSG)
|
TD_NEW_MSG_SEG(TDMT_QND_MSG)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_SND_MSG)
|
//shared by snode and vnode
|
||||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
|
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
|
||||||
|
|
|
@ -210,7 +210,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -94,9 +94,10 @@ SArray *smGetMsgHandles() {
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
// Requests handled by SNODE
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
/*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
|
@ -210,8 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma,
|
dDebug("vgId:%d, start to create vnode, tsma:%d standby:%d", createReq.vgId, createReq.isTsma, createReq.standby);
|
||||||
createReq.standby);
|
|
||||||
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
|
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
|
||||||
|
|
||||||
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
|
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
|
||||||
|
@ -333,11 +332,6 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_CONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_DISCONNECT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
// if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_SET_CUR, vmPutMsgToWriteQueue, 0)== NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
@ -352,13 +346,14 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -95,6 +95,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
int32_t vgId = ntohl(pHead->vgId);
|
int32_t vgId = ntohl(pHead->vgId);
|
||||||
if (vgId == QNODE_HANDLE) {
|
if (vgId == QNODE_HANDLE) {
|
||||||
pWrapper = &pDnode->wrappers[QNODE];
|
pWrapper = &pDnode->wrappers[QNODE];
|
||||||
|
} else if (vgId == SNODE_HANDLE) {
|
||||||
|
pWrapper = &pDnode->wrappers[SNODE];
|
||||||
} else if (vgId == MNODE_HANDLE) {
|
} else if (vgId == MNODE_HANDLE) {
|
||||||
pWrapper = &pDnode->wrappers[MNODE];
|
pWrapper = &pDnode->wrappers[MNODE];
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -132,7 +132,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
|
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,7 +156,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
|
||||||
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
terrno = TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0);
|
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,7 +222,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
|
||||||
// dispatch
|
// dispatch
|
||||||
pTask->dispatchType = TASK_DISPATCH__NONE;
|
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||||
|
|
||||||
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
|
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -267,8 +267,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
|
||||||
// dispatch
|
// dispatch
|
||||||
pTask->dispatchType = TASK_DISPATCH__NONE;
|
pTask->dispatchType = TASK_DISPATCH__NONE;
|
||||||
|
|
||||||
/*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/
|
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pStream->fixedSinkVg.vgId);
|
||||||
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pStream->fixedSinkVg.vgId);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -361,7 +360,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
ASSERT(taosArrayGetSize(pArray) == 1);
|
ASSERT(taosArrayGetSize(pArray) == 1);
|
||||||
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
|
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
|
||||||
/*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/
|
/*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/
|
||||||
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
||||||
|
|
||||||
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
|
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
|
||||||
|
@ -407,7 +406,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
|
||||||
|
|
||||||
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
|
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
|
||||||
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
ASSERT(pDb);
|
ASSERT(pDb);
|
||||||
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
|
||||||
|
@ -438,7 +437,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
} else {
|
} else {
|
||||||
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
||||||
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
|
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
|
||||||
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
|
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
|
||||||
// one sink only
|
// one sink only
|
||||||
ASSERT(taosArrayGetSize(pArray) == 1);
|
ASSERT(taosArrayGetSize(pArray) == 1);
|
||||||
|
|
|
@ -54,8 +54,8 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
};
|
};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);
|
/*mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);*/
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
|
||||||
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
// stream deploy
|
// stream deploy
|
||||||
// stream stop/resume
|
// stream stop/resume
|
||||||
// operator exec
|
// operator exec
|
||||||
if (pMsg->msgType == TDMT_SND_TASK_DEPLOY) {
|
if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) {
|
||||||
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask));
|
SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
|
|
@ -158,7 +158,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_TASK_DEPLOY: {
|
case TDMT_STREAM_TASK_DEPLOY: {
|
||||||
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
}
|
}
|
||||||
|
@ -238,21 +238,19 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
|
|
||||||
case TDMT_VND_TABLE_META:
|
case TDMT_VND_TABLE_META:
|
||||||
return vnodeGetTableMeta(pVnode, pMsg);
|
return vnodeGetTableMeta(pVnode, pMsg);
|
||||||
|
|
||||||
case TDMT_VND_CONSUME:
|
case TDMT_VND_CONSUME:
|
||||||
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
|
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
|
||||||
|
|
||||||
case TDMT_VND_TASK_RUN: {
|
case TDMT_STREAM_TASK_RUN:
|
||||||
int32_t code = tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
||||||
pMsg->pCont = NULL;
|
case TDMT_STREAM_TASK_DISPATCH:
|
||||||
return code;
|
|
||||||
}
|
|
||||||
case TDMT_VND_TASK_DISPATCH:
|
|
||||||
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
|
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_TASK_RECOVER:
|
case TDMT_STREAM_TASK_RECOVER:
|
||||||
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_TASK_DISPATCH_RSP:
|
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
||||||
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_TASK_RECOVER_RSP:
|
case TDMT_STREAM_TASK_RECOVER_RSP:
|
||||||
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
|
||||||
default:
|
default:
|
||||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||||
|
@ -262,9 +260,9 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
|
|
||||||
int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
vTrace("message in write queue is processing");
|
vTrace("message in write queue is processing");
|
||||||
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
SDeleteRes res = {0};
|
SDeleteRes res = {0};
|
||||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
|
|
|
@ -26,7 +26,7 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
|
||||||
pRunReq->streamId = pTask->streamId;
|
pRunReq->streamId = pTask->streamId;
|
||||||
pRunReq->taskId = pTask->taskId;
|
pRunReq->taskId = pTask->taskId;
|
||||||
SRpcMsg msg = {
|
SRpcMsg msg = {
|
||||||
.msgType = TDMT_VND_TASK_RUN,
|
.msgType = TDMT_STREAM_TASK_RUN,
|
||||||
.pCont = pRunReq,
|
.pCont = pRunReq,
|
||||||
.contLen = sizeof(SStreamTaskRunReq),
|
.contLen = sizeof(SStreamTaskRunReq),
|
||||||
};
|
};
|
||||||
|
|
|
@ -190,9 +190,9 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* dat
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qType;
|
int32_t qType;
|
||||||
if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
|
if (pTask->dispatchMsgType == TDMT_STREAM_TASK_DISPATCH) {
|
||||||
qType = FETCH_QUEUE;
|
qType = FETCH_QUEUE;
|
||||||
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) {
|
} else if (pTask->dispatchMsgType == TDMT_VND_STREAM_DISPATCH_WRITE) {
|
||||||
qType = WRITE_QUEUE;
|
qType = WRITE_QUEUE;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
Loading…
Reference in New Issue