From 4abb6e14112f96596e1213dc79653269a4ef5d6a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 26 Aug 2022 16:52:12 +0800 Subject: [PATCH] enh: use simple hash to save agg result --- source/libs/executor/inc/executil.h | 3 +- source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/inc/tsimplehash.h | 55 +++++---- source/libs/executor/src/executil.c | 9 +- source/libs/executor/src/executorimpl.c | 25 ++-- source/libs/executor/src/scanoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 53 +++++---- source/libs/executor/src/tsimplehash.c | 111 +++++++----------- .../libs/executor/test/tSimpleHashTests.cpp | 73 +++++++++++- 9 files changed, 199 insertions(+), 142 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index a25933d15e..9e7fcc2227 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -22,6 +22,7 @@ #include "tbuffer.h" #include "tcommon.h" #include "tpagedbuf.h" +#include "tsimplehash.h" #define T_LONG_JMP(_obj, _c) \ do { \ @@ -106,7 +107,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo setBufPageDirty(pPage, true); } -void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); +void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f059a90c9c..d69888677c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -297,10 +297,10 @@ enum { }; typedef struct SAggSupporter { - SHashObj* pResultRowHashTable; // quick locate the window object for each result - char* keyBuf; // window key buffer - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + SSHashObj* pResultRowHashTable; // quick locate the window object for each result + char* keyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; typedef struct { diff --git a/source/libs/executor/inc/tsimplehash.h b/source/libs/executor/inc/tsimplehash.h index 4c5a80e2f1..27191e3b7e 100644 --- a/source/libs/executor/inc/tsimplehash.h +++ b/source/libs/executor/inc/tsimplehash.h @@ -28,7 +28,7 @@ typedef void (*_hash_free_fn_t)(void *); /** * @brief single thread hash - * + * */ typedef struct SSHashObj SSHashObj; @@ -52,13 +52,13 @@ int32_t tSimpleHashPrint(const SSHashObj *pHashObj); /** * @brief put element into hash table, if the element with the same key exists, update it - * - * @param pHashObj - * @param key - * @param keyLen - * @param data - * @param dataLen - * @return int32_t + * + * @param pHashObj + * @param key + * @param keyLen + * @param data + * @param dataLen + * @return int32_t */ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen); @@ -80,6 +80,18 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen); */ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen); +/** + * remove item with the specified key during hash iterate + * + * @param pHashObj + * @param key + * @param keyLen + * @param pIter + * @param iter + * @return int32_t + */ +int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter); + /** * Clear the hash table. * @param pHashObj @@ -99,13 +111,27 @@ void tSimpleHashCleanup(SSHashObj *pHashObj); */ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj); +#pragma pack(push, 4) +typedef struct SHNode{ + struct SHNode *next; + uint32_t keyLen : 20; + uint32_t dataLen : 12; + char data[]; +} SHNode; +#pragma pack(pop) + /** * Get the corresponding key information for a given data in hash table * @param data * @param keyLen * @return */ -void *tSimpleHashGetKey(void *data, size_t* keyLen); +static FORCE_INLINE void *tSimpleHashGetKey(void *data, size_t *keyLen) { + SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data)); + if (keyLen) *keyLen = node->keyLen; + + return POINTER_SHIFT(data, node->dataLen); +} /** * Create the hash table iterator @@ -116,17 +142,6 @@ void *tSimpleHashGetKey(void *data, size_t* keyLen); */ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter); -/** - * Create the hash table iterator - * - * @param pHashObj - * @param data - * @param key - * @param iter - * @return void* - */ -void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter); - #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b89579a017..e0e0f1198c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -83,7 +83,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) { static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); } -void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) { +void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) { if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); } @@ -92,9 +92,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int void* pData = NULL; pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES); - size_t keyLen = 0; - while ((pData = taosHashIterate(pHashmap, pData)) != NULL) { - void* key = taosHashGetKey(pData, &keyLen); + size_t keyLen = 0; + int32_t iter = 0; + while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) { + void* key = tSimpleHashGetKey(pData, &keyLen); SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6f4c84f9c0..027a9a67b8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -234,7 +234,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); SResultRow* pResult = NULL; @@ -273,7 +273,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; - taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, + tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition)); } @@ -282,7 +282,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // too many time window in query if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && - taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { + tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } @@ -3011,7 +3011,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len } SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); - int32_t size = taosHashGetSize(pSup->pResultRowHashTable); + int32_t size = tSimpleHashGetSize(pSup->pResultRowHashTable); size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length int32_t totalSize = sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); @@ -3038,10 +3038,11 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset); setBufPageDirty(pPage, true); releaseBufPage(pSup->pResultBuf, pPage); - - void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL); - while (pIter) { - void* key = taosHashGetKey(pIter, &keyLen); + + int32_t iter = 0; + void* pIter = NULL; + while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) { + void* key = tSimpleHashGetKey(pIter, &keyLen); SResultRowPosition* p1 = (SResultRowPosition*)pIter; pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId); @@ -3072,8 +3073,6 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len offset += sizeof(int32_t); memcpy(*result + offset, pRow, pSup->resultRowSize); offset += pSup->resultRowSize; - - pIter = taosHashIterate(pSup->pResultRowHashTable, pIter); } *(int32_t*)(*result) = offset; @@ -3108,7 +3107,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { // add a new result set for a new group SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; - taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); + tSimpleHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); offset += keyLen; int32_t valueLen = *(int32_t*)(result + offset); @@ -3452,7 +3451,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); - pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); + pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn); if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -3479,7 +3478,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n void cleanupAggSup(SAggSupporter* pAggSup) { taosMemoryFreeClear(pAggSup->keyBuf); - taosHashCleanup(pAggSup->pResultRowHashTable); + tSimpleHashCleanup(pAggSup->pResultRowHashTable); destroyDiskbasedBuf(pAggSup->pResultBuf); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fc36d740a9..ca140718da 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro STableScanInfo* pTableScanInfo = pOperator->info; - SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, - GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, + GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); if (p1 == NULL) { return NULL; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ebafa91046..f96c33169c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1380,7 +1380,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t int32_t numOfOutput) { SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); if (!p1) { // window has been closed return false; @@ -1393,14 +1393,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) size_t bytes = sizeof(TSKEY); SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); if (!p1) { // window has been closed return false; } // SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId); // dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage); - taosHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); return true; } @@ -1450,11 +1450,13 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* } } -static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) { - void* pIte = NULL; - size_t keyLen = 0; - while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { - void* key = taosHashGetKey(pIte, &keyLen); +static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) { + + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { + void* key = tSimpleHashGetKey(pIte, &keyLen); uint64_t groupId = *(uint64_t*)key; ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); @@ -1467,14 +1469,15 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) { return TSDB_CODE_SUCCESS; } -static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, +static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages, SDiskbasedBuf* pDiscBuf) { qDebug("===stream===close interval window"); - void* pIte = NULL; - size_t keyLen = 0; - while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { - void* key = taosHashGetKey(pIte, &keyLen); + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) { + void* key = tSimpleHashGetKey(pIte, &keyLen); uint64_t groupId = *(uint64_t*)key; ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))); TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t)); @@ -1512,7 +1515,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, } char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); - taosHashRemove(pHashMap, keyBuf, keyLen); + tSimpleHashIterateRemove(pHashMap, keyBuf, keyLen, &pIte, &iter); } } return TSDB_CODE_SUCCESS; @@ -2855,7 +2858,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) { int32_t bytes = sizeof(TSKEY); SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId); SResultRowPosition* p1 = - (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); return p1 != NULL; } @@ -2896,7 +2899,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); - SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); return p1 == NULL; } @@ -3025,7 +3028,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc } static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) { - taosHashClear(pInfo->aggSup.pResultRowHashTable); + tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); initResultRowInfo(&pInfo->binfo.resultRowInfo); } @@ -4926,14 +4929,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui SExprSupp* pSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs, pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); - taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); + tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); return TSDB_CODE_SUCCESS; } @@ -4956,7 +4959,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR // there is an result exists if (miaInfo->curTs != INT64_MIN) { - ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); + ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); if (ts != miaInfo->curTs) { outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); @@ -4964,7 +4967,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } } else { miaInfo->curTs = ts; - ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); + ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); } STimeWindow win = {0}; @@ -5040,7 +5043,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { // close last unfinalized time window if (miaInfo->curTs != INT64_MIN) { - ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); + ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); miaInfo->curTs = INT64_MIN; } @@ -5221,12 +5224,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, + SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); - taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index 6b2edf0d5e..8dce53273b 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -31,21 +31,12 @@ taosMemoryFreeClear(_n); \ } while (0); -#pragma pack(push, 4) -typedef struct SHNode { - struct SHNode *next; - uint32_t keyLen : 20; - uint32_t dataLen : 12; - char data[]; -} SHNode; -#pragma pack(pop) - struct SSHashObj { SHNode **hashList; size_t capacity; // number of slots - int64_t size; // number of elements in hash table - _hash_fn_t hashFp; // hash function - _equal_fn_t equalFp; // equal function + int64_t size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + _equal_fn_t equalFp; // equal function }; static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { @@ -76,7 +67,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { pHashObj->hashFp = fn; ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); - pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *)); if (!pHashObj->hashList) { taosMemoryFree(pHashObj); @@ -285,6 +275,43 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) { return TSDB_CODE_SUCCESS; } +int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter) { + if (!pHashObj || !key) { + return TSDB_CODE_FAILED; + } + + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); + + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + + SHNode *pNode = pHashObj->hashList[slot]; + SHNode *pPrev = NULL; + while (pNode) { + if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { + if (!pPrev) { + pHashObj->hashList[slot] = pNode->next; + } else { + pPrev->next = pNode->next; + } + + if (pNode->next) { + *pIter = GET_SHASH_NODE_DATA(pNode->next); + } else { + *pIter = NULL; + ++(*iter); + } + + FREE_HASH_NODE(pNode); + atomic_sub_fetch_64(&pHashObj->size, 1); + break; + } + pPrev = pNode; + pNode = pNode->next; + } + + return TSDB_CODE_SUCCESS; +} + void tSimpleHashClear(SSHashObj *pHashObj) { if (!pHashObj || taosHashTableEmpty(pHashObj)) { return; @@ -324,15 +351,6 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj); } -void *tSimpleHashGetKey(void *data, size_t *keyLen) { - SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data)); - if (keyLen) { - *keyLen = node->keyLen; - } - - return POINTER_SHIFT(data, node->dataLen); -} - void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { if (!pHashObj) { return NULL; @@ -341,7 +359,7 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { SHNode *pNode = NULL; if (!data) { - for (int32_t i = 0; i < pHashObj->capacity; ++i) { + for (int32_t i = *iter; i < pHashObj->capacity; ++i) { pNode = pHashObj->hashList[i]; if (!pNode) { continue; @@ -368,52 +386,5 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { return GET_SHASH_NODE_DATA(pNode); } - return NULL; -} - -void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter) { - if (!pHashObj) { - return NULL; - } - - SHNode *pNode = NULL; - - if (!data) { - for (int32_t i = 0; i < pHashObj->capacity; ++i) { - pNode = pHashObj->hashList[i]; - if (!pNode) { - continue; - } - *iter = i; - if (key) { - *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); - } - return GET_SHASH_NODE_DATA(pNode); - } - return NULL; - } - - pNode = (SHNode *)((char *)data - offsetof(SHNode, data)); - - if (pNode->next) { - if (key) { - *key = GET_SHASH_NODE_KEY(pNode->next, pNode->next->dataLen); - } - return GET_SHASH_NODE_DATA(pNode->next); - } - - ++(*iter); - for (int32_t i = *iter; i < pHashObj->capacity; ++i) { - pNode = pHashObj->hashList[i]; - if (!pNode) { - continue; - } - *iter = i; - if (key) { - *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); - } - return GET_SHASH_NODE_DATA(pNode); - } - return NULL; } \ No newline at end of file diff --git a/source/libs/executor/test/tSimpleHashTests.cpp b/source/libs/executor/test/tSimpleHashTests.cpp index acb6d434b4..3bf339ef90 100644 --- a/source/libs/executor/test/tSimpleHashTests.cpp +++ b/source/libs/executor/test/tSimpleHashTests.cpp @@ -30,7 +30,7 @@ // return RUN_ALL_TESTS(); // } -TEST(testCase, tSimpleHashTest) { +TEST(testCase, tSimpleHashTest_intKey) { SSHashObj *pHashObj = tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); @@ -57,12 +57,14 @@ TEST(testCase, tSimpleHashTest) { int32_t iter = 0; int64_t keySum = 0; int64_t dataSum = 0; + size_t kLen = 0; while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { - void *key = tSimpleHashGetKey(data, NULL); + void *key = tSimpleHashGetKey(data, &kLen); + ASSERT_EQ(keyLen, kLen); keySum += *(int64_t *)key; dataSum += *(int64_t *)data; } - + ASSERT_EQ(keySum, dataSum); ASSERT_EQ(keySum, originKeySum); @@ -74,4 +76,69 @@ TEST(testCase, tSimpleHashTest) { tSimpleHashCleanup(pHashObj); } + +TEST(testCase, tSimpleHashTest_binaryKey) { + SSHashObj *pHashObj = + tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + + assert(pHashObj != nullptr); + + ASSERT_EQ(0, tSimpleHashGetSize(pHashObj)); + + typedef struct { + int64_t suid; + int64_t uid; + } SCombineKey; + + size_t keyLen = sizeof(SCombineKey); + size_t dataLen = sizeof(int64_t); + + int64_t originDataSum = 0; + SCombineKey combineKey = {0}; + for (int64_t i = 1; i <= 100; ++i) { + combineKey.suid = i; + combineKey.uid = i + 1; + tSimpleHashPut(pHashObj, (const void *)&combineKey, keyLen, (const void *)&i, dataLen); + originDataSum += i; + ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); + } + + for (int64_t i = 1; i <= 100; ++i) { + combineKey.suid = i; + combineKey.uid = i + 1; + void *data = tSimpleHashGet(pHashObj, (const void *)&combineKey, keyLen); + ASSERT_EQ(i, *(int64_t *)data); + } + + void *data = NULL; + int32_t iter = 0; + int64_t keySum = 0; + int64_t dataSum = 0; + size_t kLen = 0; + while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { + void *key = tSimpleHashGetKey(data, &kLen); + ASSERT_EQ(keyLen, kLen); + dataSum += *(int64_t *)data; + } + + ASSERT_EQ(originDataSum, dataSum); + + tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen); + + while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { + void *key = tSimpleHashGetKey(data, &kLen); + ASSERT_EQ(keyLen, kLen); + } + + for (int64_t i = 1; i <= 99; ++i) { + combineKey.suid = i; + combineKey.uid = i + 1; + tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen); + ASSERT_EQ(99 - i, tSimpleHashGetSize(pHashObj)); + } + + tSimpleHashCleanup(pHashObj); +} + + #pragma GCC diagnostic pop \ No newline at end of file