From a16b010e9ff18f857dec685e057a70dd2d878700 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 26 Oct 2022 11:35:15 +0800 Subject: [PATCH 1/3] fix: memory leak --- source/common/src/tmsg.c | 11 +++++++++-- source/dnode/vnode/src/tq/tq.c | 13 +++---------- source/libs/wal/src/walRef.c | 4 ++++ source/libs/wal/src/walSeek.c | 4 ++++ 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index aea689c0de..a001126b21 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5988,7 +5988,11 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { if (pRsp->withSchema) { SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pSW == NULL) return -1; - if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) { + taosMemoryFree(pSW); + return -1; + } + taosArrayPush(pRsp->blockSchema, &pSW); } @@ -6069,7 +6073,10 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { if (pRsp->withSchema) { SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pSW == NULL) return -1; - if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) { + taosMemoryFree(pSW); + return -1; + } taosArrayPush(pRsp->blockSchema, &pSW); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8bf1522d6c..42a597a305 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -433,16 +433,9 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su } #endif - if (subType == TOPIC_SUB_TYPE__COLUMN) { - pRsp->withSchema = false; - } else { - pRsp->withSchema = true; - pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); - if (pRsp->blockSchema == NULL) { - // TODO free - return -1; - } - } + ASSERT(subType == TOPIC_SUB_TYPE__COLUMN); + pRsp->withSchema = false; + return 0; } diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 5a14bcf962..f9994fd315 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -32,6 +32,7 @@ SWalRef *walOpenRef(SWal *pWal) { return pRef; } +#if 0 void walCloseRef(SWal *pWal, int64_t refId) { SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); if (ppRef == NULL) return; @@ -39,6 +40,7 @@ void walCloseRef(SWal *pWal, int64_t refId) { taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); taosMemoryFree(pRef); } +#endif int32_t walRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal; @@ -65,10 +67,12 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { return 0; } +#if 0 void walUnrefVer(SWalRef *pRef) { pRef->refId = -1; pRef->refFile = -1; } +#endif SWalRef *walRefCommittedVer(SWal *pWal) { SWalRef *pRef = walOpenRef(pWal); diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 4b75db52b7..2cb6614b01 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -19,6 +19,7 @@ #include "tref.h" #include "walInt.h" +#if 0 static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { int64_t code = 0; @@ -47,6 +48,7 @@ static int64_t walSeekWritePos(SWal* pWal, int64_t ver) { } return 0; } +#endif int walInitWriteFile(SWal* pWal) { TdFilePtr pIdxTFile, pLogTFile; @@ -134,6 +136,7 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { return fileFirstVer; } +#if 0 int walSeekWriteVer(SWal* pWal, int64_t ver) { int64_t code; if (ver == pWal->vers.lastVer) { @@ -158,3 +161,4 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) { return 0; } +#endif From 8dbe7ac37720f5f512edcc1afb72faaf4dd4cac1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 26 Oct 2022 14:02:36 +0800 Subject: [PATCH 2/3] fix: failed case --- source/client/src/clientTmq.c | 2 +- tests/system-test/7-tmq/tmqConsFromTsdb1.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a4ca1f796b..c18e1e3e40 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1619,7 +1619,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosGetQitem(tmq->qall, (void**)&rspWrapper); if (rspWrapper == NULL) { - tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId); + /*tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);*/ return NULL; } } diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1.py b/tests/system-test/7-tmq/tmqConsFromTsdb1.py index 499f837ccc..d0ab8d4fe3 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1.py @@ -86,7 +86,7 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 15, + 'pollDelay': 25, 'showMsg': 1, 'showRow': 1, 'snapshot': 1} @@ -157,7 +157,7 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 10, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 10, + 'pollDelay': 25, 'showMsg': 1, 'showRow': 1, 'snapshot': 1} From 4b5807c7f020a6bc2c10d146145a39ac9da2fe61 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 26 Oct 2022 15:54:00 +0800 Subject: [PATCH 3/3] fix(tmq): set precision --- examples/c/tmq.c | 4 ++-- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tqExec.c | 15 +++++++++------ source/dnode/vnode/src/tq/tqPush.c | 2 +- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/examples/c/tmq.c b/examples/c/tmq.c index d3fc803c94..1147671ea1 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -47,7 +47,7 @@ static int32_t msg_process(TAOS_RES* msg) { int32_t precision = taos_result_precision(msg); rows++; taos_print_row(buf, row, fields, numOfFields); - printf("row content: %s\n", buf); + printf("precision: %d, row content: %s\n", precision, buf); } return rows; @@ -70,7 +70,7 @@ static int32_t init_env() { taos_free_result(pRes); // create database - pRes = taos_query(pConn, "create database tmqdb"); + pRes = taos_query(pConn, "create database tmqdb precision 'ns'"); if (taos_errno(pRes) != 0) { printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index edd23c80be..ef54adf0d6 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -155,7 +155,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); -int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols); +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 30f6f81aa9..a6e8767a4d 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -15,14 +15,14 @@ #include "tq.h" -int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; pRetrieve->useconds = 0; - pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->precision = precision; pRetrieve->compressed = 0; pRetrieve->completed = 1; pRetrieve->numOfRows = htonl(pBlock->info.rows); @@ -95,7 +95,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs break; } - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -174,7 +174,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } } - tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); + tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) { continue; @@ -256,7 +257,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp pRsp->createTableNum++; } } - tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock)); + tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); blockDataFreeRes(&block); tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); pRsp->blockNum++; @@ -291,7 +293,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp pRsp->createTableNum++; } } - tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock)); + tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); blockDataFreeRes(&block); tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); pRsp->blockNum++; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 35cb305042..12d5b4112b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -270,7 +270,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) break; } - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); pRsp->blockNum++; }