From 9648fe4025a4e885d6b26856f3b06436e129b995 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Jan 2022 10:16:59 +0800 Subject: [PATCH 1/3] [td-11818] fix a bug in query topic. --- source/libs/executor/src/executorimpl.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 45d1f57e68..8e30c0d711 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5467,6 +5467,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp return NULL; } + // todo dynamic set the value of 4096 + pInfo->pRes = createOutputBuf_rv(pExprInfo, 4096); + int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprInfo); SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t)); for(int32_t i = 0; i < numOfOutput; ++i) { @@ -7431,7 +7434,7 @@ SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqTableTagScan"; -// pOperator->operatorType = OP_TagScan; + pOperator->operatorType = OP_TagScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; From 6d7fd79709197868a7ef461baea69b71175c8aba Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 26 Jan 2022 11:19:26 +0800 Subject: [PATCH 2/3] fix query error --- source/client/test/clientTests.cpp | 2 -- source/dnode/mnode/impl/inc/mndDef.h | 6 +++-- source/dnode/vnode/src/tq/tq.c | 34 ++++++++++++++++++++-------- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b73079741c..1136a813d3 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -570,7 +570,6 @@ TEST(testCase, create_topic_Test) { //taos_close(pConn); //} -#if 0 TEST(testCase, tmq_subscribe_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -595,7 +594,6 @@ TEST(testCase, tmq_subscribe_Test) { //if (msg == NULL) break; } } -#endif TEST(testCase, tmq_consume_Test) { } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2a23618798..5ec9173fc8 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -626,8 +626,10 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic int32_t tlen = 0; tlen += taosEncodeString(buf, pConsumerTopic->name); tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch); - ASSERT(pConsumerTopic->pVgInfo); - int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); + int32_t sz = 0; + if (pConsumerTopic->pVgInfo != NULL) { + sz = taosArrayGetSize(pConsumerTopic->pVgInfo); + } tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4fb4b0eeae..4322f7be79 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -757,12 +757,12 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { pTopic->buffer.lastOffset = pReq->offset; } - // put output into rsp - SMqConsumeRsp rsp = { - .consumerId = consumerId, - .numOfTopics = 1 - }; } + // put output into rsp + SMqConsumeRsp rsp = { + .consumerId = consumerId, + .numOfTopics = 1 + }; return 0; } @@ -822,14 +822,29 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { pReadHandle->pMsg = pMsg; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); pReadHandle->ver = ver; memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); } bool tqNextDataBlock(STqReadHandle* pHandle) { - while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock)) { - if (pHandle->tbUid == pHandle->pBlock->uid) return true; + while (1) { + if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + return false; + } + if (pHandle->pBlock == NULL) return false; + + pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); + if (pHandle->tbUid == pHandle->pBlock->uid){ + pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); + pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); + pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); + pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); + pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); + return true; + } } return false; } @@ -845,8 +860,9 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t sversion = pHandle->pBlock->sversion; - SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); - STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); + //TODO : change sversion + SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, 0, true); + STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, 0); SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL; From 3520e4649fbbd34c725d55e1a035554930fe438f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 26 Jan 2022 11:56:10 +0800 Subject: [PATCH 3/3] fix get schema error --- source/client/test/clientTests.cpp | 2 ++ source/dnode/vnode/src/tq/tq.c | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 1136a813d3..b73079741c 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -570,6 +570,7 @@ TEST(testCase, create_topic_Test) { //taos_close(pConn); //} +#if 0 TEST(testCase, tmq_subscribe_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -594,6 +595,7 @@ TEST(testCase, tmq_subscribe_Test) { //if (msg == NULL) break; } } +#endif TEST(testCase, tmq_consume_Test) { } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4322f7be79..e953d59527 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -861,8 +861,17 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t sversion = pHandle->pBlock->sversion; //TODO : change sversion - SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, 0, true); STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, 0); + + tb_uid_t quid; + STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pMeta, pHandle->pBlock->uid); + if (pTbCfg->type == META_CHILD_TABLE) { + quid = pTbCfg->ctbCfg.suid; + } else { + quid = pHandle->pBlock->uid; + } + + SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, quid, 0, true); SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL;