Merge remote-tracking branch 'origin/3.0' into fix/dnode

This commit is contained in:
Shengliang Guan 2022-05-26 22:39:37 +08:00
commit 84b3de325a
9 changed files with 33 additions and 32 deletions

View File

@ -195,11 +195,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
@ -236,9 +233,13 @@ enum {
// Requests handled by SNODE // Requests handled by SNODE
TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_NEW_MSG_SEG(TDMT_SND_MSG)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) //TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) //TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) //TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL)
// Requests handled by SCHEDULER // Requests handled by SCHEDULER
TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG)

View File

@ -96,7 +96,7 @@ SArray *smGetMsgHandles() {
// Requests handled by SNODE // Requests handled by SNODE
if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER; /*if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;*/
code = 0; code = 0;
_OVER: _OVER:

View File

@ -310,9 +310,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_PIPE_EXEC, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_MERGE_EXEC, vmPutNodeMsgToMergeQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_WRITE_EXEC, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;

View File

@ -359,7 +359,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// one merge only // one merge only
ASSERT(taosArrayGetSize(pArray) == 1); ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC; /*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
@ -404,7 +405,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE; pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDb); ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
@ -434,7 +436,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} }
} else { } else {
pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0); SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only // one sink only
ASSERT(taosArrayGetSize(pArray) == 1); ASSERT(taosArrayGetSize(pArray) == 1);

View File

@ -103,8 +103,8 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask); sndMetaDeployTask(pSnode->pMeta, pTask);
} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) { /*} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
sndProcessTaskExecReq(pSnode, pMsg); /*sndProcessTaskExecReq(pSnode, pMsg);*/
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -112,9 +112,9 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// operator exec // operator exec
if (pMsg->msgType == TDMT_SND_TASK_EXEC) { /*if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
sndProcessTaskExecReq(pSnode, pMsg); /*sndProcessTaskExecReq(pSnode, pMsg);*/
} else { /*} else {*/
ASSERT(0); ASSERT(0);
} /*}*/
} }

View File

@ -84,9 +84,9 @@ int32_t diffFunction(SqlFunctionCtx *pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx);
int32_t firstlastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);

View File

@ -1073,7 +1073,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = firstFunction, .processFunc = firstFunction,
.finalizeFunc = firstlastFinalize, .finalizeFunc = firstLastFinalize,
.combineFunc = firstCombine, .combineFunc = firstCombine,
}, },
{ {
@ -1084,7 +1084,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = lastFunction, .processFunc = lastFunction,
.finalizeFunc = firstlastFinalize, .finalizeFunc = firstLastFinalize,
.combineFunc = lastCombine, .combineFunc = lastCombine,
}, },
{ {

View File

@ -2227,7 +2227,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t firstlastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

View File

@ -278,13 +278,13 @@ int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
} }
int32_t qType; int32_t qType;
if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
qType = FETCH_QUEUE; qType = FETCH_QUEUE;
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||*/
pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { /*pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {*/
qType = MERGE_QUEUE; /*qType = MERGE_QUEUE;*/
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { /*} else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {*/
qType = WRITE_QUEUE; /*qType = WRITE_QUEUE;*/
} else { } else {
ASSERT(0); ASSERT(0);
} }