From 3fd6770c81f5e1da6ae56a3b41d740fd9dd4ff91 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 29 Apr 2024 05:52:54 +0000 Subject: [PATCH 01/10] avoid invalid read --- source/libs/stream/src/streamCheckpoint.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e9dfbf7693..0de7e83871 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -25,6 +25,7 @@ typedef struct { SStreamTask* pTask; int64_t dbRefId; + void* pMeta; } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); @@ -437,7 +438,7 @@ int32_t uploadCheckpointData(void* param) { return -1; } - if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, + if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, ((SStreamMeta*)arg->pMeta)->bkdChkptMgt, arg->chkpId, (int8_t)(arg->type), &path, toDelFiles)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); } @@ -489,6 +490,7 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha arg->chkpId = chkpId; arg->pTask = pTask; arg->dbRefId = taskGetDBRef(pTask->pBackend); + arg->pMeta = pTask->pMeta; return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } From d5f140aee2177b762901436bc421698c3381daef Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 29 Apr 2024 15:23:00 +0800 Subject: [PATCH 02/10] fix(tdb/btc): free txn early when closing btc --- source/libs/tdb/src/db/tdbBtree.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 7a62b38b16..543e06aae6 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -2473,6 +2473,10 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { } int tdbBtcClose(SBTC *pBtc) { + if (pBtc->freeTxn) { + tdbTxnClose(pBtc->pTxn); + } + if (pBtc->iPage < 0) return 0; for (;;) { @@ -2496,10 +2500,6 @@ int tdbBtcClose(SBTC *pBtc) { tdbFree(pBtc->coder.pVal); } - if (pBtc->freeTxn) { - tdbTxnClose(pBtc->pTxn); - } - return 0; } From 94fc209d7d74a785f8c54b1ada154b7d4f72542a Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 29 Apr 2024 16:17:04 +0800 Subject: [PATCH 03/10] fix scanoperator using sma info of last group --- source/libs/executor/src/scanoperator.c | 1 + tests/system-test/2-query/count.py | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c4c3722455..78f012cb98 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -711,6 +711,7 @@ static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, i pInfo->tableStartIndex = TARRAY_ELEM_IDX(pInfo->base.pTableListInfo->pTableList, *pKeyInfo); pInfo->tableEndIndex = (pInfo->tableStartIndex + (*size) - 1); pInfo->pResBlock->info.blankFill = false; + taosMemoryFreeClear(pInfo->pResBlock->pBlockAgg); if (!pInfo->needCountEmptyTable) { pInfo->countState = TABLE_COUNT_STATE_END; diff --git a/tests/system-test/2-query/count.py b/tests/system-test/2-query/count.py index 2f82ece0ec..155cb39b62 100644 --- a/tests/system-test/2-query/count.py +++ b/tests/system-test/2-query/count.py @@ -257,7 +257,18 @@ class TDTestCase: os.system(f'taos -f {sql_file}') tdSql.query('select count(c_1) from d2.t2 where c_1 < 10', queryTimes=1) tdSql.checkData(0, 0, 0) - tdSql.execute('drop database d2') + tdSql.query('select count(c_1), min(c_1),tbname from d2.can partition by tbname', queryTimes=1) + tdSql.checkData(0, 0, 0) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, 't3') + + tdSql.checkData(1, 0, 15) + tdSql.checkData(1, 1, 1471617148940980000) + tdSql.checkData(1, 2, 't2') + + tdSql.checkData(2, 0, 0) + tdSql.checkData(2, 1, None) + tdSql.checkData(2, 2, 't1') def run(self): self.test_count_with_sma_data() From 785faf50aaf3f6f6566721fb9e7b95d8b95e2684 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 29 Apr 2024 16:27:11 +0800 Subject: [PATCH 04/10] btc/close: free txn only if iPage < 0 --- source/libs/tdb/src/db/tdbBtree.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 543e06aae6..26652cfffb 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -2473,12 +2473,13 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { } int tdbBtcClose(SBTC *pBtc) { - if (pBtc->freeTxn) { - tdbTxnClose(pBtc->pTxn); + if (pBtc->iPage < 0) { + if (pBtc->freeTxn) { + tdbTxnClose(pBtc->pTxn); + } + return 0; } - if (pBtc->iPage < 0) return 0; - for (;;) { if (NULL == pBtc->pPage) { tdbError("tdb/btc-close: null ptr pPage."); @@ -2500,6 +2501,10 @@ int tdbBtcClose(SBTC *pBtc) { tdbFree(pBtc->coder.pVal); } + if (pBtc->freeTxn) { + tdbTxnClose(pBtc->pTxn); + } + return 0; } From 62a127780124380276e08c216b892de4fd91810d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 29 Apr 2024 16:50:00 +0800 Subject: [PATCH 05/10] fix(util): fix the assign of pointer data in ssdatablock. --- include/common/tdatablock.h | 2 +- source/common/src/tdatablock.c | 90 +++++++++++++++++++--------------- 2 files changed, 52 insertions(+), 40 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 5df042f83d..32cd7bb2ab 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -250,7 +250,7 @@ int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n); int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); -int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src); +int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc); SSDataBlock* createDataBlock(); void* blockDataDestroy(SSDataBlock* pBlock); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 6630ad59b1..1c63174bf6 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -21,6 +21,8 @@ #define MALLOC_ALIGN_BYTES 32 +static void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc); + int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (pColumnInfoData->reassigned) { @@ -1563,24 +1565,28 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { return 0; } -int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) { - blockDataCleanup(dst); - int32_t code = blockDataEnsureCapacity(dst, src->info.rows); +int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) { + blockDataCleanup(pDst); + + int32_t code = blockDataEnsureCapacity(pDst, pSrc->info.rows); if (code != TSDB_CODE_SUCCESS) { terrno = code; return code; } - size_t numOfCols = taosArrayGetSize(src->pDataBlock); + size_t numOfCols = taosArrayGetSize(pSrc->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i); - SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i); - colDataAssign(pDst, pSrc, src->info.rows, &src->info); + SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); + SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i); + colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info); } - uint32_t cap = dst->info.capacity; - dst->info = src->info; - dst->info.capacity = cap; + uint32_t cap = pDst->info.capacity; + + pDst->info = pSrc->info; + copyPkVal(&pDst->info, &pSrc->info); + + pDst->info.capacity = cap; return TSDB_CODE_SUCCESS; } @@ -1663,62 +1669,68 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) { return pBlock; } +void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) { + if (!IS_VAR_DATA_TYPE(pSrc->pks[0].type)) { + return; + } + + // prepare the pk buffer if needed + SValue* p = &pDst->pks[0]; + + p->type = pDst->pks[0].type; + p->pData = taosMemoryCalloc(1, pDst->pks[0].nData); + p->nData = pDst->pks[0].nData; + memcpy(p->pData, pDst->pks[0].pData, p->nData); + + p = &pDst->pks[1]; + p->type = pDst->pks[1].type; + p->pData = taosMemoryCalloc(1, pDst->pks[1].nData); + p->nData = pDst->pks[1].nData; + memcpy(p->pData, pDst->pks[1].pData, p->nData); +} + SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { if (pDataBlock == NULL) { return NULL; } - SSDataBlock* pBlock = createDataBlock(); - pBlock->info = pDataBlock->info; + SSDataBlock* pDstBlock = createDataBlock(); + pDstBlock->info = pDataBlock->info; - pBlock->info.rows = 0; - pBlock->info.capacity = 0; - pBlock->info.rowSize = 0; - pBlock->info.id = pDataBlock->info.id; - pBlock->info.blankFill = pDataBlock->info.blankFill; + pDstBlock->info.rows = 0; + pDstBlock->info.capacity = 0; + pDstBlock->info.rowSize = 0; + pDstBlock->info.id = pDataBlock->info.id; + pDstBlock->info.blankFill = pDataBlock->info.blankFill; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; - blockDataAppendColInfo(pBlock, &colInfo); + blockDataAppendColInfo(pDstBlock, &colInfo); } - // prepare the pk buffer if necessary - if (IS_VAR_DATA_TYPE(pDataBlock->info.pks[0].type)) { - SValue* pVal = &pBlock->info.pks[0]; - - pVal->type = pDataBlock->info.pks[0].type; - pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); - pVal->nData = pDataBlock->info.pks[0].nData; - memcpy(pVal->pData, pDataBlock->info.pks[0].pData, pVal->nData); - - SValue* p = &pBlock->info.pks[1]; - p->type = pDataBlock->info.pks[1].type; - p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); - p->nData = pDataBlock->info.pks[1].nData; - memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData); - } + copyPkVal(&pDstBlock->info, &pDataBlock->info); if (copyData) { - int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows); + int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { terrno = code; - blockDataDestroy(pBlock); + blockDataDestroy(pDstBlock); return NULL; } for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info); } - pBlock->info.rows = pDataBlock->info.rows; - pBlock->info.capacity = pDataBlock->info.rows; + pDstBlock->info.rows = pDataBlock->info.rows; + pDstBlock->info.capacity = pDataBlock->info.rows; } - return pBlock; + return pDstBlock; } SSDataBlock* createDataBlock() { From 1f85cb42e65f05aa6ace816adad423ead8e06b0e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 Apr 2024 17:22:40 +0800 Subject: [PATCH 06/10] fix:[TS-4728]transform strore data if data is too big --- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tq.c | 26 +++-- source/dnode/vnode/src/tq/tqMeta.c | 144 ++++++++++++++++++++++++++- source/dnode/vnode/src/tq/tqOffset.c | 4 + 4 files changed, 162 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index bd8b73ed33..e2ecdca59f 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -136,6 +136,7 @@ int32_t tqMetaGetHandle(STQ* pTq, const char* key); int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); STqOffsetStore* tqOffsetOpen(STQ* pTq); +int32_t tqMetaTransform(STQ* pTq); void tqOffsetClose(STqOffsetStore*); STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0e6b85bd2b..2add128eff 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -86,15 +86,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { } int32_t tqInitialize(STQ* pTq) { - if (tqMetaOpen(pTq) < 0) { - return -1; - } - - pTq->pOffsetStore = tqOffsetOpen(pTq); - if (pTq->pOffsetStore == NULL) { - return -1; - } - int32_t vgId = TD_VID(pTq->pVnode); pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); if (pTq->pStreamMeta == NULL) { @@ -102,6 +93,23 @@ int32_t tqInitialize(STQ* pTq) { } /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); + + if (tqMetaTransform(pTq) < 0) { + return -1; + } + pTq->pOffsetStore = tqOffsetOpen(pTq); + if (pTq->pOffsetStore == NULL) { + return -1; + } + + if (tqMetaRestoreCheckInfo(pTq) < 0) { + return -1; + } + + pTq->pOffsetStore = tqOffsetOpen(pTq); + if (pTq->pOffsetStore == NULL) { + return -1; + } return 0; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 82547af8d4..76322c527f 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -88,10 +88,6 @@ int32_t tqMetaOpen(STQ* pTq) { return -1; } - if (tqMetaRestoreCheckInfo(pTq) < 0) { - return -1; - } - return 0; } @@ -378,6 +374,146 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); } +static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){ + TBC* pCur = NULL; + if (tdbTbcOpen(pExecStoreOld, &pCur, NULL) < 0) { + return -1; + } + + TXN* txn; + if (tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + return -1; + } + + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + + tdbTbcMoveToFirst(pCur); + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + if (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn) < 0) { + tqError("transform sub info error"); + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + return -1; + } + } + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + + if (tdbCommit(pMetaDB, txn) < 0) { + return -1; + } + + if (tdbPostCommit(pMetaDB, txn) < 0) { + return -1; + } + return 0; +} + +int32_t tqMetaTransform(STQ* pTq) { + int32_t len = strlen(pTq->path) + 64; + char* maindb = taosMemoryCalloc(1, len); + sprintf(maindb, "%s%s%s", pTq->path, TD_DIRSEP, TDB_MAINDB_NAME); + + if(!taosCheckExistFile(maindb)){ + taosMemoryFree(maindb); + char* tpath = taosMemoryCalloc(1, len); + if(tpath == NULL){ + return -1; + } + sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe"); + taosMemoryFree(pTq->path); + pTq->path = tpath; + return tqMetaOpen(pTq); + } + + int32_t code = 0; + TDB* pMetaDB = NULL; + TTB* pExecStore = NULL; + TTB* pCheckStore = NULL; + char* offsetNew = NULL; + char* offset = tqOffsetBuildFName(pTq->path, 0); + if(offset == NULL){ + code = -1; + goto END; + } + + + if (tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL) < 0) { + code = -1; + goto END; + } + + if (tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0) < 0) { + code = -1; + goto END; + } + + if (tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0) < 0) { + code = -1; + goto END; + } + + char* tpath = taosMemoryCalloc(1, len); + if(tpath == NULL){ + code = -1; + goto END; + } + sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe"); + taosMemoryFree(pTq->path); + pTq->path = tpath; + if (tqMetaOpen(pTq) < 0) { + code = -1; + goto END; + } + + if( tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore) < 0){ + code = -1; + goto END; + } + + if(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore) < 0){ + code = -1; + goto END; + } + + tdbTbClose(pExecStore); + pExecStore = NULL; + tdbTbClose(pCheckStore); + pCheckStore = NULL; + tdbClose(pMetaDB); + pMetaDB = NULL; + + offsetNew = tqOffsetBuildFName(pTq->path, 0); + if(offsetNew == NULL){ + code = -1; + goto END; + } + if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){ + tqError("copy offset file error"); + code = -1; + goto END; + } + + taosRemoveFile(maindb); + taosRemoveFile(offset); + + END: + taosMemoryFree(maindb); + taosMemoryFree(offset); + taosMemoryFree(offsetNew); + + tdbTbClose(pExecStore); + tdbTbClose(pCheckStore); + tdbClose(pMetaDB); + + return code; +} + //int32_t tqMetaRestoreHandle(STQ* pTq) { // int code = 0; // TBC* pCur = NULL; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 7321e73d28..8b0e039ad5 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -25,6 +25,10 @@ struct STqOffsetStore { char* tqOffsetBuildFName(const char* path, int32_t fVer) { int32_t len = strlen(path); char* fname = taosMemoryCalloc(1, len + 40); + if(fname == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer); return fname; } From f99cad9443e18b16b7a094c496564cdced3e0f47 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 29 Apr 2024 11:30:38 +0000 Subject: [PATCH 07/10] opt transfer --- source/dnode/mnode/impl/src/mndSync.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 28c857319f..573b75ff5a 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -66,7 +66,7 @@ static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { } static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t code = tmsgSendReq(pEpSet, pMsg); + int32_t code = tmsgSendSyncReq(pEpSet, pMsg); if (code != 0) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; From cd3f3046f7bfdd9c307d6c93cbd269d3ccbfc258 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 29 Apr 2024 19:36:41 +0800 Subject: [PATCH 08/10] fix:memory leak --- source/dnode/vnode/src/tq/tq.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2add128eff..baef3d9bd6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -97,10 +97,6 @@ int32_t tqInitialize(STQ* pTq) { if (tqMetaTransform(pTq) < 0) { return -1; } - pTq->pOffsetStore = tqOffsetOpen(pTq); - if (pTq->pOffsetStore == NULL) { - return -1; - } if (tqMetaRestoreCheckInfo(pTq) < 0) { return -1; From bf1c12af539dd49752588b49d47d40749eaecdd2 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 30 Apr 2024 09:30:36 +0800 Subject: [PATCH 09/10] use tsma when partition by tag --- source/libs/planner/src/planOptimizer.c | 2 +- tests/system-test/2-query/tsma.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 40e120fc13..687a9c6d4c 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -6041,7 +6041,7 @@ static EDealRes tsmaOptTagCheck(SNode* pNode, void* pContext) { STsmaOptTagCheckCtx* pCtx = pContext; for (int32_t i = 0; i < pCtx->pTsma->pTags->size; ++i) { SSchema* pSchema = taosArrayGet(pCtx->pTsma->pTags, i); - if (pSchema->colId == pCol->colId) { + if (strcmp(pSchema->name, pCol->colName) == 0) { found = true; } } diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 38cb1504f6..f80aab0e82 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1064,6 +1064,12 @@ class TDTestCase: .should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999') .should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.664').get_qc()) + sql = f"SELECT avg(c1), avg(c2) FROM {db_name}.meters WHERE ts >= '2018-09-17 09:00:00.009' AND ts < '2018-09-17 10:23:19.665' PARTITION BY t5 INTERVAL(30m)" + ctxs.append(TSMAQCBuilder().with_sql(sql) + .should_query_with_table('meters', '2018-09-17 09:00:00.009', '2018-09-17 09:29:59.999') + .should_query_with_tsma('tsma2', '2018-09-17 09:30:00', '2018-09-17 09:59:59.999') + .should_query_with_table('meters', '2018-09-17 10:00:00.000', '2018-09-17 10:23:19.664').get_qc()) + sql = f"select avg(c1), avg(c2) from {db_name}.meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m, 25m) SLIDING(10m)" ctxs.append(TSMAQCBuilder().with_sql(sql) .should_query_with_table('meters', '2018-09-17 09:00:00.009', '2018-09-17 09:04:59.999') From 0da5cfea818a4edbfd45e0819e5cd64f580cefdc Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 30 Apr 2024 09:44:34 +0800 Subject: [PATCH 10/10] tsma user manual --- docs/en/12-taos-sql/27-indexing.md | 2 ++ docs/en/14-reference/12-config/index.md | 6 +++--- docs/zh/12-taos-sql/27-indexing.md | 2 ++ docs/zh/14-reference/12-config/index.md | 4 ++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/en/12-taos-sql/27-indexing.md b/docs/en/12-taos-sql/27-indexing.md index 9f77e21599..b76ec5776b 100644 --- a/docs/en/12-taos-sql/27-indexing.md +++ b/docs/en/12-taos-sql/27-indexing.md @@ -56,6 +56,8 @@ If there are other TSMA created based on the TSMA being deleted, the delete oper ## TSMA Calculation The calculation result of TSMA is a super table in the same database as the original table, but it is not visible to users. It cannot be deleted and will be automatically deleted when `DROP TSMA` is executed. The calculation of TSMA is done through stream computing, which is a background asynchronous process. The calculation result of TSMA is not guaranteed to be real-time, but it can guarantee eventual correctness. +If there is no data in the original subtable, the corresponding output subtable may not be created. Therefore, in count queries, even if `countAlwaysReturnValue` is configured, the result of this subtable will not be returned. + When there is a large amount of historical data, after creating TSMA, the stream computing will first calculate the historical data. During this period, newly created TSMA will not be used. The calculation will be automatically recalculated when data updates, deletions, or expired data arrive. During the recalculation period, the TSMA query results are not guaranteed to be real-time. If you want to query real-time data, you can use the hint `/*+ skip_tsma() */` in the SQL statement or disable the `querySmaOptimize` parameter to query from the original data. ## Using and Limitations of TSMA diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index a130bca65f..fd6d3a2930 100755 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -206,11 +206,11 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo | Attribute | Description | | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| Applicable | Server only | +| Applicable | Server and Client | | Meaning | count()/hyperloglog() return value or not if the input data is empty or NULL | -| Vlue Range | 0: Return empty line, 1: Return 0 | +| Value Range | 0: Return empty line, 1: Return 0 | | Default | 1 | -| Notes | When this parameter is setting to 1, for queries containing GROUP BY, PARTITION BY and INTERVAL clause, and input data in certain groups or windows is empty or NULL, the corresponding groups or windows have no return values | +| Notes | When this parameter is setting to 1, for queries containing INTERVAL clause or the queries using TSMA, and input data in certain groups or windows is empty or NULL, the corresponding groups or windows have no return values. Server and client use the same value| ### maxNumOfDistinctRes diff --git a/docs/zh/12-taos-sql/27-indexing.md b/docs/zh/12-taos-sql/27-indexing.md index 189042c27a..02605da843 100644 --- a/docs/zh/12-taos-sql/27-indexing.md +++ b/docs/zh/12-taos-sql/27-indexing.md @@ -55,6 +55,8 @@ DROP TSMA [db_name.]tsma_name; ## TSMA的计算 TSMA的计算结果为与原始表相同库下的一张超级表, 此表用户不可见. 不可删除, 在`DROP TSMA`时自动删除. TSMA的计算是通过流计算完成的, 此过程为后台异步过程, TSMA的计算结果不保证实时性, 但可以保证最终正确性. +TSMA计算时若原始子表内没有数据, 则可能不会创建对应的输出子表, 因此在count查询中, 即使配置了`countAlwaysReturnValue`, 也不会返回该表的结果. + 当存在大量历史数据时, 创建TSMA之后, 流计算将会首先计算历史数据, 此期间新创建的TSMA不会被使用. 数据更新删除或者过期数据到来时自动重新计算影响部分数据。 在重新计算期间 TSMA 查询结果不保证实时性。若希望查询实时数据, 可以通过在 SQL 中添加 hint `/*+ skip_tsma() */` 或者关闭参数`querySmaOptimize`从原始数据查询。 ## TSMA的使用与限制 diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index 01aa944d95..f0347f424c 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -224,11 +224,11 @@ taos -C | 属性 | 说明 | | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------- | -| 适用范围 | 仅服务端适用 | +| 适用范围 | 服务端和客户端适用 | | 含义 | count/hyperloglog函数在输入数据为空或者NULL的情况下是否返回值 | | 取值范围 | 0:返回空行,1:返回 0 | | 缺省值 | 1 | -| 补充说明 | 该参数设置为 1 时,如果查询中含有 GROUP BY,PARTITION BY 以及 INTERVAL 子句且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果 | +| 补充说明 | 该参数设置为 1 时,如果查询中含有 INTERVAL 子句或者该查询使用了TSMA时, 且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果. 注意此参数客户端和服务端值应保持一致. | ### multiResultFunctionStarReturnTags