From f1ff5dcec484f87bd07f03627731ebd492d0a3c4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 28 Mar 2023 22:53:57 +0800 Subject: [PATCH 1/3] fix(query): use hashmap to keep the multiple schema. --- include/util/tsimplehash.h | 7 +++ source/dnode/vnode/src/tsdb/tsdbRead.c | 59 ++++++++++++++------------ source/util/src/tsimplehash.c | 38 ++++++++++------- 3 files changed, 62 insertions(+), 42 deletions(-) diff --git a/include/util/tsimplehash.h b/include/util/tsimplehash.h index c9df911476..987a9fe2a8 100644 --- a/include/util/tsimplehash.h +++ b/include/util/tsimplehash.h @@ -48,6 +48,13 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn); */ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj); +/** + * set the free function pointer + * @param pHashObj + * @param freeFp + */ +void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp); + int32_t tSimpleHashPrint(const SSHashObj *pHashObj); /** diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 89d3239c33..3d8b2a4031 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -15,6 +15,7 @@ #include "osDef.h" #include "tsdb.h" +#include "tsimplehash.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -177,7 +178,8 @@ struct STsdbReader { STsdbReadSnap* pReadSnap; SIOCostSummary cost; STSchema* pSchema; // the newest version schema - STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times +// STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times + SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema SDataFReader* pFileReader; // the file reader SDelFReader* pDelFReader; // the del file reader SArray* pDelIdx; // del file block index; @@ -1858,28 +1860,23 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* return pReader->pSchema; } - if (pReader->pMemSchema == NULL) { - int32_t code = - metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } else { - return pReader->pMemSchema; - } + void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion)); + if (p != NULL) { + return *(STSchema**) p; } - if (pReader->pMemSchema->version == sversion) { - return pReader->pMemSchema; - } - - taosMemoryFreeClear(pReader->pMemSchema); - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema); - if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) { + STSchema* ptr = NULL; + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); + if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; } else { - return pReader->pMemSchema; + code = tSimpleHashPut(pReader->pSchemaMap, &sversion, sizeof(sversion), &ptr, POINTER_BYTES); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + return ptr; } } @@ -4002,6 +3999,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { return code; } +static void freeSchemaFunc(void* param) { + void* p = *(void**) param; + taosMemoryFree(p); +} + // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) { @@ -4078,6 +4080,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } } + pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); + if (pReader->pSchemaMap == NULL) { + tsdbError("failed init schema hash for reader", pReader->idStr); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc); if (pReader->pSchema != NULL) { code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); if (code != TSDB_CODE_SUCCESS) { @@ -4120,7 +4130,7 @@ void tsdbReaderClose(STsdbReader* pReader) { p->status.uidList.tableUidList = NULL; p->pReadSnap = NULL; p->pSchema = NULL; - p->pMemSchema = NULL; + p->pSchemaMap = NULL; p = pReader->innerReader[1]; @@ -4128,7 +4138,7 @@ void tsdbReaderClose(STsdbReader* pReader) { p->status.uidList.tableUidList = NULL; p->pReadSnap = NULL; p->pSchema = NULL; - p->pMemSchema = NULL; + p->pSchemaMap = NULL; tsdbReaderClose(pReader->innerReader[0]); tsdbReaderClose(pReader->innerReader[1]); @@ -4208,10 +4218,7 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->pSchema); - if (pReader->pMemSchema != pReader->pSchema) { - taosMemoryFree(pReader->pMemSchema); - } - + tSimpleHashCleanup(pReader->pSchemaMap); taosMemoryFreeClear(pReader); } @@ -4371,14 +4378,14 @@ int32_t tsdbReaderResume(STsdbReader* pReader) { pPrevReader->status.pTableMap = pReader->status.pTableMap; pPrevReader->status.uidList = pReader->status.uidList; pPrevReader->pSchema = pReader->pSchema; - pPrevReader->pMemSchema = pReader->pMemSchema; + pPrevReader->pSchemaMap = pReader->pSchemaMap; pPrevReader->pReadSnap = pReader->pReadSnap; pNextReader->capacity = 1; pNextReader->status.pTableMap = pReader->status.pTableMap; pNextReader->status.uidList = pReader->status.uidList; pNextReader->pSchema = pReader->pSchema; - pNextReader->pMemSchema = pReader->pMemSchema; + pNextReader->pSchemaMap = pReader->pSchemaMap; pNextReader->pReadSnap = pReader->pReadSnap; code = doOpenReaderImpl(pPrevReader); diff --git a/source/util/src/tsimplehash.c b/source/util/src/tsimplehash.c index 70acffed5d..310fdd27ab 100644 --- a/source/util/src/tsimplehash.c +++ b/source/util/src/tsimplehash.c @@ -28,19 +28,23 @@ #define HASH_INDEX(v, c) ((v) & ((c)-1)) -#define FREE_HASH_NODE(_n) \ - do { \ - taosMemoryFreeClear(_n); \ +#define FREE_HASH_NODE(_n, fp) \ + do { \ + if (fp) { \ + fp((_n)->data); \ + } \ + taosMemoryFreeClear(_n); \ } while (0); struct SSHashObj { - SHNode **hashList; - size_t capacity; // number of slots - int64_t size; // number of elements in hash table - _hash_fn_t hashFp; // hash function - _equal_fn_t equalFp; // equal function - SArray* pHashNodeBuf;// hash node allocation buffer, 1k size of each page by default - int32_t offset; // allocation offset in current page + SHNode **hashList; + size_t capacity; // number of slots + int64_t size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + _equal_fn_t equalFp; // equal function + _hash_free_fn_t freeFp; // free function + SArray *pHashNodeBuf; // hash node allocation buffer, 1k size of each page by default + int32_t offset; // allocation offset in current page }; static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { @@ -71,7 +75,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { pHashObj->capacity = taosHashCapacity((int32_t)capacity); pHashObj->equalFp = memcmp; - pHashObj->pHashNodeBuf = taosArrayInit(10, sizeof(void*)); pHashObj->offset = 0; pHashObj->size = 0; @@ -92,6 +95,10 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) { return (int32_t) pHashObj->size; } +void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp) { + pHashObj->freeFp = freeFp; +} + static void* doInternalAlloc(SSHashObj* pHashObj, int32_t size) { #if 0 void** p = taosArrayGetLast(pHashObj->pHashNodeBuf); @@ -306,7 +313,8 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { } else { pPrev->next = pNode->next; } - FREE_HASH_NODE(pNode); + + FREE_HASH_NODE(pNode, pHashObj->freeFp); pHashObj->size -= 1; code = TSDB_CODE_SUCCESS; break; @@ -341,7 +349,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke *pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL; } - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pNode, pHashObj->freeFp); pHashObj->size -= 1; break; } @@ -370,14 +378,13 @@ void tSimpleHashClear(SSHashObj *pHashObj) { while (pNode) { pNext = pNode->next; - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pNode, pHashObj->freeFp); pNode = pNext; } pHashObj->hashList[i] = NULL; } - taosArrayClearEx(pHashObj->pHashNodeBuf, destroyItems); pHashObj->offset = 0; pHashObj->size = 0; } @@ -388,7 +395,6 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) { } tSimpleHashClear(pHashObj); - taosArrayDestroy(pHashObj->pHashNodeBuf); taosMemoryFreeClear(pHashObj->hashList); taosMemoryFree(pHashObj); } From 7a050d64bdc6573201d26af6b3046495358f0e5e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Mar 2023 10:27:05 +0800 Subject: [PATCH 2/3] fix(query): set ptr to be NULL. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 5 +++-- source/util/src/tsimplehash.c | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 3d8b2a4031..d9f6b66d6a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3673,7 +3673,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p TSDBKEY ik = TSDBROW_KEY(piRow); if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema); if (code != TSDB_CODE_SUCCESS) { @@ -3686,7 +3686,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - tsdbRowMerge(&merge, pRow); + pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + tsdbRowMergerAdd(&merge, pRow, pSchema); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/util/src/tsimplehash.c b/source/util/src/tsimplehash.c index 310fdd27ab..ec1991923f 100644 --- a/source/util/src/tsimplehash.c +++ b/source/util/src/tsimplehash.c @@ -75,6 +75,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { pHashObj->capacity = taosHashCapacity((int32_t)capacity); pHashObj->equalFp = memcmp; + pHashObj->freeFp = NULL; pHashObj->offset = 0; pHashObj->size = 0; From 8cf8ac84d71e06f674147f99e0ff03acc6c2ef9d Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 29 Mar 2023 14:19:15 +0800 Subject: [PATCH 3/3] fix(tsdb/read): use correct scheme for mem & imem merging --- source/dnode/vnode/src/tsdb/tsdbRead.c | 37 +++++++++++++------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 3d8b2a4031..1906ae1abd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -177,15 +177,15 @@ struct STsdbReader { SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; SIOCostSummary cost; - STSchema* pSchema; // the newest version schema -// STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times - SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema - SDataFReader* pFileReader; // the file reader - SDelFReader* pDelFReader; // the del file reader - SArray* pDelIdx; // del file block index; - SBlockInfoBuf blockInfoBuf; - int32_t step; - STsdbReader* innerReader[2]; + STSchema* pSchema; // the newest version schema + // STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times + SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema + SDataFReader* pFileReader; // the file reader + SDelFReader* pDelFReader; // the del file reader + SArray* pDelIdx; // del file block index; + SBlockInfoBuf blockInfoBuf; + int32_t step; + STsdbReader* innerReader[2]; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -1862,11 +1862,11 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion)); if (p != NULL) { - return *(STSchema**) p; + return *(STSchema**)p; } STSchema* ptr = NULL; - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -3674,8 +3674,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema); + int32_t code = tsdbRowMergerInit2(&merge, pSchema, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4000,7 +4001,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } static void freeSchemaFunc(void* param) { - void* p = *(void**) param; + void* p = *(void**)param; taosMemoryFree(p); } @@ -4082,7 +4083,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); if (pReader->pSchemaMap == NULL) { - tsdbError("failed init schema hash for reader", pReader->idStr); + tsdbError("failed init schema hash for reader %s", pReader->idStr); code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -4710,7 +4711,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { return TSDB_CODE_SUCCESS; } - SReaderStatus* pStatus = &pReader->status; + SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; pReader->order = pCond->order; @@ -4731,9 +4732,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { resetDataBlockIterator(pBlockIter, pReader->order); resetTableListIndex(&pReader->status); - bool asc = ASCENDING_TRAVERSE(pReader->order); - int32_t step = asc? 1:-1; - int64_t ts = asc? pReader->window.skey - 1 : pReader->window.ekey + 1; + bool asc = ASCENDING_TRAVERSE(pReader->order); + int32_t step = asc ? 1 : -1; + int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1; resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step); int32_t code = 0;