diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index c3d2010351..ff9b573464 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -67,9 +67,10 @@ typedef enum { * Create the exec task for stream mode * @param pMsg * @param SReadHandle + * @param vgId * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId); /** * Create the exec task for queue mode @@ -77,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); * @param SReadHandle * @return */ -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); /** diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index d4ca81a6a9..3d1b356f8c 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -75,9 +75,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; - pTask->pMsgCb = &pSnode->msgCb; - pTask->startVer = ver; pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); @@ -90,11 +88,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), .pStateBackend = pTask->pState, }; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); + + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0); ASSERT(pTask->exec.executor); streamSetupTrigger(pTask); - return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 99e171dde1..3ed1b083e4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -282,7 +282,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat .initTqReader = 1, .pStateBackend = pStreamState, }; - pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); + pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; return TSDB_CODE_FAILED; @@ -864,7 +864,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t TSDB_CHECK_CODE(code, lino, _exit); } - dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); + dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); if (!dstTaskInfo) { code = TSDB_CODE_RSMA_QTASKINFO_CREATE; TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cac5d7a30c..6a707bf4b0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -906,7 +906,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); - tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, + SVnode* pVnode = pTq->pVnode; + int32_t vgId = TD_VID(pVnode); + + tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); @@ -935,7 +938,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->fetchMeta = req.withMeta; // TODO version should be assigned and refed during preprocess - SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); + SWalRef* pRef = walRefCommittedVer(pVnode->pWal); if (pRef == NULL) { taosMemoryFree(req.qmsg); return -1; @@ -945,8 +948,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->pRef = pRef; SReadHandle handle = { - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, + .meta = pVnode->pMeta, + .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver, @@ -959,38 +962,38 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg req.qmsg = NULL; pHandle->execHandle.task = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL); + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.task, &scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { - pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); + pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); + pHandle->execHandle.pExecReader = tqOpenReader(pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { - pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->execHandle.execTb.suid = req.suid; SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); - vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); - tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); + vnodeGetCtbIdList(pVnode, req.suid, tbUidList); + tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid); for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); - tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); + tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } - pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); + pHandle->execHandle.pExecReader = tqOpenReader(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); @@ -1043,6 +1046,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } #endif + int32_t vgId = TD_VID(pTq->pVnode); pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -1055,9 +1059,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; - pTask->pMsgCb = &pTq->pVnode->msgCb; - pTask->startVer = ver; // expand executor @@ -1077,7 +1079,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .initTqReader = 1, .pStateBackend = pTask->pState, }; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); if (pTask->exec.executor == NULL) { return -1; } @@ -1092,7 +1095,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), .pStateBackend = pTask->pState, }; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); + + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId); if (pTask->exec.executor == NULL) { return -1; } @@ -1122,9 +1126,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } streamSetupTrigger(pTask); - - tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", TD_VID(pTq->pVnode), pTask->taskId, - pTask->selfChildId, pTask->taskLevel); + tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId, pTask->taskLevel); return 0; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index ce0aa144f9..548c224e9d 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -274,6 +274,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { return -1; } + int32_t vgId = TD_VID(pTq->pVnode); void* pKey = NULL; int kLen = 0; void* pVal = NULL; @@ -304,7 +305,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = - qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL); + qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL); if (handle.execHandle.task == NULL) { tqError("cannot create exec task for %s", handle.subKey); return -1; @@ -324,7 +325,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL); } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); @@ -341,7 +342,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2b608f9b09..0617c83113 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -115,6 +115,7 @@ typedef struct STaskIdInfo { uint64_t subplanId; uint64_t templateId; char* str; + int32_t vgId; } STaskIdInfo; enum { @@ -834,8 +835,8 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void doDestroyTask(SExecTaskInfo* pTaskInfo); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, - char* sql, EOPTR_EXEC_MODEL model); +int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, + int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a05399615e..c01a297f80 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -218,7 +218,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, return code; } -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) { if (msg == NULL) { // create raw scan SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); @@ -231,7 +231,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n pTaskInfo->cost.created = taosGetTimestampUs(); pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE; - pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo); + pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo); if (NULL == pTaskInfo->pRoot) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pTaskInfo); @@ -248,7 +248,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE); + code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE); if (code != TSDB_CODE_SUCCESS) { nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); @@ -274,13 +274,11 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n return pTaskInfo; } -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) { if (msg == NULL) { return NULL; } - /*qDebugL("stream task string %s", (const char*)msg);*/ - struct SSubplan* pPlan = NULL; int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { @@ -289,7 +287,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); + code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); @@ -468,11 +466,11 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, taosThreadOnce(&initPoolOnce, initRefPool); - qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); + qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId); - int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model); + int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model); if (code != TSDB_CODE_SUCCESS) { - qError("failed to createExecTaskInfoImpl, code: %s", tstrerror(code)); + qError("failed to createExecTaskInfo, code: %s", tstrerror(code)); goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3691a9255b..b32041847c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1975,7 +1975,7 @@ static char* buildTaskId(uint64_t taskId, uint64_t queryId) { return p; } -static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) { +static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) { SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); if (pTaskInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1990,6 +1990,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); + pTaskInfo->id.vgId = vgId; pTaskInfo->id.queryId = queryId; pTaskInfo->id.str = buildTaskId(taskId, queryId); return pTaskInfo; @@ -2178,7 +2179,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo #ifndef NDEBUG int32_t sz = tableListGetSize(pTableListInfo); - qDebug("create stream task, total:%d", sz); + qDebug("vgId:%d create stream task, total qualified tables:%d, %s", pTaskInfo->id.vgId, sz, idstr); for (int32_t i = 0; i < sz; i++) { STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); @@ -2439,17 +2440,14 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT return TSDB_CODE_SUCCESS; } -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, - char* sql, EOPTR_EXEC_MODEL model) { - uint64_t queryId = pPlan->id.queryId; - - *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName); +int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, + int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) { + *pTaskInfo = doCreateExecTaskInfo(pPlan->id.queryId, taskId, vgId, model, pPlan->dbFName); if (*pTaskInfo == NULL) { goto _complete; } if (pHandle) { - /*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/ if (pHandle->pStateBackend) { (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; }