Merge pull request #16460 from taosdata/feature/3.0_interval_hash_optimize
enh: use simple hash to save agg result
This commit is contained in:
commit
e0748f43e1
|
@ -22,6 +22,7 @@
|
|||
#include "tbuffer.h"
|
||||
#include "tcommon.h"
|
||||
#include "tpagedbuf.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
#define T_LONG_JMP(_obj, _c) \
|
||||
do { \
|
||||
|
@ -106,7 +107,7 @@ static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPo
|
|||
setBufPageDirty(pPage, true);
|
||||
}
|
||||
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
|
||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||
|
||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||
|
|
|
@ -297,10 +297,10 @@ enum {
|
|||
};
|
||||
|
||||
typedef struct SAggSupporter {
|
||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||
char* keyBuf; // window key buffer
|
||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||
SSHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||
char* keyBuf; // window key buffer
|
||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||
} SAggSupporter;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -28,7 +28,7 @@ typedef void (*_hash_free_fn_t)(void *);
|
|||
|
||||
/**
|
||||
* @brief single thread hash
|
||||
*
|
||||
*
|
||||
*/
|
||||
typedef struct SSHashObj SSHashObj;
|
||||
|
||||
|
@ -52,13 +52,13 @@ int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
|
|||
|
||||
/**
|
||||
* @brief put element into hash table, if the element with the same key exists, update it
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param key
|
||||
* @param keyLen
|
||||
* @param data
|
||||
* @param dataLen
|
||||
* @return int32_t
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param key
|
||||
* @param keyLen
|
||||
* @param data
|
||||
* @param dataLen
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen);
|
||||
|
||||
|
@ -80,6 +80,18 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen);
|
|||
*/
|
||||
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen);
|
||||
|
||||
/**
|
||||
* remove item with the specified key during hash iterate
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param key
|
||||
* @param keyLen
|
||||
* @param pIter
|
||||
* @param iter
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter);
|
||||
|
||||
/**
|
||||
* Clear the hash table.
|
||||
* @param pHashObj
|
||||
|
@ -99,13 +111,27 @@ void tSimpleHashCleanup(SSHashObj *pHashObj);
|
|||
*/
|
||||
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
|
||||
|
||||
#pragma pack(push, 4)
|
||||
typedef struct SHNode{
|
||||
struct SHNode *next;
|
||||
uint32_t keyLen : 20;
|
||||
uint32_t dataLen : 12;
|
||||
char data[];
|
||||
} SHNode;
|
||||
#pragma pack(pop)
|
||||
|
||||
/**
|
||||
* Get the corresponding key information for a given data in hash table
|
||||
* @param data
|
||||
* @param keyLen
|
||||
* @return
|
||||
*/
|
||||
void *tSimpleHashGetKey(void *data, size_t* keyLen);
|
||||
static FORCE_INLINE void *tSimpleHashGetKey(void *data, size_t *keyLen) {
|
||||
SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
|
||||
if (keyLen) *keyLen = node->keyLen;
|
||||
|
||||
return POINTER_SHIFT(data, node->dataLen);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the hash table iterator
|
||||
|
@ -116,17 +142,6 @@ void *tSimpleHashGetKey(void *data, size_t* keyLen);
|
|||
*/
|
||||
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter);
|
||||
|
||||
/**
|
||||
* Create the hash table iterator
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param data
|
||||
* @param key
|
||||
* @param iter
|
||||
* @return void*
|
||||
*/
|
||||
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -83,7 +83,7 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) {
|
|||
|
||||
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
|
||||
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) {
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
|
||||
if (pGroupResInfo->pRows != NULL) {
|
||||
taosArrayDestroy(pGroupResInfo->pRows);
|
||||
}
|
||||
|
@ -92,9 +92,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
|
|||
void* pData = NULL;
|
||||
pGroupResInfo->pRows = taosArrayInit(10, POINTER_BYTES);
|
||||
|
||||
size_t keyLen = 0;
|
||||
while ((pData = taosHashIterate(pHashmap, pData)) != NULL) {
|
||||
void* key = taosHashGetKey(pData, &keyLen);
|
||||
size_t keyLen = 0;
|
||||
int32_t iter = 0;
|
||||
while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
|
||||
void* key = tSimpleHashGetKey(pData, &keyLen);
|
||||
|
||||
SResKeyPos* p = taosMemoryMalloc(keyLen + sizeof(SResultRowPosition));
|
||||
|
||||
|
|
|
@ -234,7 +234,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
|
@ -273,7 +273,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
|
||||
// add a new result set for a new group
|
||||
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
|
||||
tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
|
||||
sizeof(SResultRowPosition));
|
||||
}
|
||||
|
||||
|
@ -282,7 +282,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
|||
|
||||
// too many time window in query
|
||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
|
||||
taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
|
||||
tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
||||
}
|
||||
|
||||
|
@ -3011,7 +3011,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
|
|||
}
|
||||
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
|
||||
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
|
||||
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
|
||||
int32_t size = tSimpleHashGetSize(pSup->pResultRowHashTable);
|
||||
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
|
||||
int32_t totalSize =
|
||||
sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
|
||||
|
@ -3038,10 +3038,11 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
|
|||
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pSup->pResultBuf, pPage);
|
||||
|
||||
void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
|
||||
while (pIter) {
|
||||
void* key = taosHashGetKey(pIter, &keyLen);
|
||||
|
||||
int32_t iter = 0;
|
||||
void* pIter = NULL;
|
||||
while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
|
||||
void* key = tSimpleHashGetKey(pIter, &keyLen);
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)pIter;
|
||||
|
||||
pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
|
||||
|
@ -3072,8 +3073,6 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
|
|||
offset += sizeof(int32_t);
|
||||
memcpy(*result + offset, pRow, pSup->resultRowSize);
|
||||
offset += pSup->resultRowSize;
|
||||
|
||||
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
|
||||
}
|
||||
|
||||
*(int32_t*)(*result) = offset;
|
||||
|
@ -3108,7 +3107,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
|
|||
|
||||
// add a new result set for a new group
|
||||
SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
|
||||
tSimpleHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
|
||||
|
||||
offset += keyLen;
|
||||
int32_t valueLen = *(int32_t*)(result + offset);
|
||||
|
@ -3452,7 +3451,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
|||
|
||||
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
||||
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
|
||||
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
||||
pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);
|
||||
|
||||
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -3479,7 +3478,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
|||
|
||||
void cleanupAggSup(SAggSupporter* pAggSup) {
|
||||
taosMemoryFreeClear(pAggSup->keyBuf);
|
||||
taosHashCleanup(pAggSup->pResultRowHashTable);
|
||||
tSimpleHashCleanup(pAggSup->pResultRowHashTable);
|
||||
destroyDiskbasedBuf(pAggSup->pResultBuf);
|
||||
}
|
||||
|
||||
|
|
|
@ -178,8 +178,8 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
|
|||
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
|
||||
GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf,
|
||||
GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
|
||||
|
||||
if (p1 == NULL) {
|
||||
return NULL;
|
||||
|
|
|
@ -1399,7 +1399,7 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t
|
|||
int32_t numOfOutput) {
|
||||
SET_RES_WINDOW_KEY(pAggSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
(SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
if (!p1) {
|
||||
// window has been closed
|
||||
return false;
|
||||
|
@ -1412,14 +1412,14 @@ bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId)
|
|||
size_t bytes = sizeof(TSKEY);
|
||||
SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
(SResultRowPosition*)tSimpleHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
if (!p1) {
|
||||
// window has been closed
|
||||
return false;
|
||||
}
|
||||
// SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
|
||||
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
|
||||
taosHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
tSimpleHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1469,11 +1469,13 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
|
||||
void* pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
||||
void* key = taosHashGetKey(pIte, &keyLen);
|
||||
static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
|
||||
|
||||
void* pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
||||
uint64_t groupId = *(uint64_t*)key;
|
||||
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
|
||||
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
||||
|
@ -1486,14 +1488,15 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
||||
static int32_t closeIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
||||
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages,
|
||||
SDiskbasedBuf* pDiscBuf) {
|
||||
qDebug("===stream===close interval window");
|
||||
void* pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
||||
void* key = taosHashGetKey(pIte, &keyLen);
|
||||
void* pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
||||
uint64_t groupId = *(uint64_t*)key;
|
||||
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
|
||||
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
||||
|
@ -1531,7 +1534,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|||
}
|
||||
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
|
||||
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
|
||||
taosHashRemove(pHashMap, keyBuf, keyLen);
|
||||
tSimpleHashIterateRemove(pHashMap, keyBuf, keyLen, &pIte, &iter);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2874,7 +2877,7 @@ bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) {
|
|||
int32_t bytes = sizeof(TSKEY);
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
return p1 != NULL;
|
||||
}
|
||||
|
||||
|
@ -2915,7 +2918,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
|
|||
|
||||
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId);
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
|
||||
GET_RES_WINDOW_KEY_LEN(sizeof(int64_t)));
|
||||
return p1 == NULL;
|
||||
}
|
||||
|
@ -3044,7 +3047,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
|||
}
|
||||
|
||||
static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) {
|
||||
taosHashClear(pInfo->aggSup.pResultRowHashTable);
|
||||
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
|
||||
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
}
|
||||
|
@ -4945,14 +4948,14 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui
|
|||
SExprSupp* pSup = &pOperatorInfo->exprSupp;
|
||||
|
||||
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId);
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
||||
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
ASSERT(p1 != NULL);
|
||||
|
||||
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs,
|
||||
pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
|
||||
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
|
||||
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -4975,7 +4978,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
|||
|
||||
// there is an result exists
|
||||
if (miaInfo->curTs != INT64_MIN) {
|
||||
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
|
||||
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
|
||||
|
||||
if (ts != miaInfo->curTs) {
|
||||
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
|
||||
|
@ -4983,7 +4986,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
|||
}
|
||||
} else {
|
||||
miaInfo->curTs = ts;
|
||||
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
|
||||
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0);
|
||||
}
|
||||
|
||||
STimeWindow win = {0};
|
||||
|
@ -5059,7 +5062,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
|||
if (pBlock == NULL) {
|
||||
// close last unfinalized time window
|
||||
if (miaInfo->curTs != INT64_MIN) {
|
||||
ASSERT(taosHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
|
||||
ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1);
|
||||
outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs);
|
||||
miaInfo->curTs = INT64_MIN;
|
||||
}
|
||||
|
@ -5240,12 +5243,12 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table
|
|||
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
|
||||
|
||||
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
||||
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf,
|
||||
GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
ASSERT(p1 != NULL);
|
||||
finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo,
|
||||
pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo);
|
||||
taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -31,21 +31,12 @@
|
|||
taosMemoryFreeClear(_n); \
|
||||
} while (0);
|
||||
|
||||
#pragma pack(push, 4)
|
||||
typedef struct SHNode {
|
||||
struct SHNode *next;
|
||||
uint32_t keyLen : 20;
|
||||
uint32_t dataLen : 12;
|
||||
char data[];
|
||||
} SHNode;
|
||||
#pragma pack(pop)
|
||||
|
||||
struct SSHashObj {
|
||||
SHNode **hashList;
|
||||
size_t capacity; // number of slots
|
||||
int64_t size; // number of elements in hash table
|
||||
_hash_fn_t hashFp; // hash function
|
||||
_equal_fn_t equalFp; // equal function
|
||||
int64_t size; // number of elements in hash table
|
||||
_hash_fn_t hashFp; // hash function
|
||||
_equal_fn_t equalFp; // equal function
|
||||
};
|
||||
|
||||
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
|
||||
|
@ -76,7 +67,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
|
|||
pHashObj->hashFp = fn;
|
||||
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
|
||||
|
||||
|
||||
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
|
||||
if (!pHashObj->hashList) {
|
||||
taosMemoryFree(pHashObj);
|
||||
|
@ -285,6 +275,44 @@ int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t keyLen, void **pIter, int32_t *iter) {
|
||||
if (!pHashObj || !key) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
|
||||
SHNode *pNode = pHashObj->hashList[slot];
|
||||
SHNode *pPrev = NULL;
|
||||
while (pNode) {
|
||||
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
|
||||
if (!pPrev) {
|
||||
pHashObj->hashList[slot] = pNode->next;
|
||||
} else {
|
||||
pPrev->next = pNode->next;
|
||||
}
|
||||
|
||||
if (*pIter == (void *)GET_SHASH_NODE_DATA(pNode)) {
|
||||
if (!pPrev) {
|
||||
*pIter = NULL;
|
||||
} else {
|
||||
*pIter = GET_SHASH_NODE_DATA(pPrev);
|
||||
}
|
||||
}
|
||||
|
||||
FREE_HASH_NODE(pNode);
|
||||
atomic_sub_fetch_64(&pHashObj->size, 1);
|
||||
break;
|
||||
}
|
||||
pPrev = pNode;
|
||||
pNode = pNode->next;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void tSimpleHashClear(SSHashObj *pHashObj) {
|
||||
if (!pHashObj || taosHashTableEmpty(pHashObj)) {
|
||||
return;
|
||||
|
@ -302,6 +330,7 @@ void tSimpleHashClear(SSHashObj *pHashObj) {
|
|||
FREE_HASH_NODE(pNode);
|
||||
pNode = pNext;
|
||||
}
|
||||
pHashObj->hashList[i] = NULL;
|
||||
}
|
||||
atomic_store_64(&pHashObj->size, 0);
|
||||
}
|
||||
|
@ -324,15 +353,6 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
|
|||
return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj);
|
||||
}
|
||||
|
||||
void *tSimpleHashGetKey(void *data, size_t *keyLen) {
|
||||
SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
|
||||
if (keyLen) {
|
||||
*keyLen = node->keyLen;
|
||||
}
|
||||
|
||||
return POINTER_SHIFT(data, node->dataLen);
|
||||
}
|
||||
|
||||
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
|
||||
if (!pHashObj) {
|
||||
return NULL;
|
||||
|
@ -341,7 +361,7 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
|
|||
SHNode *pNode = NULL;
|
||||
|
||||
if (!data) {
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
|
||||
pNode = pHashObj->hashList[i];
|
||||
if (!pNode) {
|
||||
continue;
|
||||
|
@ -368,52 +388,5 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
|
|||
return GET_SHASH_NODE_DATA(pNode);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter) {
|
||||
if (!pHashObj) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SHNode *pNode = NULL;
|
||||
|
||||
if (!data) {
|
||||
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||
pNode = pHashObj->hashList[i];
|
||||
if (!pNode) {
|
||||
continue;
|
||||
}
|
||||
*iter = i;
|
||||
if (key) {
|
||||
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
|
||||
}
|
||||
return GET_SHASH_NODE_DATA(pNode);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pNode = (SHNode *)((char *)data - offsetof(SHNode, data));
|
||||
|
||||
if (pNode->next) {
|
||||
if (key) {
|
||||
*key = GET_SHASH_NODE_KEY(pNode->next, pNode->next->dataLen);
|
||||
}
|
||||
return GET_SHASH_NODE_DATA(pNode->next);
|
||||
}
|
||||
|
||||
++(*iter);
|
||||
for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
|
||||
pNode = pHashObj->hashList[i];
|
||||
if (!pNode) {
|
||||
continue;
|
||||
}
|
||||
*iter = i;
|
||||
if (key) {
|
||||
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
|
||||
}
|
||||
return GET_SHASH_NODE_DATA(pNode);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
|
@ -30,7 +30,7 @@
|
|||
// return RUN_ALL_TESTS();
|
||||
// }
|
||||
|
||||
TEST(testCase, tSimpleHashTest) {
|
||||
TEST(testCase, tSimpleHashTest_intKey) {
|
||||
SSHashObj *pHashObj =
|
||||
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
|
||||
|
@ -57,12 +57,14 @@ TEST(testCase, tSimpleHashTest) {
|
|||
int32_t iter = 0;
|
||||
int64_t keySum = 0;
|
||||
int64_t dataSum = 0;
|
||||
size_t kLen = 0;
|
||||
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
|
||||
void *key = tSimpleHashGetKey(data, NULL);
|
||||
void *key = tSimpleHashGetKey(data, &kLen);
|
||||
ASSERT_EQ(keyLen, kLen);
|
||||
keySum += *(int64_t *)key;
|
||||
dataSum += *(int64_t *)data;
|
||||
}
|
||||
|
||||
|
||||
ASSERT_EQ(keySum, dataSum);
|
||||
ASSERT_EQ(keySum, originKeySum);
|
||||
|
||||
|
@ -74,4 +76,69 @@ TEST(testCase, tSimpleHashTest) {
|
|||
tSimpleHashCleanup(pHashObj);
|
||||
}
|
||||
|
||||
|
||||
TEST(testCase, tSimpleHashTest_binaryKey) {
|
||||
SSHashObj *pHashObj =
|
||||
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
|
||||
assert(pHashObj != nullptr);
|
||||
|
||||
ASSERT_EQ(0, tSimpleHashGetSize(pHashObj));
|
||||
|
||||
typedef struct {
|
||||
int64_t suid;
|
||||
int64_t uid;
|
||||
} SCombineKey;
|
||||
|
||||
size_t keyLen = sizeof(SCombineKey);
|
||||
size_t dataLen = sizeof(int64_t);
|
||||
|
||||
int64_t originDataSum = 0;
|
||||
SCombineKey combineKey = {0};
|
||||
for (int64_t i = 1; i <= 100; ++i) {
|
||||
combineKey.suid = i;
|
||||
combineKey.uid = i + 1;
|
||||
tSimpleHashPut(pHashObj, (const void *)&combineKey, keyLen, (const void *)&i, dataLen);
|
||||
originDataSum += i;
|
||||
ASSERT_EQ(i, tSimpleHashGetSize(pHashObj));
|
||||
}
|
||||
|
||||
for (int64_t i = 1; i <= 100; ++i) {
|
||||
combineKey.suid = i;
|
||||
combineKey.uid = i + 1;
|
||||
void *data = tSimpleHashGet(pHashObj, (const void *)&combineKey, keyLen);
|
||||
ASSERT_EQ(i, *(int64_t *)data);
|
||||
}
|
||||
|
||||
void *data = NULL;
|
||||
int32_t iter = 0;
|
||||
int64_t keySum = 0;
|
||||
int64_t dataSum = 0;
|
||||
size_t kLen = 0;
|
||||
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
|
||||
void *key = tSimpleHashGetKey(data, &kLen);
|
||||
ASSERT_EQ(keyLen, kLen);
|
||||
dataSum += *(int64_t *)data;
|
||||
}
|
||||
|
||||
ASSERT_EQ(originDataSum, dataSum);
|
||||
|
||||
tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen);
|
||||
|
||||
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
|
||||
void *key = tSimpleHashGetKey(data, &kLen);
|
||||
ASSERT_EQ(keyLen, kLen);
|
||||
}
|
||||
|
||||
for (int64_t i = 1; i <= 99; ++i) {
|
||||
combineKey.suid = i;
|
||||
combineKey.uid = i + 1;
|
||||
tSimpleHashRemove(pHashObj, (const void *)&combineKey, keyLen);
|
||||
ASSERT_EQ(99 - i, tSimpleHashGetSize(pHashObj));
|
||||
}
|
||||
|
||||
tSimpleHashCleanup(pHashObj);
|
||||
}
|
||||
|
||||
|
||||
#pragma GCC diagnostic pop
|
Loading…
Reference in New Issue