From b46098793d13e867ae9ff559c107dbe01fce39d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Feb 2023 12:16:45 +0800 Subject: [PATCH 1/5] refactor: do some internal refactor and add some logs for tmq. --- include/os/osMemory.h | 3 + source/dnode/mnode/impl/src/mndConsumer.c | 73 ++++++++++------------ source/dnode/mnode/impl/src/mndSubscribe.c | 8 --- source/dnode/vnode/src/tq/tq.c | 6 +- source/dnode/vnode/src/tq/tqExec.c | 4 +- source/dnode/vnode/src/tq/tqMeta.c | 2 +- source/dnode/vnode/src/tq/tqPush.c | 22 ++++--- source/dnode/vnode/src/tq/tqRead.c | 45 +++++++------ source/dnode/vnode/src/vnd/vnodeSvr.c | 10 +-- source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/executor.c | 6 +- source/libs/executor/src/scanoperator.c | 3 +- 12 files changed, 94 insertions(+), 91 deletions(-) diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 3f57c72933..cd2d4a3494 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -29,7 +29,10 @@ extern "C" { #define calloc CALLOC_FUNC_TAOS_FORBID #define realloc REALLOC_FUNC_TAOS_FORBID #define free FREE_FUNC_TAOS_FORBID +#ifdef strdup +#undef strdup #define strdup STRDUP_FUNC_TAOS_FORBID +#endif #endif // ifndef ALLOW_FORBID_FUNC #endif // if !defined(WINDOWS) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6621b6a3f9..768fe45e3e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -15,17 +15,11 @@ #define _DEFAULT_SOURCE #include "mndConsumer.h" -#include "mndDb.h" -#include "mndDnode.h" -#include "mndMnode.h" #include "mndPrivilege.h" #include "mndShow.h" -#include "mndStb.h" #include "mndSubscribe.h" #include "mndTopic.h" #include "mndTrans.h" -#include "mndUser.h" -#include "mndVgroup.h" #include "tcompare.h" #include "tname.h" @@ -209,6 +203,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { taosMemoryFree(pConsumerNew); mndTransDrop(pTrans); return 0; + FAIL: tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); @@ -580,6 +575,10 @@ static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const ch return 0; } +static void* topicNameDup(void* p){ + return taosStrdup((char*) p); +} + int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; char *msgStr = pMsg->pCont; @@ -616,15 +615,15 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); - pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; - taosArrayDestroy(pConsumerNew->rebNewTopics); - pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance. - subscribe.topicNames = NULL; - for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i)); - taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); - } + // set the update type + pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); + + // all subscribed topics should re-balance. + taosArrayDestroy(pConsumerNew->rebNewTopics); + pConsumerNew->rebNewTopics = pTopicList; + subscribe.topicNames = NULL; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; @@ -646,17 +645,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { goto _over; } + // set the update type pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); - for (int32_t i = 0; i < newTopicNum; i++) { - char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i)); - taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); - } - - int32_t oldTopicNum = 0; - if (pExistedConsumer->currentTopics) { - oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); - } + int32_t oldTopicNum = (pExistedConsumer->currentTopics)? taosArrayGetSize(pExistedConsumer->currentTopics):0; int32_t i = 0, j = 0; while (i < oldTopicNum || j < newTopicNum) { @@ -692,11 +685,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } } - if (pExistedConsumer && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && - taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { - /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/ - /*pConsumerNew->updateType = */ - /*}*/ + // no topics need to be rebalanced + if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { goto _over; } @@ -718,8 +708,9 @@ _over: tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } + // TODO: replace with destroy subscribe msg - if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); + taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); return code; } @@ -750,12 +741,12 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { CM_ENCODE_OVER: taosMemoryFreeClear(buf); if (terrno != 0) { - mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); + mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); sdbFreeRaw(pRaw); return NULL; } - mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer); + mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer); return pRaw; } @@ -823,8 +814,8 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { } static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mDebug("consumer:0x%" PRIx64 " perform delete action, status:%s", pConsumer->consumerId, - mndConsumerStatusName(pConsumer->status)); + mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, + pConsumer->status, mndConsumerStatusName(pConsumer->status)); tDeleteSMqConsumerObj(pConsumer); return 0; } @@ -1075,22 +1066,23 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * // consumer group char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN); - varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + STR_TO_VARSTR(cgroup, pConsumer->cgroup); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); // client id char clientId[256 + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(clientId), pConsumer->clientId, 256); - varDataSetLen(clientId, strlen(varDataVal(clientId))); + STR_TO_VARSTR(clientId, pConsumer->clientId); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false); // status char status[20 + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20); - varDataSetLen(status, strlen(varDataVal(status))); + const char* pStatusName = mndConsumerStatusName(pConsumer->status); + STR_TO_VARSTR(status, pStatusName); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)status, false); @@ -1123,8 +1115,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * numOfRows++; } + taosRUnLockLatch(&pConsumer->lock); sdbRelease(pSdb, pConsumer); + + pBlock->info.rows = numOfRows; } pShow->numOfRows += numOfRows; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8d8fd1b9b5..8544994c3e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -16,15 +16,10 @@ #define _DEFAULT_SOURCE #include "mndSubscribe.h" #include "mndConsumer.h" -#include "mndDb.h" -#include "mndDnode.h" -#include "mndMnode.h" #include "mndScheduler.h" #include "mndShow.h" -#include "mndStb.h" #include "mndTopic.h" #include "mndTrans.h" -#include "mndUser.h" #include "mndVgroup.h" #include "tcompare.h" #include "tname.h" @@ -1041,7 +1036,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock } // do not show for cleared subscription -#if 1 int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); @@ -1087,8 +1081,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock numOfRows++; } -#endif - pBlock->info.rows = numOfRows; taosRUnLockLatch(&pSub->lock); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c9bbb30380..c465617975 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -217,7 +217,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { char buf2[80] = {0}; tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s", + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; @@ -604,7 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index e92da7668a..5638228641 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -113,9 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs return -1; } - ASSERT(pRsp->withTbName == false); - ASSERT(pRsp->withSchema == false); - + ASSERT(!(pRsp->withTbName || pRsp->withSchema)); return 0; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 12fd15c63a..a85e6e0a70 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -197,7 +197,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { return -1; } - tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey, + tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode)); void* buf = taosMemoryCalloc(1, vlen); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index e747262277..5a25d7e894 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); taosWUnLockLatch(&pHandle->pushHandle.lock); - tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64, + tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64, TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); @@ -210,25 +210,30 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); int32_t len = msgLen - sizeof(SSubmitReq2Msg); - tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, - TMSG_INFO(msgType), msg, pReq, len); - if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost taosWLockLatch(&pTq->pushLock); - if (taosHashGetSize(pTq->pPushMgr) != 0) { - tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr)); + int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); + if (numOfRegisteredPush > 0) { + tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", + pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); + SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); + void* data = taosMemoryMalloc(len); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to copy data for stream since out of memory"); taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); taosArrayDestroy(cachedKeyLens); + + // unlock + taosWUnLockLatch(&pTq->pushLock); return -1; } + memcpy(data, pReq, len); void* pIter = NULL; @@ -262,7 +267,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) }; qStreamSetScanMemData(task, submit); - // exec + // here start to scan submit block to extract the subscribed data while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -278,7 +283,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) pRsp->blockNum++; } - tqDebug("vgId:%d, tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey, + tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", pTq->pVnode->config.vgId, pPushEntry->subKey, pRsp->blockNum); if (pRsp->blockNum > 0) { // set offset @@ -295,6 +300,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqPushDataRsp(pTq, pPushEntry); } } + // delete entry for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { void* key = taosArrayGetP(cachedKeys, i); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 2c8f20d8cd..4bf6320c32 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -292,9 +292,12 @@ void tqCloseReader(STqReader* pReader) { int32_t tqSeekVer(STqReader* pReader, int64_t ver) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) { + tqError("tmq poll: wal reader failed to seek to ver:%"PRId64, ver); return -1; + } else { + tqDebug("tmq poll: wal reader seek to ver:%"PRId64, ver); + return 0; } - return 0; } int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { @@ -302,28 +305,33 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { if (!fromProcessedMsg) { - if (walNextValidMsg(pReader->pWalReader) < 0) { - pReader->ver = - pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped); + SWalReader* pWalReader = pReader->pWalReader; + + if (walNextValidMsg(pWalReader) < 0) { + pReader->ver = pWalReader->curVersion - (pWalReader->curInvalid | pWalReader->curStopped); ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; ret->fetchType = FETCH_TYPE__NONE; tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); return -1; } - void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pReader->pWalReader->pHead->head.version; + + void* body = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); + int64_t ver = pWalReader->pHead->head.version; + + tqDebug("tmq poll: extract submit msg from wal, version:%"PRId64" len:%d", ver, bodyLen); + #if 0 - if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { + if (pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { // TODO do filter ret->fetchType = FETCH_TYPE__META; - ret->meta = pReader->pWalReader->pHead->head.body; + ret->meta = pWalReader->pHead->head.body; return 0; } else { #endif tqReaderSetSubmitReq2(pReader, body, bodyLen, ver); - /*tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);*/ + /*tqReaderSetDataMsg(pReader, body, pWalReader->pHead->head.version);*/ #if 0 } #endif @@ -358,7 +366,7 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v // if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; // while (true) { // if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1; -// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen, +// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen, // pReader->msgIter.len, pReader->msgIter.uid); // if (pReader->pBlock == NULL) break; // } @@ -371,10 +379,8 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v #endif int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { - ASSERT(pReader->msg2.msgStr == NULL); - ASSERT(msgStr); - ASSERT(msgLen); - ASSERT(ver >= 0); + ASSERT(pReader->msg2.msgStr == NULL && msgStr && msgLen && (ver >= 0)); + pReader->msg2.msgStr = msgStr; pReader->msg2.msgLen = msgLen; pReader->msg2.ver = ver; @@ -421,7 +427,10 @@ bool tqNextDataBlock(STqReader* pReader) { #endif bool tqNextDataBlock2(STqReader* pReader) { - if (pReader->msg2.msgStr == NULL) return false; + if (pReader->msg2.msgStr == NULL) { + return false; + } + ASSERT(pReader->setMsg == 1); tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, @@ -528,7 +537,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) if (pReader->pSchema == NULL) { tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion); + pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; @@ -538,7 +547,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", - pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer); + pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8651478afa..a3438b611e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1264,8 +1264,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq goto _exit; } - for (int32_t i = 1; i < nColData; i++) { - if (aColData[i].nVal != aColData[0].nVal) { + for (int32_t j = 1; j < nColData; j++) { + if (aColData[j].nVal != aColData[0].nVal) { code = TSDB_CODE_INVALID_MSG; goto _exit; } @@ -1299,8 +1299,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1); // create table - if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == - 0) { // create table success + if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) { + // create table success if (newTbUids == NULL && (newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) { @@ -1330,7 +1330,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq pSubmitRsp->affectedRows += affectedRows; } - // update table uid list + // update the affected table uid list if (taosArrayGetSize(newTbUids) > 0) { vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1712cba0f5..5df3b14a5b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -133,8 +133,7 @@ typedef struct { int64_t snapshotVer; // const SSubmitReq* pReq; - SPackedData submit; - + SPackedData submit; SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; int8_t recoverStep; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 347ac369d8..b3d3e8aab7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1035,8 +1035,9 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); - ASSERT(pTaskInfo->streamInfo.submit.msgStr == NULL); + ASSERT((pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE )&& (pTaskInfo->streamInfo.submit.msgStr == NULL)); + qDebug("set the submit block for future scan"); + pTaskInfo->streamInfo.submit = submit; return 0; } @@ -1050,6 +1051,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { return 0; } + if (subType == TOPIC_SUB_TYPE__COLUMN) { uint16_t type = pOperator->operatorType; pOperator->status = OP_OPENED; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a238b84993..e1594b13a8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1562,7 +1562,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; - qDebug("queue scan called"); + qDebug("start to exec queue scan"); if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) { @@ -1587,7 +1587,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SSDataBlock block = {0}; int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); - if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; } From a28156e832d4298a15806fd7d6019ad0076e42a5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Feb 2023 12:35:50 +0800 Subject: [PATCH 2/5] refactor: do some internal refactor and add some logs for tmq. --- source/libs/executor/src/executor.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b3d3e8aab7..9611cdab67 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1048,15 +1048,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.returned = 0; + if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) { return 0; } if (subType == TOPIC_SUB_TYPE__COLUMN) { - uint16_t type = pOperator->operatorType; pOperator->status = OP_OPENED; + // TODO add more check - if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { ASSERT(pOperator->numOfDownstream == 1); pOperator = pOperator->pDownstream[0]; } @@ -1070,7 +1071,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ + // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one + // those data are from the snapshot in tsdb, besides the data in the wal file. int64_t uid = pOffset->uid; int64_t ts = pOffset->ts; @@ -1129,7 +1131,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts, pTableScanInfo->currentTable, numOfTables); - /*}*/ } else { ASSERT(0); } @@ -1172,7 +1173,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema; - qDebug("tmqsnap qStreamPrepareScan snapshot data uid %" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); + qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; @@ -1180,7 +1181,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); return -1; } - qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); + qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); } else if (pOffset->type == TMQ_OFFSET__LOG) { SStreamRawScanInfo* pInfo = pOperator->info; tsdbReaderClose(pInfo->dataReader); From e1be7dd39905d692896049a7898147dd7a2331cc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 27 Feb 2023 17:55:30 +0800 Subject: [PATCH 3/5] fix(query): fix memory leak. --- source/dnode/mnode/impl/src/mndConsumer.c | 2 ++ source/util/src/thashutil.c | 5 ----- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 768fe45e3e..280f3b0ecc 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -618,6 +618,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { // set the update type pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); // all subscribed topics should re-balance. @@ -647,6 +648,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { // set the update type pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); int32_t oldTopicNum = (pExistedConsumer->currentTopics)? taosArrayGetSize(pExistedConsumer->currentTopics):0; diff --git a/source/util/src/thashutil.c b/source/util/src/thashutil.c index 21b9359076..f9c7eb1f56 100644 --- a/source/util/src/thashutil.c +++ b/source/util/src/thashutil.c @@ -50,11 +50,6 @@ uint32_t taosDJB2Hash(const char *key, uint32_t len) { return hash; } -uint32_t xxHash(const char *key, uint32_t len) { - int32_t seed = 0xcc9e2d51; - return XXH32(key, len, seed); -} - uint32_t MurmurHash3_32(const char *key, uint32_t len) { const uint8_t *data = (const uint8_t *)key; const int32_t nblocks = len >> 2u; From d18dd3067bfd2c5db185a26479efbba01da4f227 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Feb 2023 00:38:37 +0800 Subject: [PATCH 4/5] other: merge main. --- include/os/osMemory.h | 2 - source/dnode/vnode/src/tq/tqRead.c | 2 + source/libs/executor/src/executor.c | 1 + tests/develop-test/test.py | 10 +- tests/parallel_test/cases.task | 30 +- .../system-test/0-others/tag_index_advance.py | 520 ++++++++++++++++++ tests/system-test/0-others/tag_index_basic.py | 37 +- tests/system-test/pytest.sh | 4 +- tests/system-test/test.py | 10 +- 9 files changed, 585 insertions(+), 31 deletions(-) create mode 100644 tests/system-test/0-others/tag_index_advance.py diff --git a/include/os/osMemory.h b/include/os/osMemory.h index b197dfd8a2..44a97bf055 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -33,8 +33,6 @@ extern "C" { #undef strdup #define strdup STRDUP_FUNC_TAOS_FORBID #endif -#endif // ifndef ALLOW_FORBID_FUNC - #endif // ifndef ALLOW_FORBID_FUNC #endif // if !defined(WINDOWS) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 4bf6320c32..5352ebe6d4 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -291,6 +291,8 @@ void tqCloseReader(STqReader* pReader) { } int32_t tqSeekVer(STqReader* pReader, int64_t ver) { + // todo set the correct vgId + tqDebug("tmq poll: vgId:%d wal seek to version:%"PRId64, 0, ver); if (walReadSeekVer(pReader->pWalReader, ver) < 0) { tqError("tmq poll: wal reader failed to seek to ver:%"PRId64, ver); return -1; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9611cdab67..c2bf001c86 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1067,6 +1067,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; + // let's seek to the next version in wal file if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { return -1; } diff --git a/tests/develop-test/test.py b/tests/develop-test/test.py index 1b0f0d0aed..600925174f 100644 --- a/tests/develop-test/test.py +++ b/tests/develop-test/test.py @@ -343,7 +343,7 @@ if __name__ == "__main__": for i in range(cursor.rowcount): if res[i][0] == "queryPolicy": if int(res[i][1]) == int(queryPolicy): - tdLog.success( + tdLog.info( f"alter queryPolicy to {queryPolicy} successfully" ) else: @@ -402,7 +402,7 @@ if __name__ == "__main__": for i in range(cursor.rowcount): if res[i][0] == "queryPolicy": if int(res[i][1]) == int(queryPolicy): - tdLog.success( + tdLog.info( f"alter queryPolicy to {queryPolicy} successfully" ) else: @@ -471,7 +471,7 @@ if __name__ == "__main__": # for i in range(tdSql.queryRows): # if tdSql.queryResult[i][0] == "queryPolicy" : # if int(tdSql.queryResult[i][1]) == int(queryPolicy): - # tdLog.success('alter queryPolicy to %d successfully'%queryPolicy) + # tdLog.info('alter queryPolicy to %d successfully'%queryPolicy) # else : # tdLog.debug(tdSql.queryResult) # tdLog.exit("alter queryPolicy to %d failed"%queryPolicy) @@ -484,7 +484,7 @@ if __name__ == "__main__": for i in range(cursor.rowcount): if res[i][0] == "queryPolicy": if int(res[i][1]) == int(queryPolicy): - tdLog.success( + tdLog.info( f"alter queryPolicy to {queryPolicy} successfully" ) else: @@ -545,7 +545,7 @@ if __name__ == "__main__": for i in range(cursor.rowcount): if res[i][0] == "queryPolicy": if int(res[i][1]) == int(queryPolicy): - tdLog.success( + tdLog.info( f"alter queryPolicy to {queryPolicy} successfully" ) else: diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index d708227cba..90b71751f5 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -308,7 +308,7 @@ ,,y,script,./test.sh -f tsim/vnode/replica3_repeat.sim ,,y,script,./test.sh -f tsim/vnode/replica3_vgroup.sim ,,y,script,./test.sh -f tsim/vnode/replica3_many.sim -#,,y,script,./test.sh -f tsim/vnode/replica3_import.sim +,,y,script,./test.sh -f tsim/vnode/replica3_import.sim ,,y,script,./test.sh -f tsim/vnode/stable_balance_replica1.sim ,,y,script,./test.sh -f tsim/vnode/stable_dnode2_stop.sim ,,y,script,./test.sh -f tsim/vnode/stable_dnode2.sim @@ -680,8 +680,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 6 -M 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 @@ -913,13 +913,13 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arccos.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arctan.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/avg.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 3 @@ -927,9 +927,9 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/statecount.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/statecount.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tail.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distribute_agg_count.py -Q 3 @@ -945,7 +945,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3 @@ -991,7 +991,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 4 -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 4 +#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 4 @@ -1086,7 +1086,7 @@ ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/json_tag.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/query_json.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sample_csv_json.py -,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sml_json_alltypes.py +#,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sml_json_alltypes.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/taosdemoTestQueryWithJson.py -R ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/telnet_tcp.py -R diff --git a/tests/system-test/0-others/tag_index_advance.py b/tests/system-test/0-others/tag_index_advance.py new file mode 100644 index 0000000000..a8d6cde85a --- /dev/null +++ b/tests/system-test/0-others/tag_index_advance.py @@ -0,0 +1,520 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE +import taos +import sys +import time +import os +import socket +import subprocess +import random +import string +import random + + +from util.log import * +from util.sql import * +from util.cases import * +from util.common import * +from util.sqlset import * + +from util.dnodes import * +from util.dnodes import TDDnodes +from util.dnodes import TDDnode + +# +# -------------- util -------------------------- +# +def pathSize(path): + + total_size = 0 + for dirpath, dirnames, filenames in os.walk(path): + for i in filenames: + #use join to concatenate all the components of path + f = os.path.join(dirpath, i) + #use getsize to generate size in bytes and add it to the total size + total_size += os.path.getsize(f) + #print(dirpath) + + print(" %s %.02f MB"%(path, total_size/1024/1024)) + return total_size + + ''' + total = 0 + with os.scandir(path) as it: + for entry in it: + if entry.is_file(): + total += entry.stat().st_size + elif entry.is_dir(): + total += pathSize(entry.path) + + print(" %s %.02f MB"%(path, total/1024/1024)) + return total + ''' + +# +# --------------- cluster ------------------------ +# + +class MyDnodes(TDDnodes): + def __init__(self ,dnodes_lists): + super(MyDnodes,self).__init__() + self.dnodes = dnodes_lists # dnode must be TDDnode instance + self.simDeployed = False + +class TagCluster: + noConn = True + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to excute {__file__}") + self.TDDnodes = None + self.depoly_cluster(5) + self.master_dnode = self.TDDnodes.dnodes[0] + self.host=self.master_dnode.cfgDict["fqdn"] + conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir) + tdSql.init(conn1.cursor()) + + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + + def depoly_cluster(self ,dnodes_nums): + + testCluster = False + valgrind = 0 + hostname = socket.gethostname() + dnodes = [] + start_port = 6030 + for num in range(1, dnodes_nums+1): + dnode = TDDnode(num) + dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") + dnode.addExtraCfg("fqdn", f"{hostname}") + dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") + dnode.addExtraCfg("monitorFqdn", hostname) + dnode.addExtraCfg("monitorPort", 7043) + dnodes.append(dnode) + + self.TDDnodes = MyDnodes(dnodes) + self.TDDnodes.init("") + self.TDDnodes.setTestCluster(testCluster) + self.TDDnodes.setValgrind(valgrind) + + self.TDDnodes.setAsan(tdDnodes.getAsan()) + self.TDDnodes.stopAll() + for dnode in self.TDDnodes.dnodes: + self.TDDnodes.deploy(dnode.index,{}) + + for dnode in self.TDDnodes.dnodes: + self.TDDnodes.starttaosd(dnode.index) + + # create cluster + dnode_first_host = "" + sql = "" + for dnode in self.TDDnodes.dnodes[1:]: + # print(dnode.cfgDict) + dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"] + if dnode_first_host == "": + dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0] + dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1] + sql += f"create dnode '{dnode_id}'; " + + cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s " + cmd += f'"{sql}"' + print(cmd) + os.system(cmd) + + time.sleep(2) + tdLog.info(" create cluster done! ") + + + def getConnection(self, dnode): + host = dnode.cfgDict["fqdn"] + port = dnode.cfgDict["serverPort"] + config_dir = dnode.cfgDir + return taos.connect(host=host, port=int(port), config=config_dir) + + def run(self): + tdLog.info(" create cluster ok.") + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +class PerfDB: + def __init__(self): + self.sqls = [] + self.spends= [] + + # execute + def execute(self, sql): + print(f" perfdb execute {sql}") + stime = time.time() + ret = tdSql.execute(sql, 1) + spend = time.time() - stime + + self.sqls.append(sql) + self.spends.append(spend) + return ret + + # query + def query(self, sql): + print(f" perfdb query {sql}") + start = time.time() + ret = tdSql.query(sql, None, 1) + spend = time.time() - start + self.sqls.append(sql) + self.spends.append(spend) + return ret + + +# +# ----------------- TDTestCase ------------------ +# +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + tdLog.debug("start to execute %s" % __file__) + self.dbs = [PerfDB(), PerfDB()] + self.cur = 0 + self.tagCluster = TagCluster() + self.tagCluster.init(conn, logSql, replicaVar) + self.lenBinary = 64 + self.lenNchar = 32 + + # column + self.column_dict = { + 'ts': 'timestamp', + 'col1': 'tinyint', + 'col2': 'smallint', + 'col3': 'int', + 'col4': 'bigint', + 'col5': 'tinyint unsigned', + 'col6': 'smallint unsigned', + 'col7': 'int unsigned', + 'col8': 'bigint unsigned', + 'col9': 'float', + 'col10': 'double', + 'col11': 'bool', + 'col12': f'varchar({self.lenBinary})', + 'col13': f'nchar({self.lenNchar})' + } + # tag + self.tag_dict = { + 't1': 'tinyint', + 't2': 'smallint', + 't3': 'int', + 't4': 'bigint', + 't5': 'tinyint unsigned', + 't6': 'smallint unsigned', + 't7': 'int unsigned', + 't8': 'bigint unsigned', + 't9': 'float', + 't10': 'double', + 't11': 'bool', + 't12': f'varchar({self.lenBinary})', + 't13': f'nchar({self.lenNchar})', + 't14': 'timestamp' + } + + # generate specail wide random string + def random_string(self, count): + letters = string.ascii_letters + return ''.join(random.choice(letters) for i in range(count)) + + # execute + def execute(self, sql): + obj = self.dbs[self.cur] + return obj.execute(sql) + + # query + def query(self, sql): + return self.dbs[self.cur].query(sql) + + def set_stb_sql(self,stbname,column_dict,tag_dict): + column_sql = '' + tag_sql = '' + for k,v in column_dict.items(): + column_sql += f"{k} {v}, " + for k,v in tag_dict.items(): + tag_sql += f"{k} {v}, " + create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})' + return create_stb_sql + + # create datbase + def create_database(self, dbname, vgroups, replica): + sql = f'create database {dbname} vgroups {vgroups} replica {replica}' + tdSql.execute(sql) + #tdSql.execute(sql) + tdSql.execute(f'use {dbname}') + + # create stable and child tables + def create_table(self, stbname, tbname, count): + # create stable + create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict) + tdSql.execute(create_table_sql) + + # create child table + tdLog.info(f" start create {count} child tables.") + for i in range(count): + ti = i % 128 + binTxt = self.random_string(self.lenBinary) + tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"{binTxt}","nch{i}",now' + sql = f'create table {tbname}{i} using {stbname} tags({tags})' + tdSql.execute(sql) + if i > 0 and i % 1000 == 0: + tdLog.info(f" child table count = {i}") + + tdLog.info(f" end create {count} child tables.") + + + # create stable and child tables + def create_tagidx(self, stbname): + cnt = -1 + for key in self.tag_dict.keys(): + # first tag have default index, so skip + if cnt == -1: + cnt = 0 + continue; + sql = f'create index idx_{key} on {stbname} ({key})' + tdLog.info(f" sql={sql}") + tdSql.execute(sql) + cnt += 1 + tdLog.info(f' create {cnt} tag indexs ok.') + + # insert to child table d1 data + def insert_data(self, tbname): + # d1 insert 3 rows + for i in range(3): + sql = f'insert into {tbname}1(ts,col1) values(now+{i}s,{i});' + tdSql.execute(sql) + # d20 insert 4 + for i in range(4): + sql = f'insert into {tbname}20(ts,col1) values(now+{i}s,{i});' + tdSql.execute(sql) + + # check show indexs + def show_tagidx(self, dbname, stbname): + sql = f'select index_name,column_name from information_schema.ins_indexes where db_name="{dbname}"' + tdSql.query(sql) + rows = len(self.tag_dict.keys())-1 + tdSql.checkRows(rows) + + for i in range(rows): + col_name = tdSql.getData(i, 1) + idx_name = f'idx_{col_name}' + tdSql.checkData(i, 0, idx_name) + + tdLog.info(f' show {rows} tag indexs ok.') + + # query with tag idx + def query_tagidx(self, stbname): + sql = f'select * from meters where t2=1' + self.query(sql) + tdSql.checkRows(3) + + sql = f'select * from meters where t2<10' + self.query(sql) + tdSql.checkRows(3) + + sql = f'select * from meters where t2>10' + self.query(sql) + tdSql.checkRows(4) + + sql = f'select * from meters where t3<30' + self.query(sql) + tdSql.checkRows(7) + + sql = f'select * from meters where t12="11"' + tdSql.query(sql) + tdSql.checkRows(0) + + sql = f'select * from meters where (t4 < 10 or t5 = 20) and t6= 30' + self.query(sql) + tdSql.checkRows(0) + + sql = f'select * from meters where (t7 < 20 and t8 = 20) or t4 = 20' + self.query(sql) + tdSql.checkRows(4) + + sql = f'select * from meters where t12 like "%ab%" ' + self.query(sql) + tdSql.checkRows(0) + + sql = f'select * from meters where t13 = "d20" ' + self.query(sql) + tdSql.checkRows(0) + + sql = f'select * from meters where t13 = "nch20" ' + self.query(sql) + tdSql.checkRows(4) + + sql = f'select * from meters where tbname = "d20" ' + self.query(sql) + tdSql.checkRows(4) + + + # drop child table + def drop_tables(self, tbname, count): + # table d1 and d20 have verify data , so can not drop + start = random.randint(21, count/2) + end = random.randint(count/2 + 1, count - 1) + for i in range(start, end): + sql = f'drop table {tbname}{i}' + tdSql.execute(sql) + cnt = end - start + 1 + tdLog.info(f' drop table from {start} to {end} count={cnt}') + + # drop tag index + def drop_tagidx(self, dbname, stbname): + # drop index + cnt = -1 + for key in self.tag_dict.keys(): + # first tag have default index, so skip + if cnt == -1: + cnt = 0 + continue; + sql = f'drop index idx_{key}' + tdSql.execute(sql) + cnt += 1 + + # check idx result is 0 + sql = f'select index_name,column_name from information_schema.ins_indexes where db_name="{dbname}"' + tdSql.query(sql) + tdSql.checkRows(0) + tdLog.info(f' drop {cnt} tag indexs ok.') + + # show performance + def show_performance(self, count) : + db = self.dbs[0] + db1 = self.dbs[1] + cnt = len(db.sqls) + cnt1 = len(db1.sqls) + if cnt != len(db1.sqls): + tdLog.info(f" datebase sql count not equal. cnt={cnt} cnt1={cnt1}\n") + return False + + tdLog.info(f" database sql cnt ={cnt}") + print(f" ----------------- performance (child tables = {count})--------------------") + print(" No time(index) time(no-index) diff(col3-col2) rate(col2/col3) sql") + for i in range(cnt): + key = db.sqls[i] + value = db.spends[i] + key1 = db1.sqls[i] + value1 = db1.spends[i] + diff = value1 - value + rate = value/value1*100 + print(" %d %.3fs %.3fs %.3fs %d%% %s"%(i+1, value, value1, diff, rate, key)) + print(" --------------------- end ------------------------") + return True + + def show_diskspace(self): + #calc + selfPath = os.path.dirname(os.path.realpath(__file__)) + projPath = "" + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + # total + vnode2_size = pathSize(projPath + "sim/dnode2/data/vnode/vnode2/") + vnode3_size = pathSize(projPath + "sim/dnode3/data/vnode/vnode3/") + vnode4_size = pathSize(projPath + "sim/dnode4/data/vnode/vnode4/") + vnode5_size = pathSize(projPath + "sim/dnode5/data/vnode/vnode5/") + + # show + print(" ----------------- disk space --------------------") + idx_size = vnode2_size + vnode3_size + noidx_size = vnode4_size + vnode5_size + + print(" index = %.02f M"%(idx_size/1024/1024)) + print(" no-index = %.02f M"%(noidx_size/1024/1024)) + print(" index/no-index = %.2f multiple"%(idx_size/noidx_size)) + + print(" -------------------- end ------------------------") + + + + + # main + def testdb(self, dbname, stable, tbname, count, createidx): + # cur + if createidx: + self.cur = 0 + else : + self.cur = 1 + + # do + self.create_database(dbname, 2, 1) + self.create_table(stable, tbname, count) + if(createidx): + self.create_tagidx(stable) + self.insert_data(tbname) + if(createidx): + self.show_tagidx(dbname,stable) + self.query_tagidx(stable) + #self.drop_tables(tbname, count) + #if(createidx): + # self.drop_tagidx(dbname, stable) + # query after delete , expect no crash + #self.query_tagidx(stable) + tdSql.execute(f'flush database {dbname}') + + # run + def run(self): + self.tagCluster.run() + + # var + dbname = "tagindex" + dbname1 = dbname + "1" + stable = "meters" + tbname = "d" + count = 10000 + + # test db + tdLog.info(f" ------------- {dbname} ----------") + self.testdb(dbname, stable, tbname, count, True) + tdLog.info(f" ------------- {dbname1} ----------") + self.testdb(dbname1, stable, tbname, count, False) + + # show test result + self.show_performance(count) + + self.tagCluster.TDDnodes.stopAll() + sec = 10 + print(f" sleep {sec}s wait taosd stopping ...") + time.sleep(sec) + + self.show_diskspace() + + + def stop(self): + self.tagCluster.stop() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/0-others/tag_index_basic.py b/tests/system-test/0-others/tag_index_basic.py index 195d8910e7..96c3dca016 100644 --- a/tests/system-test/0-others/tag_index_basic.py +++ b/tests/system-test/0-others/tag_index_basic.py @@ -107,11 +107,11 @@ class TDTestCase: def insert_data(self, tbname): # d1 insert 3 rows for i in range(3): - sql = f'insert into {tbname}1(ts,col1) values(now,{i});' + sql = f'insert into {tbname}1(ts,col1) values(now+{i}s,{i});' tdSql.execute(sql) # d20 insert 4 for i in range(4): - sql = f'insert into {tbname}20(ts,col1) values(now,{i});' + sql = f'insert into {tbname}20(ts,col1) values(now+{i}s,{i});' tdSql.execute(sql) # check show indexs @@ -150,6 +150,22 @@ class TDTestCase: tdSql.query(sql) tdSql.checkRows(0) + sql = f'select t12 ,t13,tbname from meters where t13="nch20"' + tdSql.query(sql) + tdSql.checkRows(4) + + sql = f'select * from meters where t12 like "%ab%" ' + tdSql.query(sql) + tdSql.checkRows(0) + + sql = f'select * from meters where t13 = "d20" ' + tdSql.query(sql) + tdSql.checkRows(0) + + sql = f'select * from meters where tbname = "d20" ' + tdSql.query(sql) + tdSql.checkRows(4) + sql = f'select * from meters where (t4 < 10 or t5 = 20) and t6= 30' tdSql.query(sql) tdSql.checkRows(0) @@ -188,6 +204,22 @@ class TDTestCase: tdSql.checkRows(0) tdLog.info(f' drop {cnt} tag indexs ok.') + # create long name idx + def longname_idx(self, stbname): + long_name = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjjkkkkkkkkkkllllllllllmmmmmmmmmm" + key = "t3" + sql = f'create index {long_name} on {stbname} ({key})' + tdSql.error(sql) + + long_name = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff" + key = "t3" + sql = f'create index {long_name} on {stbname} ({key})' + tdLog.info(f"{sql}") + tdSql.execute(sql) + sql = f'drop index {long_name}' + tdLog.info(f"{sql}") + tdSql.execute(sql) + # run def run(self): # var @@ -204,6 +236,7 @@ class TDTestCase: self.drop_tagidx(stable) # query after delete , expect no crash self.query_tagidx(stable) + self.longname_idx(stable) def stop(self): diff --git a/tests/system-test/pytest.sh b/tests/system-test/pytest.sh index 792cab98f7..a76efb62f3 100755 --- a/tests/system-test/pytest.sh +++ b/tests/system-test/pytest.sh @@ -94,7 +94,7 @@ else sleep 1 done - AsanFileSuccessLen=$(grep -w successfully $AsanFile | wc -l) + AsanFileSuccessLen=$(grep -w "successfully executed" $AsanFile | wc -l) echo "AsanFileSuccessLen:" $AsanFileSuccessLen if [ $AsanFileSuccessLen -gt 0 ]; then @@ -104,4 +104,4 @@ else echo "Execute script failure" exit 1 fi -fi +fi \ No newline at end of file diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 2017aad1ca..0c62c182f7 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -318,7 +318,7 @@ if __name__ == "__main__": for i in range(cursor.rowcount): if res[i][0] == "queryPolicy" : if int(res[i][1]) == int(queryPolicy): - tdLog.success(f'alter queryPolicy to {queryPolicy} successfully') + tdLog.info(f'alter queryPolicy to {queryPolicy} successfully') else: tdLog.debug(res) tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") @@ -371,7 +371,7 @@ if __name__ == "__main__": for i in range(cursor.rowcount): if res[i][0] == "queryPolicy" : if int(res[i][1]) == int(queryPolicy): - tdLog.success(f'alter queryPolicy to {queryPolicy} successfully') + tdLog.info(f'alter queryPolicy to {queryPolicy} successfully') else: tdLog.debug(res) tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") @@ -439,7 +439,7 @@ if __name__ == "__main__": # for i in range(tdSql.queryRows): # if tdSql.queryResult[i][0] == "queryPolicy" : # if int(tdSql.queryResult[i][1]) == int(queryPolicy): - # tdLog.success('alter queryPolicy to %d successfully'%queryPolicy) + # tdLog.info('alter queryPolicy to %d successfully'%queryPolicy) # else : # tdLog.debug(tdSql.queryResult) # tdLog.exit("alter queryPolicy to %d failed"%queryPolicy) @@ -452,7 +452,7 @@ if __name__ == "__main__": for i in range(cursor.rowcount): if res[i][0] == "queryPolicy" : if int(res[i][1]) == int(queryPolicy): - tdLog.success(f'alter queryPolicy to {queryPolicy} successfully') + tdLog.info(f'alter queryPolicy to {queryPolicy} successfully') else: tdLog.debug(res) tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") @@ -509,7 +509,7 @@ if __name__ == "__main__": for i in range(cursor.rowcount): if res[i][0] == "queryPolicy" : if int(res[i][1]) == int(queryPolicy): - tdLog.success(f'alter queryPolicy to {queryPolicy} successfully') + tdLog.info(f'alter queryPolicy to {queryPolicy} successfully') else: tdLog.debug(res) tdLog.exit(f"alter queryPolicy to {queryPolicy} failed") From b92de1e9e6a5d6b32b89cd8356c0b231ca2adb8b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Feb 2023 09:34:16 +0800 Subject: [PATCH 5/5] fix(query): fix invalid read. --- source/libs/executor/src/executil.c | 46 +++++++++++++------------- source/libs/nodes/src/nodesUtilFuncs.c | 6 ++-- tools/shell/src/shellAuto.c | 9 +++-- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 6b6f5cfe93..9537751ff0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -850,32 +850,32 @@ static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTa tagVal.cid = pColInfo->info.colId; if (p1->pTagVal == NULL) { colDataSetNULL(pColInfo, i); - } - - const char* p = metaGetTableTagVal(p1->pTagVal, pColInfo->info.type, &tagVal); - - if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) { - colDataSetNULL(pColInfo, i); - } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { - colDataSetVal(pColInfo, i, p, false); - } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { - char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1); - varDataSetLen(tmp, tagVal.nData); - memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); - colDataSetVal(pColInfo, i, tmp, false); -#if TAG_FILTER_DEBUG - qDebug("tagfilter varch:%s", tmp + 2); -#endif - taosMemoryFree(tmp); } else { - colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false); + const char* p = metaGetTableTagVal(p1->pTagVal, pColInfo->info.type, &tagVal); + + if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) { + colDataSetNULL(pColInfo, i); + } else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) { + colDataSetVal(pColInfo, i, p, false); + } else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { + char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1); + varDataSetLen(tmp, tagVal.nData); + memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData); + colDataSetVal(pColInfo, i, tmp, false); #if TAG_FILTER_DEBUG - if (pColInfo->info.type == TSDB_DATA_TYPE_INT) { - qDebug("tagfilter int:%d", *(int*)(&tagVal.i64)); - } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) { - qDebug("tagfilter double:%f", *(double*)(&tagVal.i64)); - } + qDebug("tagfilter varch:%s", tmp + 2); #endif + taosMemoryFree(tmp); + } else { + colDataSetVal(pColInfo, i, (const char*)&tagVal.i64, false); +#if TAG_FILTER_DEBUG + if (pColInfo->info.type == TSDB_DATA_TYPE_INT) { + qDebug("tagfilter int:%d", *(int*)(&tagVal.i64)); + } else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) { + qDebug("tagfilter double:%f", *(double*)(&tagVal.i64)); + } +#endif + } } } } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 0419c883e6..024130b5f8 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -2101,9 +2101,9 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal) { case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: - pVal->pz = taosMemoryMalloc(pVal->nLen + VARSTR_HEADER_SIZE + 1); - memcpy(pVal->pz, pNode->datum.p, pVal->nLen + VARSTR_HEADER_SIZE); - pVal->pz[pVal->nLen + VARSTR_HEADER_SIZE] = 0; + pVal->pz = taosMemoryMalloc(pVal->nLen + 1); + memcpy(pVal->pz, pNode->datum.p, pVal->nLen); + pVal->pz[pVal->nLen] = 0; break; case TSDB_DATA_TYPE_JSON: pVal->nLen = getJsonValueLen(pNode->datum.p); diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 1d872e3f0d..a8bc93dd0c 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -98,6 +98,7 @@ SWords shellCommands[] = { {"describe ", 0, 0, NULL}, {"delete from where ", 0, 0, NULL}, {"drop database ", 0, 0, NULL}, + {"drop index ", 0, 0, NULL}, {"drop table ", 0, 0, NULL}, {"drop dnode ", 0, 0, NULL}, {"drop mnode on dnode ;", 0, 0, NULL}, @@ -384,7 +385,7 @@ void showHelp() { create table using tags ...\n\ create database ...\n\ create dnode \"fqdn:port\" ...\n\ - create index ...\n\ + create index on (tag_column_name);\n\ create mnode on dnode ;\n\ create qnode on dnode ;\n\ create stream into as select ...\n\ @@ -404,6 +405,7 @@ void showHelp() { drop consumer group ... \n\ drop topic ;\n\ drop stream ;\n\ + drop index ;\n\ ----- E ----- \n\ explain select clause ...\n\ ----- F ----- \n\ @@ -534,7 +536,7 @@ SWord* addWord(const char* p, int32_t len, bool pattern) { word->len = len; // check format - if (pattern) { + if (pattern && len > 0) { word->type = wordType(p, len); } else { word->type = WT_TEXT; @@ -1724,6 +1726,9 @@ bool matchEnd(TAOS* con, SShellCmd* cmd) { if (strlen(last) == 0) { goto _return; } + if (strcmp(last, " ") == 0) { + goto _return; + } // match database if (elast == NULL) {