diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index ae2370cd56..d113a43ecf 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1039,7 +1039,7 @@ int tsParseInsertSql(SSqlObj *pSql) { } if (NULL == pCmd->pTableList) { - pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES); if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 5ee3db36d1..9c2454e76c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -646,7 +646,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, 0); int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta); - void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); size_t total = taosArrayGetSize(pTableDataBlockList); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 26efcfeac0..f09541d306 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -814,7 +814,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); } - pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true); + pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true, true); tsSdbObj.numOfTables++; tsSdbObj.tableList[pTable->tableId] = pTable; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 053a1522a7..85afd70b83 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -378,7 +378,7 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt atomic_add_fetch_32(&pStable->numOfTables, 1); if (pStable->vgHash == NULL) { - pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); } if (pStable->vgHash != NULL) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 66762a1ca5..8afc44f939 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5841,7 +5841,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables; pQInfo->tableqinfoGroupInfo.map = taosHashInit(pTableGroupInfo->numOfTables, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); } int tableIndex = 0; diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index 93ccad8e27..d2a124aa30 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -34,9 +34,9 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro pResBuf->lruList = tdListNew(POINTER_BYTES); // init id hash table - pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES - pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); + pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); char path[PATH_MAX] = {0}; getTmpfilePath("qbuf", path); diff --git a/src/query/src/qTokenizer.c b/src/query/src/qTokenizer.c index 0ea0ff7bf3..a22c4d5f65 100644 --- a/src/query/src/qTokenizer.c +++ b/src/query/src/qTokenizer.c @@ -256,7 +256,7 @@ static void* KeywordHashTable = NULL; static void doInitKeywordsTable() { int numOfEntries = tListLen(keywordTable); - KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, false); + KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false); for (int32_t i = 0; i < numOfEntries; i++) { keywordTable[i].len = strlen(keywordTable[i].name); void* ptr = &keywordTable[i]; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index c9143b3a53..ff3c6b20bb 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -40,7 +40,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun pWindowResInfo->type = type; _hash_fn_t fn = taosGetDefaultHashFunction(type); - pWindowResInfo->hashList = taosHashInit(threshold, fn, false); + pWindowResInfo->hashList = taosHashInit(threshold, fn, true, false); if (pWindowResInfo->hashList == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -107,7 +107,7 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR pWindowResInfo->size = 0; _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type); - pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, false); + pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false); pWindowResInfo->startTime = TSKEY_INITIAL_VAL; pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 7425558535..4acf95d4f4 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -261,7 +261,7 @@ void *rpcOpen(const SRpcInit *pInit) { } if (pRpc->connType == TAOS_CONN_SERVER) { - pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); + pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true); if (pRpc->hash == NULL) { tError("%s failed to init string hash", pRpc->label); rpcClose(pRpc); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 93c4a9402f..afd765f2c2 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -96,7 +96,7 @@ static void syncModuleInitFunc() { return; } - vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true); + vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); if (vgIdHash == NULL) { taosTmrCleanUp(syncTmrCtrl); taosCloseTcpThreadPool(tsTcpPool); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 09bbbd8f4d..5fc2f3e253 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -436,7 +436,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) { goto _err; } - pMeta->uidMap = taosHashInit(TSDB_INIT_NTABLES * 1.1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + pMeta->uidMap = taosHashInit(TSDB_INIT_NTABLES * 1.1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); if (pMeta->uidMap == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index d3edc89585..3c99a36764 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -44,6 +44,7 @@ typedef struct SHashObj { _hash_fn_t hashFp; // hash function _hash_free_fn_t freeFp; // hash node free callback function + bool enableUpdate; // enable update #if defined(LINUX) pthread_rwlock_t *lock; #else @@ -67,7 +68,7 @@ typedef struct SHashMutableIterator { * @param threadsafe thread safe or not * @return */ -SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe); +SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threadsafe); /** * return the size of hash table @@ -105,7 +106,8 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); */ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen); -void taosHashRemoveNode(); + +void* taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen); /** * clean up hash table diff --git a/src/util/src/hash.c b/src/util/src/hash.c index f59f25d153..e560a9c744 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -163,7 +163,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode); */ static SHashNode *getNextHashNode(SHashMutableIterator *pIter); -SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { +SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threadsafe) { if (capacity == 0 || fn == NULL) { return NULL; } @@ -179,6 +179,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); pHashObj->hashFp = fn; + pHashObj->enableUpdate = update; pHashObj->hashList = (SHashNode **)calloc(pHashObj->capacity, POINTER_BYTES); if (pHashObj->hashList == NULL) { @@ -232,15 +233,21 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da doAddToHashTable(pHashObj, pNewNode); __unlock(pHashObj->lock); + + return 0; } else { - doUpdateHashNode(pNode, pNewNode); + // not support the update operation, return error + if (pHashObj->enableUpdate) { + doUpdateHashNode(pNode, pNewNode); + } + __unlock(pHashObj->lock); tfree(pNewNode->data) tfree(pNewNode); - } - return 0; + return pHashObj->enableUpdate? 0:-1; + } } void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { @@ -288,10 +295,45 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { pNode->next = NULL; pNode->prev = NULL; + tfree(pNode->data); tfree(pNode); + return 0; } +void* taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen) { + uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen); + + __wr_lock(pHashObj->lock); + SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal); + if (pNode == NULL) { + __unlock(pHashObj->lock); + return NULL; + } + + SHashNode *pNext = pNode->next; + if (pNode->prev == NULL) { + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + assert(pHashObj->hashList[slot] == pNode); + + pHashObj->hashList[slot] = pNext; + } else { + pNode->prev->next = pNext; + } + + if (pNext != NULL) { + pNext->prev = pNode->prev; + } + + pHashObj->size -= 1; + __unlock(pHashObj->lock); + + pNode->next = NULL; + pNode->prev = NULL; + + return pNode; +} + void taosHashCleanup(SHashObj *pHashObj) { if (pHashObj == NULL) return; diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 77f3ea2db0..2432e97552 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -207,7 +207,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext return NULL; } - pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), true); + pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false, true); pCacheObj->name = strdup(cacheName); if (pCacheObj->pHashTable == NULL) { free(pCacheObj); @@ -249,45 +249,49 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v return NULL; } - __cache_wr_lock(pCacheObj); - SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); - SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL; - - if (pOld == NULL) { // do addedTime to cache - T_REF_INC(pNode1); - taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)); - __cache_unlock(pCacheObj); +// __cache_wr_lock(pCacheObj); + T_REF_INC(pNode1); + int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)); + if (succ == 0) { atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size); - uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64 - "bytes size:%" PRId64 "bytes", + uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 + ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes", pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize); - } else { // old data exists, update the node - bool addToTrashcan = false; - if (T_REF_VAL_GET(pOld) > 0) { - + } else { // duplicated key exists + while (1) { // todo removed by node, instead of by key - int32_t succ = taosHashRemove(pCacheObj->pHashTable, pOld->key, pOld->keySize); - assert(succ == 0); + SHashNode *p = taosHashRemoveNode(pCacheObj->pHashTable, key, keyLen); - addToTrashcan = true; + // add to trashcan + if (p != NULL) { + SCacheDataNode* pCachedNode = *(SCacheDataNode**)p->data; + if (T_REF_VAL_GET(pCachedNode) == 0) { + tfree(pCachedNode); + } else { + taosAddToTrash(pCacheObj, pCachedNode); + uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pCachedNode); + } + } + + assert(T_REF_VAL_GET(pNode1) == 1); + + int32_t ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)); + if (ret == 0) { + atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size); + + uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 + ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes", + pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, + (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize); + + return pNode1; + + } else { + // failed, try again + } } - - T_REF_INC(pNode1); - - // addedTime new element to hashtable - taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)); - __cache_unlock(pCacheObj); - - // todo add trashcan lock - if (addToTrashcan) { - taosAddToTrash(pCacheObj, pOld); - } else { - free(pOld); - } - - uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pOld); } return pNode1->data; @@ -415,8 +419,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } if (_remove) { -// __cache_wr_lock(pCacheObj); - // NOTE: once refcount is decrease, pNode may be freed by other thread immediately. int32_t ref = T_REF_DEC(pNode); uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); @@ -429,34 +431,27 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { * that tries to do the same thing. */ if (pNode->inTrashCan) { -// __cache_unlock(pCacheObj); - if (ref == 0) { assert(pNode->pTNodeHeader->pData == pNode); taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader); } } else { int32_t success = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); + if (success) { + if (ref > 0) { + assert(pNode->pTNodeHeader == NULL); - if (ref > 0) { - assert(pNode->pTNodeHeader == NULL); - - // todo trashcan lock - if (success) { + // todo trashcan lock taosAddToTrash(pCacheObj, pNode); + } else { // ref == 0 + atomic_fetch_sub_ptr(&pCacheObj->totalSize, pNode->size); + uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", + pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), + pCacheObj->totalSize, pNode->size); + + if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); + free(pNode); } -// __cache_unlock(pCacheObj); - } else { -// __cache_unlock(pCacheObj); - -// taosCacheReleaseNode(pCacheObj, pNode); - atomic_fetch_sub_ptr(&pCacheObj->totalSize, pNode->size); - uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", - pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, - pNode->size); - - if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); - free(pNode); } } diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index d7bf9d7857..602ea8c96d 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -419,7 +419,7 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void pStore->iFunc = iFunc; pStore->aFunc = aFunc; pStore->appH = appH; - pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); + pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); if (pStore->map == NULL) { terrno = TSDB_CODE_COM_OUT_OF_MEMORY; goto _err; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 77d4503d9d..e8919e5fce 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -50,7 +50,7 @@ int32_t vnodeInitResources() { vnodeInitWriteFp(); vnodeInitReadFp(); - tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true); + tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); if (tsDnodeVnodesHash == NULL) { vError("failed to init vnode list"); return TSDB_CODE_VND_OUT_OF_MEMORY;