Merge pull request #20686 from taosdata/fix/liaohj

fix(tsdb/read): use correct scheme for mem & imem merging
This commit is contained in:
Xiaoyu Wang 2023-03-29 15:38:21 +08:00 committed by GitHub
commit b9d905c2d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 55 deletions

View File

@ -48,6 +48,13 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn);
*/ */
int32_t tSimpleHashGetSize(const SSHashObj *pHashObj); int32_t tSimpleHashGetSize(const SSHashObj *pHashObj);
/**
* set the free function pointer
* @param pHashObj
* @param freeFp
*/
void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp);
int32_t tSimpleHashPrint(const SSHashObj *pHashObj); int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
/** /**

View File

@ -15,6 +15,7 @@
#include "osDef.h" #include "osDef.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsimplehash.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
@ -176,14 +177,15 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo; SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap; STsdbReadSnap* pReadSnap;
SIOCostSummary cost; SIOCostSummary cost;
STSchema* pSchema; // the newest version schema STSchema* pSchema; // the newest version schema
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times // STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SDataFReader* pFileReader; // the file reader SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDelFReader* pDelFReader; // the del file reader SDataFReader* pFileReader; // the file reader
SArray* pDelIdx; // del file block index; SDelFReader* pDelFReader; // the del file reader
SBlockInfoBuf blockInfoBuf; SArray* pDelIdx; // del file block index;
int32_t step; SBlockInfoBuf blockInfoBuf;
STsdbReader* innerReader[2]; int32_t step;
STsdbReader* innerReader[2];
}; };
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
@ -1887,28 +1889,23 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
return pReader->pSchema; return pReader->pSchema;
} }
if (pReader->pMemSchema == NULL) { void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion));
int32_t code = if (p != NULL) {
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema); return *(STSchema**)p;
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
} else {
return pReader->pMemSchema;
}
} }
if (pReader->pMemSchema->version == sversion) { STSchema* ptr = NULL;
return pReader->pMemSchema; int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr);
} if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pReader->pMemSchema);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
terrno = code; terrno = code;
return NULL; return NULL;
} else { } else {
return pReader->pMemSchema; code = tSimpleHashPut(pReader->pSchemaMap, &sversion, sizeof(sversion), &ptr, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
return ptr;
} }
} }
@ -3743,8 +3740,9 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
STSchema* piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
int32_t code = tsdbRowMergerInit(&merge, piRow, pSchema); int32_t code = tsdbRowMergerInit2(&merge, pSchema, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -3755,7 +3753,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code; return code;
} }
tsdbRowMerge(&merge, pRow); pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tsdbRowMergerAdd(&merge, pRow, pSchema);
code = code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -4084,6 +4083,11 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
return code; return code;
} }
static void freeSchemaFunc(void* param) {
void* p = *(void**)param;
taosMemoryFree(p);
}
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) { SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
@ -4160,6 +4164,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
} }
} }
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
if (pReader->pSchemaMap == NULL) {
tsdbError("failed init schema hash for reader %s", pReader->idStr);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc);
if (pReader->pSchema != NULL) { if (pReader->pSchema != NULL) {
code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -4202,7 +4214,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
p->status.uidList.tableUidList = NULL; p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL; p->pReadSnap = NULL;
p->pSchema = NULL; p->pSchema = NULL;
p->pMemSchema = NULL; p->pSchemaMap = NULL;
p = pReader->innerReader[1]; p = pReader->innerReader[1];
@ -4210,7 +4222,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
p->status.uidList.tableUidList = NULL; p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL; p->pReadSnap = NULL;
p->pSchema = NULL; p->pSchema = NULL;
p->pMemSchema = NULL; p->pSchemaMap = NULL;
tsdbReaderClose(pReader->innerReader[0]); tsdbReaderClose(pReader->innerReader[0]);
tsdbReaderClose(pReader->innerReader[1]); tsdbReaderClose(pReader->innerReader[1]);
@ -4290,10 +4302,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema); taosMemoryFree(pReader->pSchema);
if (pReader->pMemSchema != pReader->pSchema) { tSimpleHashCleanup(pReader->pSchemaMap);
taosMemoryFree(pReader->pMemSchema);
}
taosMemoryFreeClear(pReader); taosMemoryFreeClear(pReader);
} }
@ -4453,14 +4462,14 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
pPrevReader->status.pTableMap = pReader->status.pTableMap; pPrevReader->status.pTableMap = pReader->status.pTableMap;
pPrevReader->status.uidList = pReader->status.uidList; pPrevReader->status.uidList = pReader->status.uidList;
pPrevReader->pSchema = pReader->pSchema; pPrevReader->pSchema = pReader->pSchema;
pPrevReader->pMemSchema = pReader->pMemSchema; pPrevReader->pSchemaMap = pReader->pSchemaMap;
pPrevReader->pReadSnap = pReader->pReadSnap; pPrevReader->pReadSnap = pReader->pReadSnap;
pNextReader->capacity = 1; pNextReader->capacity = 1;
pNextReader->status.pTableMap = pReader->status.pTableMap; pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->status.uidList = pReader->status.uidList; pNextReader->status.uidList = pReader->status.uidList;
pNextReader->pSchema = pReader->pSchema; pNextReader->pSchema = pReader->pSchema;
pNextReader->pMemSchema = pReader->pMemSchema; pNextReader->pSchemaMap = pReader->pSchemaMap;
pNextReader->pReadSnap = pReader->pReadSnap; pNextReader->pReadSnap = pReader->pReadSnap;
code = doOpenReaderImpl(pPrevReader); code = doOpenReaderImpl(pPrevReader);
@ -4815,7 +4824,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
pReader->order = pCond->order; pReader->order = pCond->order;
@ -4836,9 +4845,9 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
resetDataBlockIterator(pBlockIter, pReader->order); resetDataBlockIterator(pBlockIter, pReader->order);
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc? 1:-1; int32_t step = asc ? 1 : -1;
int64_t ts = asc? pReader->window.skey - 1 : pReader->window.ekey + 1; int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step); resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
int32_t code = 0; int32_t code = 0;

View File

@ -28,19 +28,23 @@
#define HASH_INDEX(v, c) ((v) & ((c)-1)) #define HASH_INDEX(v, c) ((v) & ((c)-1))
#define FREE_HASH_NODE(_n) \ #define FREE_HASH_NODE(_n, fp) \
do { \ do { \
taosMemoryFreeClear(_n); \ if (fp) { \
fp((_n)->data); \
} \
taosMemoryFreeClear(_n); \
} while (0); } while (0);
struct SSHashObj { struct SSHashObj {
SHNode **hashList; SHNode **hashList;
size_t capacity; // number of slots size_t capacity; // number of slots
int64_t size; // number of elements in hash table int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function _hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function _equal_fn_t equalFp; // equal function
SArray* pHashNodeBuf;// hash node allocation buffer, 1k size of each page by default _hash_free_fn_t freeFp; // free function
int32_t offset; // allocation offset in current page SArray *pHashNodeBuf; // hash node allocation buffer, 1k size of each page by default
int32_t offset; // allocation offset in current page
}; };
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
@ -71,7 +75,7 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
pHashObj->capacity = taosHashCapacity((int32_t)capacity); pHashObj->capacity = taosHashCapacity((int32_t)capacity);
pHashObj->equalFp = memcmp; pHashObj->equalFp = memcmp;
pHashObj->pHashNodeBuf = taosArrayInit(10, sizeof(void*)); pHashObj->freeFp = NULL;
pHashObj->offset = 0; pHashObj->offset = 0;
pHashObj->size = 0; pHashObj->size = 0;
@ -92,6 +96,10 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
return (int32_t) pHashObj->size; return (int32_t) pHashObj->size;
} }
void tSimpleHashSetFreeFp(SSHashObj* pHashObj, _hash_free_fn_t freeFp) {
pHashObj->freeFp = freeFp;
}
static void* doInternalAlloc(SSHashObj* pHashObj, int32_t size) { static void* doInternalAlloc(SSHashObj* pHashObj, int32_t size) {
#if 0 #if 0
void** p = taosArrayGetLast(pHashObj->pHashNodeBuf); void** p = taosArrayGetLast(pHashObj->pHashNodeBuf);
@ -306,7 +314,8 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
} else { } else {
pPrev->next = pNode->next; pPrev->next = pNode->next;
} }
FREE_HASH_NODE(pNode);
FREE_HASH_NODE(pNode, pHashObj->freeFp);
pHashObj->size -= 1; pHashObj->size -= 1;
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
break; break;
@ -341,7 +350,7 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
*pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL; *pIter = pPrev ? GET_SHASH_NODE_DATA(pPrev) : NULL;
} }
FREE_HASH_NODE(pNode); FREE_HASH_NODE(pNode, pHashObj->freeFp);
pHashObj->size -= 1; pHashObj->size -= 1;
break; break;
} }
@ -370,14 +379,13 @@ void tSimpleHashClear(SSHashObj *pHashObj) {
while (pNode) { while (pNode) {
pNext = pNode->next; pNext = pNode->next;
FREE_HASH_NODE(pNode); FREE_HASH_NODE(pNode, pHashObj->freeFp);
pNode = pNext; pNode = pNext;
} }
pHashObj->hashList[i] = NULL; pHashObj->hashList[i] = NULL;
} }
taosArrayClearEx(pHashObj->pHashNodeBuf, destroyItems);
pHashObj->offset = 0; pHashObj->offset = 0;
pHashObj->size = 0; pHashObj->size = 0;
} }
@ -388,7 +396,6 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
} }
tSimpleHashClear(pHashObj); tSimpleHashClear(pHashObj);
taosArrayDestroy(pHashObj->pHashNodeBuf);
taosMemoryFreeClear(pHashObj->hashList); taosMemoryFreeClear(pHashObj->hashList);
taosMemoryFree(pHashObj); taosMemoryFree(pHashObj);
} }