diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6fb9c75e6b..3b38362647 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -221,7 +221,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SYNC_MSG) - TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timeout", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 05aea3f68c..fa5e20ab6f 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); +int32_t mndScheduleStream1(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen, double filesFactor); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 27d13d66b6..d8a61461dd 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -403,6 +403,10 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pMgmt->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + static int64_t mndTick = 0; + if (++mndTick % 10 == 1) { + mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); + } syncRpcMsgLog2(logBuf, pMsg); taosMemoryFree(syncNodeStr); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index d07355dfa9..deb7382183 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -127,6 +127,61 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet return 0; } +int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { + pTask->dispatchType = TASK_DISPATCH__NONE; + // sink + if (pStream->createdBy == STREAM_CREATED_BY__SMA) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = pStream->smaId; + } else { + pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.stbUid = pStream->targetStbUid; + memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); + pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + } + return 0; +} + +int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { + pTask->sinkType = TASK_SINK__NONE; + if (pStream->fixedSinkVgId == 0) { + pTask->dispatchType = TASK_DISPATCH__SHUFFLE; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); + ASSERT(pDb); + + if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + sdbRelease(pMnode->pSdb, pDb); + + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(pVgs); + SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); + int32_t sinkLvSize = taosArrayGetSize(sinkLv); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + for (int32_t j = 0; j < sinkLvSize; j++) { + SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); + if (pLastLevelTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pLastLevelTask->taskId; + break; + } + } + } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; + } + } + return 0; +} + int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t msgLen; pTask->nodeId = pVgroup->vgId; @@ -139,6 +194,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } + ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE); mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId); return 0; } @@ -182,7 +238,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; SArray* tasks = taosArrayGetP(pStream->tasks, 0); @@ -234,7 +290,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p return 0; } -int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ASSERT(pStream->fixedSinkVgId != 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid); @@ -279,6 +335,146 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr return 0; } +int32_t mndScheduleStream1(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { + SSdb* pSdb = pMnode->pSdb; + SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); + if (pPlan == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + ASSERT(pStream->vgNum == 0); + + int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); + ASSERT(totLevel <= 2); + pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); + + bool hasExtraSink = false; + bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; + if (totLevel == 2 || externalTargetDB) { + SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskOneLevel); + // add extra sink + hasExtraSink = true; + if (pStream->fixedSinkVgId == 0) { + mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream); + } else { + mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream); + } + } + if (totLevel > 1) { + SStreamTask* pFinalTask; + // inner plan + { + SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskInnerLevel); + + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); + ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); + + pFinalTask = tNewSStreamTask(pStream->uid); + mndAddTaskToTaskSet(taskInnerLevel, pFinalTask); + // input + pFinalTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; + + // dispatch + mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pFinalTask); + + // exec + pFinalTask->execType = TASK_EXEC__PIPE; + SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid); + if (mndAssignTaskToVg(pMnode, pTrans, pFinalTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + } + + // source plan + SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskSourceLevel); + + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 1); + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); + ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + + void* pIter = NULL; + while (1) { + SVgObj* pVgroup; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pStream->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); + mndAddTaskToTaskSet(taskSourceLevel, pTask); + + // input + pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; + + // add fixed vg dispatch + pTask->sinkType = TASK_SINK__NONE; + pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; + pTask->dispatchType = TASK_DISPATCH__FIXED; + + pTask->fixedEpDispatcher.taskId = pFinalTask->taskId; + pTask->fixedEpDispatcher.nodeId = pFinalTask->nodeId; + pTask->fixedEpDispatcher.epSet = pFinalTask->epSet; + + // exec + pTask->execType = TASK_EXEC__PIPE; + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + } + } + + if (totLevel == 1) { + SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskOneLevel); + + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0); + ASSERT(LIST_LENGTH(inner->pNodeList) == 1); + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); + ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); + + void* pIter = NULL; + while (1) { + SVgObj* pVgroup; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pStream->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); + mndAddTaskToTaskSet(taskOneLevel, pTask); + + // input + pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK; + + // sink or dispatch + if (hasExtraSink) { + mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask); + } else { + mndAddSinkToTask(pMnode, pTrans, pStream, pTask); + } + + // exec + pTask->execType = TASK_EXEC__PIPE; + if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } + } + } + return 0; +} + int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); @@ -300,14 +496,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // add extra sink hasExtraSink = true; if (pStream->fixedSinkVgId == 0) { - mndAddShuffledSinkToStream(pMnode, pTrans, pStream); + mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream); } else { - mndAddFixedSinkToStream(pMnode, pTrans, pStream); + mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream); } } for (int32_t level = 0; level < totLevel; level++) { - SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskOneLevel); SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level); ASSERT(LIST_LENGTH(inner->pNodeList) == 1); @@ -357,18 +554,17 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } // dispatch part - if (level == 0) { + if (level == 0 && !hasExtraSink) { pTask->dispatchType = TASK_DISPATCH__NONE; } else { // add fixed ep dispatcher int32_t lastLevel = level - 1; - ASSERT(lastLevel == 0); if (hasExtraSink) lastLevel++; + ASSERT(lastLevel == 0); SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel); // one merge only ASSERT(taosArrayGetSize(pArray) == 1); SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - /*pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;*/ pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->dispatchType = TASK_DISPATCH__FIXED; @@ -465,8 +661,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } sdbRelease(pSdb, pVgroup); } - - taosArrayPush(pStream->tasks, &taskOneLevel); } #if 0 diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7abe9e3c0d..4a2c0a59a1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -269,7 +269,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } - if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { + if (mndScheduleStream1(pMnode, pTrans, pStream) < 0) { mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index da3c8e7c93..51ed739693 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -305,6 +305,10 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pVnode->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + static int64_t vndTick = 0; + if (++vndTick % 10 == 1) { + vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); + } syncRpcMsgLog2(logBuf, pMsg); taosMemoryFree(syncNodeStr); @@ -902,4 +906,4 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo // 2. adjust hash range / compact / remove wals / rename vgroups // 3. reload sync return 0; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 26dbf6c47a..0a19e16d5c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1066,7 +1066,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); snprintf(s, len, - "syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " + "syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " "electTimerLogicClock:%lu, " "electTimerLogicClockUser:%lu, " "electTimerMS:%d, replicaNum:%d", @@ -1873,4 +1873,4 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) } } return pSender; -} \ No newline at end of file +} diff --git a/tools/taos-tools b/tools/taos-tools index 717f5aaa5f..932da0f4ca 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda +Subproject commit 932da0f4cac013c2eded824d1d4d01cfa6168fa3