diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1ea9714bf9..8fd09afb90 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1949,6 +1949,7 @@ typedef struct { char* ast; char subStbName[TSDB_TABLE_FNAME_LEN]; }; + char* subStbFilterAst; } SCMCreateTopicReq; int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); @@ -2758,6 +2759,7 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR tlen += taosEncodeString(buf, pReq->qmsg); } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { tlen += taosEncodeFixedI64(buf, pReq->suid); + tlen += taosEncodeString(buf, pReq->qmsg); } return tlen; } @@ -2773,6 +2775,7 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { buf = taosDecodeString(buf, &pReq->qmsg); } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + buf = taosDecodeString(buf, &pReq->qmsg); buf = taosDecodeFixedI64(buf, &pReq->suid); } return (void*)buf; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index b7e6c42e3b..e3a75ecabc 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -82,6 +82,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id); +int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* pTagCond, void* pTagIndexCond, SArray **tableList); + /** * set the task Id, usually used by message queue process * @param tinfo diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7102e556cc..1eb73ab842 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3841,6 +3841,10 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo if (tEncodeI32(&encoder, strlen(pReq->sql)) < 0) return -1; if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; + if (TOPIC_SUB_TYPE__TABLE == pReq->subType) { + if (tEncodeI32(&encoder, strlen(pReq->subStbFilterAst)) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->subStbFilterAst) < 0) return -1; + } tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3879,6 +3883,15 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1; } + if (TOPIC_SUB_TYPE__TABLE == pReq->subType) { + if (tDecodeI32(&decoder, &astLen) < 0) return -1; + if (astLen > 0) { + pReq->subStbFilterAst = taosMemoryCalloc(1, astLen + 1); + if (pReq->subStbFilterAst == NULL) return -1; + if (tDecodeCStrTo(&decoder, pReq->subStbFilterAst) < 0) return -1; + } + } + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -3889,6 +3902,8 @@ void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) { taosMemoryFreeClear(pReq->sql); if (TOPIC_SUB_TYPE__COLUMN == pReq->subType) { taosMemoryFreeClear(pReq->ast); + }else if(TOPIC_SUB_TYPE__TABLE == pReq->subType) { + taosMemoryFreeClear(pReq->subStbFilterAst); } } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 734f624be0..0248a195db 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -591,6 +591,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } + } else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){ + pVgEp->qmsg = taosStrdup(pTopic->ast); } else { pVgEp->qmsg = taosStrdup(""); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index f6da370916..7d71aae3f4 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -462,6 +462,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.stbUid = pStb->uid; mndReleaseStb(pMnode, pStb); + topicObj.ast = taosStrdup(pCreate->ast); + topicObj.astLen = strlen(pCreate->ast) + 1; } /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/ /*topicObj.ast = NULL;*/ diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e431ca4a01..7895690f94 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -72,6 +72,8 @@ typedef struct { typedef struct { int64_t suid; + char* qmsg; // SubPlanToString + SNode* node; } STqExecTb; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 311d637be8..be3d6bc614 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -70,6 +70,8 @@ static void destroyTqHandle(void* data) { } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { walCloseReader(pData->pWalReader); tqCloseReader(pData->execHandle.pTqReader); + taosMemoryFreeClear(pData->execHandle.execTb.qmsg); + nodesDestroyNode(pData->execHandle.execTb.node); } if(pData->msg != NULL) { rpcFreeCont(pData->msg->pCont); @@ -470,7 +472,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle tqHandle = {0}; pHandle = &tqHandle; - uint64_t oldConsumerId = pHandle->consumerId; memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); pHandle->consumerId = req.newConsumerId; pHandle->epoch = -1; @@ -514,14 +515,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->execHandle.execTb.suid = req.suid; + pHandle->execHandle.execTb.qmsg = req.qmsg; + req.qmsg = NULL; - SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); - vnodeGetCtbIdList(pVnode, req.suid, tbUidList); - tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid); - for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { - int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); - tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); + if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { + tqError("nodesStringToNode error in sub stable, since %s", terrstr()); + return -1; } + + SArray* tbUidList = NULL; + ret = qGetTableList(req.suid, pVnode->pMeta, pVnode, pHandle->execHandle.execTb.node, NULL, &tbUidList); + if(ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, req.subKey, pHandle->consumerId); + taosArrayDestroy(tbUidList); + goto end; + } + tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, req.suid); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); @@ -532,8 +541,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey, - pHandle->consumerId, oldConsumerId); + tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, + pHandle->consumerId); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; } else { diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index f3ecaa08f6..ee7af5b2bf 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -37,6 +37,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1; + if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -64,6 +65,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { } } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1; } tEndDecode(pDecoder); return 0; @@ -336,13 +338,19 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); - vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList); - tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); - for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { - int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); - tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); + if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) { + tqError("nodesStringToNode error in sub stable, since %s", terrstr()); + return -1; } + + SArray* tbUidList = NULL; + int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, handle.execHandle.execTb.node, NULL, &tbUidList); + if(ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); + taosArrayDestroy(tbUidList); + goto end; + } + tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 1fbdb25528..38f5307384 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1040,34 +1040,15 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (isAdd) { - SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); - SMetaReader mr = {0}; - metaReaderInit(&mr, pTq->pVnode->pMeta, 0); - for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) { - uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i); - - int32_t code = metaGetTableEntryByUidCache(&mr, *id); - if (code != TSDB_CODE_SUCCESS) { - tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); - continue; - } - - tDecoderClear(&mr.coder); - if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pTqHandle->execHandle.execTb.suid) { - tqDebug("table uid %" PRId64 " does not add to tq handle", *id); - continue; - } - - tqDebug("table uid %" PRId64 " add to tq handle", *id); - taosArrayPush(qa, id); + SArray* list = NULL; + int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, pTqHandle->execHandle.execTb.node, NULL, &list); + if(ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); + taosArrayDestroy(list); + return ret; } - - metaReaderClear(&mr); - if (taosArrayGetSize(qa) > 0) { - tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, qa); - } - - taosArrayDestroy(qa); + tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list); + taosArrayDestroy(list); } else { tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList); } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c51dc39b5b..1fb35ae271 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -935,6 +935,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN SArray* pBlockList = NULL; SSDataBlock* pResBlock = NULL; SScalarParam output = {0}; + SArray* pUidTagList = NULL; tagFilterAssist ctx = {0}; ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); @@ -954,7 +955,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; // int64_t stt = taosGetTimestampUs(); - SArray* pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo)); + pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo)); copyExistedUids(pUidTagList, pUidList); FilterCondType condType = checkTagCond(pTagCond); @@ -1121,6 +1122,19 @@ _end: return code; } +int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* pTagCond, void* pTagIndexCond, SArray **tableList){ + SScanPhysiNode node = {0}; + node.suid = suid; + node.uid = suid; + node.tableType = TSDB_SUPER_TABLE; + STableListInfo* pTableListInfo = tableListCreate(); + int code = getTableList(metaHandle, pVnode, &node, pTagCond, pTagIndexCond, pTableListInfo, "qGetTableList"); + *tableList = pTableListInfo->pTableList; + pTableListInfo->pTableList = NULL; + tableListDestroy(pTableListInfo); + return code; +} + size_t getTableTagsBufLen(const SNodeList* pGroups) { size_t keyLen = 0; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f4c86d4849..f90fe59908 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5764,6 +5764,7 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name); tNameGetFullDbName(&name, pReq->subDbName); tNameExtractFullName(&name, pReq->subStbName); + code = nodesNodeToString(pStmt->pQuery, false, &pReq->subStbFilterAst, NULL); } else if ('\0' != pStmt->subDbName[0]) { pReq->subType = TOPIC_SUB_TYPE__DB; tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->subDbName, strlen(pStmt->subDbName));