diff --git a/include/util/tsimplehash.h b/include/util/tsimplehash.h index c9df911476..987a9fe2a8 100644 --- a/include/util/tsimplehash.h +++ b/include/util/tsimplehash.h @@ -48,6 +48,13 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn); */ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj); +/** + * set the free function pointer + * @param pHashObj + * @param freeFp + */ +void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp); + int32_t tSimpleHashPrint(const SSHashObj *pHashObj); /** diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 255db3379f..c0d54ea87a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -15,6 +15,7 @@ #include "osDef.h" #include "tsdb.h" +#include "tsimplehash.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -176,14 +177,15 @@ struct STsdbReader { SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; SIOCostSummary cost; - STSchema* pSchema; // the newest version schema - STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times - SDataFReader* pFileReader; // the file reader - SDelFReader* pDelFReader; // the del file reader - SArray* pDelIdx; // del file block index; - SBlockInfoBuf blockInfoBuf; - int32_t step; - STsdbReader* innerReader[2]; + STSchema* pSchema; // the newest version schema + // STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times + SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema + SDataFReader* pFileReader; // the file reader + SDelFReader* pDelFReader; // the del file reader + SArray* pDelIdx; // del file block index; + SBlockInfoBuf blockInfoBuf; + int32_t step; + STsdbReader* innerReader[2]; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -1887,28 +1889,23 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* return pReader->pSchema; } - if (pReader->pMemSchema == NULL) { - int32_t code = - metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } else { - return pReader->pMemSchema; - } + void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion)); + if (p != NULL) { + return *(STSchema**)p; } - if (pReader->pMemSchema->version == sversion) { - return pReader->pMemSchema; - } - - taosMemoryFreeClear(pReader->pMemSchema); - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema); - if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) { + STSchema* ptr = NULL; + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr); + if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; } else { - return pReader->pMemSchema; + code = tSimpleHashPut(pReader->pSchemaMap, &sversion, sizeof(sversion), &ptr, POINTER_BYTES); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + return ptr; } } @@ -3743,8 +3740,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); - int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema); + int32_t code = tsdbRowMergerInit2(&merge, pSchema, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3755,7 +3753,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - tsdbRowMerge(&merge, pRow); + pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + tsdbRowMergerAdd(&merge, pRow, pSchema); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); if (code != TSDB_CODE_SUCCESS) { @@ -4084,6 +4083,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { return code; } +static void freeSchemaFunc(void* param) { + void* p = *(void**)param; + taosMemoryFree(p); +} + // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) { @@ -4160,6 +4164,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } } + pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); + if (pReader->pSchemaMap == NULL) { + tsdbError("failed init schema hash for reader %s", pReader->idStr); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc); if (pReader->pSchema != NULL) { code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); if (code != TSDB_CODE_SUCCESS) { @@ -4202,7 +4214,7 @@ void tsdbReaderClose(STsdbReader* pReader) { p->status.uidList.tableUidList = NULL; p->pReadSnap = NULL; p->pSchema = NULL; - p->pMemSchema = NULL; + p->pSchemaMap = NULL; p = pReader->innerReader[1]; @@ -4210,7 +4222,7 @@ void tsdbReaderClose(STsdbReader* pReader) { p->status.uidList.tableUidList = NULL; p->pReadSnap = NULL; p->pSchema = NULL; - p->pMemSchema = NULL; + p->pSchemaMap = NULL; tsdbReaderClose(pReader->innerReader[0]); tsdbReaderClose(pReader->innerReader[1]); @@ -4290,10 +4302,7 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->pSchema); - if (pReader->pMemSchema != pReader->pSchema) { - taosMemoryFree(pReader->pMemSchema); - } - + tSimpleHashCleanup(pReader->pSchemaMap); taosMemoryFreeClear(pReader); } @@ -4453,14 +4462,14 @@ int32_t tsdbReaderResume(STsdbReader* pReader) { pPrevReader->status.pTableMap = pReader->status.pTableMap; pPrevReader->status.uidList = pReader->status.uidList; pPrevReader->pSchema = pReader->pSchema; - pPrevReader->pMemSchema = pReader->pMemSchema; + pPrevReader->pSchemaMap = pReader->pSchemaMap; pPrevReader->pReadSnap = pReader->pReadSnap; pNextReader->capacity = 1; pNextReader->status.pTableMap = pReader->status.pTableMap; pNextReader->status.uidList = pReader->status.uidList; pNextReader->pSchema = pReader->pSchema; - pNextReader->pMemSchema = pReader->pMemSchema; + pNextReader->pSchemaMap = pReader->pSchemaMap; pNextReader->pReadSnap = pReader->pReadSnap; code = doOpenReaderImpl(pPrevReader); @@ -4815,7 +4824,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { return TSDB_CODE_SUCCESS; } - SReaderStatus* pStatus = &pReader->status; + SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; pReader->order = pCond->order; @@ -4836,9 +4845,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { resetDataBlockIterator(pBlockIter, pReader->order); resetTableListIndex(&pReader->status); - bool asc = ASCENDING_TRAVERSE(pReader->order); - int32_t step = asc? 1:-1; - int64_t ts = asc? pReader->window.skey - 1 : pReader->window.ekey + 1; + bool asc = ASCENDING_TRAVERSE(pReader->order); + int32_t step = asc ? 1 : -1; + int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1; resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step); int32_t code = 0; diff --git a/source/util/src/tsimplehash.c b/source/util/src/tsimplehash.c index 70acffed5d..ec1991923f 100644 --- a/source/util/src/tsimplehash.c +++ b/source/util/src/tsimplehash.c @@ -28,19 +28,23 @@ #define HASH_INDEX(v, c) ((v) & ((c)-1)) -#define FREE_HASH_NODE(_n) \ - do { \ - taosMemoryFreeClear(_n); \ +#define FREE_HASH_NODE(_n, fp) \ + do { \ + if (fp) { \ + fp((_n)->data); \ + } \ + taosMemoryFreeClear(_n); \ } while (0); struct SSHashObj { - SHNode **hashList; - size_t capacity; // number of slots - int64_t size; // number of elements in hash table - _hash_fn_t hashFp; // hash function - _equal_fn_t equalFp; // equal function - SArray* pHashNodeBuf;// hash node allocation buffer, 1k size of each page by default - int32_t offset; // allocation offset in current page + SHNode **hashList; + size_t capacity; // number of slots + int64_t size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + _equal_fn_t equalFp; // equal function + _hash_free_fn_t freeFp; // free function + SArray *pHashNodeBuf; // hash node allocation buffer, 1k size of each page by default + int32_t offset; // allocation offset in current page }; static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { @@ -71,7 +75,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { pHashObj->capacity = taosHashCapacity((int32_t)capacity); pHashObj->equalFp = memcmp; - pHashObj->pHashNodeBuf = taosArrayInit(10, sizeof(void*)); + pHashObj->freeFp = NULL; pHashObj->offset = 0; pHashObj->size = 0; @@ -92,6 +96,10 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) { return (int32_t) pHashObj->size; } +void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp) { + pHashObj->freeFp = freeFp; +} + static void* doInternalAlloc(SSHashObj* pHashObj, int32_t size) { #if 0 void** p = taosArrayGetLast(pHashObj->pHashNodeBuf); @@ -306,7 +314,8 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { } else { pPrev->next = pNode->next; } - FREE_HASH_NODE(pNode); + + FREE_HASH_NODE(pNode, pHashObj->freeFp); pHashObj->size -= 1; code = TSDB_CODE_SUCCESS; break; @@ -341,7 +350,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke *pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL; } - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pNode, pHashObj->freeFp); pHashObj->size -= 1; break; } @@ -370,14 +379,13 @@ void tSimpleHashClear(SSHashObj *pHashObj) { while (pNode) { pNext = pNode->next; - FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pNode, pHashObj->freeFp); pNode = pNext; } pHashObj->hashList[i] = NULL; } - taosArrayClearEx(pHashObj->pHashNodeBuf, destroyItems); pHashObj->offset = 0; pHashObj->size = 0; } @@ -388,7 +396,6 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) { } tSimpleHashClear(pHashObj); - taosArrayDestroy(pHashObj->pHashNodeBuf); taosMemoryFreeClear(pHashObj->hashList); taosMemoryFree(pHashObj); }