diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6494e06c45..2ac652342a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -801,7 +801,10 @@ typedef struct SVgroupInfo { uint32_t hashBegin; uint32_t hashEnd; SEpSet epSet; - int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT + union { + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT + int32_t taskId; // used in stream + }; } SVgroupInfo; typedef struct { diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 1682c6043d..fed539ce7d 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -281,6 +281,7 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 305912a72a..566bd1d282 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -119,6 +119,53 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } +int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { + SSdb* pSdb = pMnode->pSdb; + void* pIter = NULL; + SArray* tasks = taosArrayGetP(pStream->tasks, 0); + + ASSERT(taosArrayGetSize(pStream->tasks) == 1); + + while (1) { + SVgObj* pVgroup; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pStream->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); + if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + taosArrayPush(tasks, &pTask); + + pTask->nodeId = pVgroup->vgId; + pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); + + // source + pTask->sourceType = TASK_SOURCE__MERGE; + + // exec + pTask->execType = TASK_EXEC__NONE; + + // sink + if (smaId != -1) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = smaId; + } else { + pTask->sinkType = TASK_SINK__TABLE; + } + + // dispatch + pTask->dispatchType = TASK_DISPATCH__NONE; + + mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); + } + return 0; +} + int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); @@ -132,6 +179,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i ASSERT(totLevel <= 2); pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); + bool hasExtraSink = false; + if (totLevel == 2) { + SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); + taosArrayPush(pStream->tasks, &taskOneLevel); + // add extra sink + hasExtraSink = true; + mndAddSinkToStream(pMnode, pTrans, pStream, smaId); + } + for (int32_t level = 0; level < totLevel; level++) { SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level); @@ -164,9 +220,13 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i // only for inplace pTask->sinkType = TASK_SINK__SHOW; pTask->showSink.reserved = 0; - if (smaId != -1) { - pTask->sinkType = TASK_SINK__SMA; - pTask->smaSink.smaId = smaId; + if (!hasExtraSink) { + if (smaId != -1) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = smaId; + } else { + pTask->sinkType = TASK_SINK__TABLE; + } } } else { pTask->sinkType = TASK_SINK__NONE; @@ -175,17 +235,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i // dispatch part if (level == 0) { pTask->dispatchType = TASK_DISPATCH__NONE; - // if inplace sink, no dispatcher - // if fixed ep, add fixed ep dispatcher - // if shuffle, add shuffle dispatcher } else { // add fixed ep dispatcher int32_t lastLevel = level - 1; ASSERT(lastLevel == 0); + if (hasExtraSink) lastLevel++; SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel); // one merge only ASSERT(taosArrayGetSize(pArray) == 1); - SStreamTask* lastLevelTask = taosArrayGetP(pArray, lastLevel); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC; pTask->dispatchType = TASK_DISPATCH__FIXED; @@ -222,18 +280,43 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i /*pTask->sinkType = TASK_SINK__NONE;*/ // dispatch part - pTask->dispatchType = TASK_DISPATCH__NONE; -#if 0 - pTask->dispatchType = TASK_DISPATCH__SHUFFLE; - pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; - SDbObj* pDb = mndAcquireDb(pMnode, pStream->db); - ASSERT(pDb); - if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + ASSERT(hasExtraSink); + /*pTask->dispatchType = TASK_DISPATCH__NONE;*/ +#if 1 + + if (hasExtraSink) { + // add dispatcher + pTask->dispatchType = TASK_DISPATCH__SHUFFLE; + + pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + SDbObj* pDb = mndAcquireDb(pMnode, pStream->db); + ASSERT(pDb); + if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + sdbRelease(pSdb, pDb); + qDestroyQueryPlan(pPlan); + return -1; + } sdbRelease(pSdb, pDb); - qDestroyQueryPlan(pPlan); - return -1; + + // put taskId to useDbRsp + // TODO: optimize + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(pVgs); + SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); + int32_t sinkLvSize = taosArrayGetSize(sinkLv); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + for (int32_t j = 0; j < sinkLvSize; j++) { + SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); + /*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/ + if (pLastLevelTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pLastLevelTask->taskId; + /*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/ + break; + } + } + } } - sdbRelease(pSdb, pDb); #endif // exec part diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index efc7ac80e9..159ec5b3e8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -508,7 +508,9 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) { SStreamTaskExecReq req; tDecodeSStreamTaskExecReq(msg, &req); - int32_t taskId = req.taskId; + int32_t taskId = req.taskId; + ASSERT(taskId); + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); ASSERT(pTask); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 220f83bb3e..70651840f7 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -31,12 +31,13 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { ((SMsgHead*)buf)->vgId = 0; req.taskId = pTask->inplaceDispatcher.taskId; + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId); *ppEpSet = &pTask->fixedEpDispatcher.epSet; req.taskId = pTask->fixedEpDispatcher.taskId; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - int32_t nodeId = 0; // TODO fix tbname issue char ctbName[TSDB_TABLE_FNAME_LEN + 22]; // all groupId must be the same in an array @@ -52,10 +53,12 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* // TODO: optimize search process SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t sz = taosArrayGetSize(vgInfo); + int32_t nodeId = 0; for (int32_t i = 0; i < sz; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { nodeId = pVgInfo->vgId; + req.taskId = pVgInfo->taskId; *ppEpSet = &pVgInfo->epSet; break; } @@ -71,6 +74,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg->contLen = tlen; pMsg->code = 0; pMsg->msgType = pTask->dispatchMsgType; + /*pMsg->noResp = 1;*/ return 0; } @@ -80,7 +84,7 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb while (1) { pIter = taosHashIterate(data, pIter); if (pIter == NULL) return 0; - SArray* pData = (SArray*)pIter; + SArray* pData = *(SArray**)pIter; SRpcMsg dispatchMsg = {0}; SEpSet* pEpSet; if (streamBuildDispatchMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) { @@ -98,7 +102,6 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0; // exec - // TODO: for shuffle dispatcher, merge data by groupId if (pTask->execType != TASK_EXEC__NONE) { ASSERT(workId < pTask->exec.numOfRunners); void* exec = pTask->exec.runners[workId].executor;