diff --git a/include/os/os.h b/include/os/os.h index f020af5a65..a58e798d38 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -44,6 +44,7 @@ extern "C" { #include #include #include +#include #include #include #include diff --git a/include/util/thash.h b/include/util/thash.h index 5c344f3f0f..017cc8696f 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -28,11 +28,6 @@ typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len); typedef void (*_hash_before_fn_t)(void *); typedef void (*_hash_free_fn_t)(void *); -#define HASH_MAX_CAPACITY (1024 * 1024 * 16) -#define HASH_DEFAULT_LOAD_FACTOR (0.75) - -#define HASH_INDEX(v, c) ((v) & ((c)-1)) - #define HASH_NODE_EXIST(code) (code == -2) /** @@ -62,41 +57,17 @@ typedef struct SHashNode { uint32_t hashVal; // the hash value of key uint32_t dataLen; // length of data uint32_t keyLen; // length of the key - uint16_t count; // reference count + uint16_t refCount; // reference count int8_t removed; // flag to indicate removed char data[]; } SHashNode; -#define GET_HASH_NODE_KEY(_n) ((char *)(_n) + sizeof(SHashNode) + (_n)->dataLen) -#define GET_HASH_NODE_DATA(_n) ((char *)(_n) + sizeof(SHashNode)) -#define GET_HASH_PNODE(_n) ((SHashNode *)((char *)(_n) - sizeof(SHashNode))) - typedef enum SHashLockTypeE { HASH_NO_LOCK = 0, HASH_ENTRY_LOCK = 1, } SHashLockTypeE; -typedef struct SHashEntry { - int32_t num; // number of elements in current entry - SRWLatch latch; // entry latch - SHashNode *next; -} SHashEntry; - -typedef struct SHashObj { - SHashEntry **hashList; - uint32_t capacity; // number of slots - uint32_t size; // number of elements in hash table - - _hash_fn_t hashFp; // hash function - _hash_free_fn_t freeFp; // hash node free callback function - _equal_fn_t equalFp; // equal function - _hash_before_fn_t callbackFp; // function invoked before return the value to caller - - SRWLatch lock; // read-write spin lock - SHashLockTypeE type; // lock type - bool enableUpdate; // enable update - SArray *pMemBlock; // memory block allocated for SHashEntry -} SHashObj; +typedef struct SHashObj SHashObj; /** * init the hash table @@ -126,8 +97,6 @@ int32_t taosHashGetSize(const SHashObj *pHashObj); */ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size); -int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded); - /** * return the payload data with the specified key * @@ -146,17 +115,18 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); * @param destBuf * @return */ -void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf); +int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf); /** - * Clone the result to interval allocated buffer + * * @param pHashObj * @param key * @param keyLen * @param destBuf + * @param size * @return */ -void *taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void **d, size_t *sz); +int32_t taosHashGetDup_m(SHashObj* pHashObj, const void* key, size_t keyLen, void** destBuf, int32_t* size); /** * remove item with the specified key @@ -207,37 +177,13 @@ void *taosHashIterate(SHashObj *pHashObj, void *p); */ void taosHashCancelIterate(SHashObj *pHashObj, void *p); -/** - * Get the corresponding key information for a given data in hash table - * @param data - * @return - */ -int32_t taosHashGetKey(void *data, void **key, size_t *keyLen); - -/** - * Get the corresponding key information for a given data in hash table, using memcpy - * @param data - * @param dst - * @return - */ -static FORCE_INLINE int32_t taosHashCopyKey(void *data, void *dst) { - if (NULL == data || NULL == dst) { - return -1; - } - - SHashNode *node = GET_HASH_PNODE(data); - void *key = GET_HASH_NODE_KEY(node); - memcpy(dst, key, node->keyLen); - - return 0; -} - -/** - * Get the corresponding data length for a given data in hash table - * @param data - * @return - */ -int32_t taosHashGetDataLen(void *data); + /** + * Get the corresponding key information for a given data in hash table + * @param data + * @param keyLen + * @return + */ +void *taosHashGetKey(void *data, size_t* keyLen); /** * return the payload data with the specified key(reference number added) @@ -258,8 +204,20 @@ void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen); */ void taosHashRelease(SHashObj *pHashObj, void *p); +/** + * + * @param pHashObj + * @param fp + */ void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp); +/** + * + * @param pHashObj + * @param fp + */ +void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp); + #ifdef __cplusplus } #endif diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index ce9a57c2c3..acaff759b7 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -53,7 +53,7 @@ typedef struct SDiskbasedBufStatis { * @param handle * @return */ -int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); +int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, const char* dir); /** * diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 45c1858948..9413f748eb 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -482,7 +482,8 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { free(pAppHbMgr); return NULL; } - pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq; + + taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq); // init getInfoFunc pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index f20493aa7f..ce7cbe0977 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -85,7 +85,7 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); - taosHashGetClone(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); + taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; } else { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2b6caa2f8f..945a4101b1 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -162,8 +162,7 @@ void ctgDbgShowDBCache(SHashObj *dbHash) { size_t len = 0; dbCache = (SCtgDBCache *)pIter; - - taosHashGetKey(dbCache, (void **)&dbFName, &len); + dbFName = taosHashGetKey(dbCache, &len); CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId); @@ -532,9 +531,9 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable return TSDB_CODE_SUCCESS; } - size_t sz = 0; + int32_t sz = 0; CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); - STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz); + int32_t code = taosHashGetDup_m(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), (void **)pTableMeta, &sz); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); if (NULL == *pTableMeta) { @@ -545,8 +544,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable } *exist = 1; - - tbMeta = *pTableMeta; + STableMeta* tbMeta = *pTableMeta; if (tbMeta->tableType != TSDB_CHILD_TABLE) { ctgReleaseDBCache(pCtg, dbCache); @@ -1110,7 +1108,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) { void *pIter = taosHashIterate(cache->stbCache, NULL); while (pIter) { uint64_t *suid = NULL; - taosHashGetKey(pIter, (void **)&suid, NULL); + suid = taosHashGetKey(pIter, NULL); if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) { ctgDebug("stb removed from rent, suid:%"PRIx64, *suid); @@ -1305,7 +1303,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) { CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); CTG_UNLOCK(CTG_READ, &tbCache->metaLock); - ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid); + ctgError("taosHashPut stable to stable cache failed, suid:%"PRIx64, meta->suid); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -1343,7 +1341,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { int32_t *vgId = NULL; void *pIter = taosHashIterate(src->vgHash, NULL); while (pIter) { - taosHashGetKey(pIter, (void **)&vgId, NULL); + vgId = taosHashGetKey(pIter, NULL); if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) { qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize); @@ -2296,7 +2294,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList)); } else { int32_t vgId = tbMeta->vgId; - if (NULL == taosHashGetClone(vgHash, &vgId, sizeof(vgId), &vgroupInfo)) { + if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) { ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c400ded1c1..a8109e7363 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4619,7 +4619,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize); int32_t TENMB = 1024*1024*10; - int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId, "/tmp"); + int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, "", "/tmp"); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 1aa0c04ec4..06c58430a4 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -255,7 +255,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, resetSlotInfo(pBucket); - int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, "/tmp"); + int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", "/tmp"); if (ret != 0) { tMemBucketDestroy(pBucket); return NULL; diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 745982e869..4b58bc4e69 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -153,7 +153,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) { if (NULL == dst) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - taosHashGetClone(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); + taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg); dst->numOfTables = src->numOfTables; dst->size = src->size; TSWAP(dst->pData, src->pData, char*); diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 64d822f750..560e5348c2 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -305,8 +305,9 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen return NULL; } + // TODO remove it SCacheDataNode *ptNode = NULL; - taosHashGetClone(pCacheObj->pHashTable, key, keyLen, &ptNode); + ptNode = taosHashAcquire(pCacheObj->pHashTable, key, keyLen); // taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode); void *pData = (ptNode != NULL) ? ptNode->data : NULL; @@ -535,7 +536,7 @@ static bool travHashTableEmptyFn(void *param, void *data) { void taosCacheEmpty(SCacheObj *pCacheObj) { SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; - // taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); +// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); taosTrashcanEmpty(pCacheObj, false); } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 219fc739ca..05bc94caef 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -15,69 +15,127 @@ #define _DEFAULT_SOURCE #include "thash.h" -#include "tdef.h" +#include "taoserror.h" +#include "os.h" #include "tlog.h" // the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT -#define MAX_WARNING_REF_COUNT 10000 -#define EXT_SIZE 1024 -#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) +#define MAX_WARNING_REF_COUNT 10000 +#define EXT_SIZE 1024 +#define HASH_MAX_CAPACITY (1024 * 1024 * 16) +#define HASH_DEFAULT_LOAD_FACTOR (0.75) +#define HASH_INDEX(v, c) ((v) & ((c)-1)) -#define DO_FREE_HASH_NODE(_n) \ - do { \ - tfree(_n); \ - } while (0) +#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) -#define FREE_HASH_NODE(_h, _n) \ - do { \ - if ((_h)->freeFp) { \ - (_h)->freeFp(GET_HASH_NODE_DATA(_n)); \ - } \ - \ - DO_FREE_HASH_NODE(_n); \ +#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen) +#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode)) +#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode))) + +#define FREE_HASH_NODE(_n) \ + do { \ + tfree(_n); \ } while (0); -static FORCE_INLINE void __wr_lock(void *lock, int32_t type) { - if (type == HASH_NO_LOCK) { +typedef struct SHashEntry { + int32_t num; // number of elements in current entry + SRWLatch latch; // entry latch + SHashNode *next; +} SHashEntry; + +typedef struct SHashObj { + SHashEntry **hashList; + size_t capacity; // number of slots + size_t size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + _equal_fn_t equalFp; // equal function + _hash_free_fn_t freeFp; // hash node free callback function + SRWLatch lock; // read-write spin lock + SHashLockTypeE type; // lock type + bool enableUpdate; // enable update + SArray *pMemBlock; // memory block allocated for SHashEntry + _hash_before_fn_t callbackFp; // function invoked before return the value to caller +} SHashObj; + +/* + * Function definition + */ +static FORCE_INLINE void taosHashWLock(SHashObj *pHashObj) { + if (pHashObj->type == HASH_NO_LOCK) { return; } - taosWLockLatch(lock); + taosWLockLatch(&pHashObj->lock); } -static FORCE_INLINE void __rd_lock(void *lock, int32_t type) { - if (type == HASH_NO_LOCK) { +static FORCE_INLINE void taosHashWUnlock(SHashObj *pHashObj) { + if (pHashObj->type == HASH_NO_LOCK) { return; } - taosRLockLatch(lock); + + taosWUnLockLatch(&pHashObj->lock); } -static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) { - if (type == HASH_NO_LOCK) { +static FORCE_INLINE void taosHashRLock(SHashObj *pHashObj) { + if (pHashObj->type == HASH_NO_LOCK) { return; } - taosRUnLockLatch(lock); + + taosRLockLatch(&pHashObj->lock); } -static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) { - if (type == HASH_NO_LOCK) { +static FORCE_INLINE void taosHashRUnlock(SHashObj *pHashObj) { + if (pHashObj->type == HASH_NO_LOCK) { return; } - taosWUnLockLatch(lock); + + taosRUnLockLatch(&pHashObj->lock); +} + +static FORCE_INLINE void taosHashEntryWLock(const SHashObj *pHashObj, SHashEntry* pe) { + if (pHashObj->type == HASH_NO_LOCK) { + return; + } + taosWLockLatch(&pe->latch); +} + +static FORCE_INLINE void taosHashEntryWUnlock(const SHashObj *pHashObj, SHashEntry* pe) { + if (pHashObj->type == HASH_NO_LOCK) { + return; + } + + taosWUnLockLatch(&pe->latch); +} + +static FORCE_INLINE void taosHashEntryRLock(const SHashObj *pHashObj, SHashEntry* pe) { + if (pHashObj->type == HASH_NO_LOCK) { + return; + } + + taosRLockLatch(&pe->latch); +} + +static FORCE_INLINE void taosHashEntryRUnlock(const SHashObj *pHashObj, SHashEntry* pe) { + if (pHashObj->type == HASH_NO_LOCK) { + return; + } + + taosRUnLockLatch(&pe->latch); } static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { - int32_t len = TMIN(length, HASH_MAX_CAPACITY); + int32_t len = MIN(length, HASH_MAX_CAPACITY); int32_t i = 4; while (i < len) i = (i << 1u); return i; } -static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen, - uint32_t hashVal) { +static FORCE_INLINE SHashNode * +doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) { SHashNode *pNode = pe->next; while (pNode) { - if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && + if ((pNode->keyLen == keyLen) && + ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) { assert(pNode->hashVal == hashVal); break; @@ -90,60 +148,57 @@ static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntr } /** - * Resize the hash list if the threshold is reached + * resize the hash list if the threshold is reached * * @param pHashObj */ static void taosHashTableResize(SHashObj *pHashObj); /** + * allocate and initialize a hash node + * * @param key key of object for hash, usually a null-terminated string * @param keyLen length of key - * @param pData actually data. Requires a consecutive memory block, no pointer is allowed in pData. - * Pointer copy causes memory access error. + * @param pData data to be stored in hash node * @param dsize size of data * @return SHashNode */ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal); /** - * Update the hash node + * update the hash node * - * @param pNode hash node - * @param key key for generate hash value - * @param keyLen key length - * @param pData actual data - * @param dsize size of actual data - * @return hash node + * @param pHashObj hash table object + * @param pe hash table entry to operate on + * @param prev previous node + * @param pNode the old node with requested key + * @param pNewNode the new node with requested key */ -static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry *pe, SHashNode *prev, SHashNode *pNode, - SHashNode *pNewNode) { +static FORCE_INLINE void doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) { assert(pNode->keyLen == pNewNode->keyLen); - pNode->count--; + atomic_sub_fetch_32(&pNode->refCount, 1); if (prev != NULL) { prev->next = pNewNode; } else { pe->next = pNewNode; } - if (pNode->count <= 0) { + if (pNode->refCount <= 0) { pNewNode->next = pNode->next; - DO_FREE_HASH_NODE(pNode); + FREE_HASH_NODE(pNode); } else { pNewNode->next = pNode; pe->num++; - atomic_add_fetch_32(&pHashObj->size, 1); + atomic_add_fetch_64(&pHashObj->size, 1); } - - return pNewNode; } /** * insert the hash node at the front of the linked list * - * @param pHashObj - * @param pNode + * @param pHashObj hash table object + * @param pNode the old node with requested key */ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); @@ -156,47 +211,70 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj); /** - * Get the next element in hash table for iterator - * @param pIter - * @return + * initialize a hash table + * + * @param capacity initial capacity of the hash table + * @param fn hash function + * @param update whether the hash table allows in place update + * @param type whether the hash table has per entry lock + * @return hash table object */ - SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) { - assert(fn != NULL); + if (fn == NULL) { + assert(0); + return NULL; + } + if (capacity == 0) { capacity = 4; } SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj)); if (pHashObj == NULL) { - uError("failed to allocate memory, reason:%s", strerror(errno)); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } // the max slots is not defined by user pHashObj->capacity = taosHashCapacity((int32_t)capacity); - assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); + pHashObj->equalFp = memcmp; - pHashObj->hashFp = fn; + pHashObj->hashFp = fn; pHashObj->type = type; pHashObj->enableUpdate = update; + ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); + pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void *)); if (pHashObj->hashList == NULL) { free(pHashObj); - uError("failed to allocate memory, reason:%s", strerror(errno)); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; - } else { - pHashObj->pMemBlock = taosArrayInit(8, sizeof(void *)); - - void *p = calloc(pHashObj->capacity, sizeof(SHashEntry)); - for (int32_t i = 0; i < pHashObj->capacity; ++i) { - pHashObj->hashList[i] = (void *)((char *)p + i * sizeof(SHashEntry)); - } - - taosArrayPush(pHashObj->pMemBlock, &p); } + pHashObj->pMemBlock = taosArrayInit(8, sizeof(void *)); + if (pHashObj->pMemBlock == NULL) { + free(pHashObj->hashList); + free(pHashObj); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *p = calloc(pHashObj->capacity, sizeof(SHashEntry)); + if (p == NULL) { + taosArrayDestroy(pHashObj->pMemBlock); + free(pHashObj->hashList); + free(pHashObj); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + pHashObj->hashList[i] = (void *)((char *)p + i * sizeof(SHashEntry)); + } + + taosArrayPush(pHashObj->pMemBlock, &p); + return pHashObj; } @@ -206,16 +284,28 @@ void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp) { } } -int32_t taosHashGetSize(const SHashObj *pHashObj) { - if (!pHashObj) { - return 0; +void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp) { + if (pHashObj != NULL && fp != NULL) { + pHashObj->freeFp = fp; } - return (int32_t)atomic_load_32(&pHashObj->size); } -static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { return taosHashGetSize(pHashObj) == 0; } +int32_t taosHashGetSize(const SHashObj *pHashObj) { + if (pHashObj == NULL) { + return 0; + } + return (int32_t)atomic_load_64(&pHashObj->size); +} + +static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { + return taosHashGetSize(pHashObj) == 0; +} + +int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { + if (pHashObj == NULL || key == NULL || keyLen == 0 || data == NULL || size == 0) { + return -1; + } -int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) { uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); if (pNewNode == NULL) { @@ -224,19 +314,17 @@ int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void // need the resize process, write lock applied if (HASH_NEED_RESIZE(pHashObj)) { - __wr_lock((void *)&pHashObj->lock, pHashObj->type); + taosHashWLock(pHashObj); taosHashTableResize(pHashObj); - __wr_unlock((void *)&pHashObj->lock, pHashObj->type); + taosHashWUnlock(pHashObj); } - __rd_lock((void *)&pHashObj->lock, pHashObj->type); + taosHashRLock(pHashObj); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWLockLatch(&pe->latch); - } + taosHashEntryWLock(pHashObj, pe); SHashNode *pNode = pe->next; if (pe->num > 0) { @@ -245,9 +333,10 @@ int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void assert(pNode == NULL); } - SHashNode *prev = NULL; + SHashNode* prev = NULL; while (pNode) { - if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && + if ((pNode->keyLen == keyLen) && + (*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 && pNode->removed == 0) { assert(pNode->hashVal == hashVal); break; @@ -260,24 +349,13 @@ int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table pushfrontNodeInEntryList(pe, pNewNode); + assert(pe->next != NULL); - if (pe->num == 0) { - assert(pe->next == NULL); - } else { - assert(pe->next != NULL); - } - - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWUnLockLatch(&pe->latch); - } + taosHashEntryWUnlock(pHashObj, pe); // enable resize - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); - atomic_add_fetch_32(&pHashObj->size, 1); - - if (newAdded) { - *newAdded = true; - } + taosHashRUnlock(pHashObj); + atomic_add_fetch_64(&pHashObj->size, 1); return 0; } else { @@ -285,131 +363,67 @@ int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void if (pHashObj->enableUpdate) { doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); } else { - DO_FREE_HASH_NODE(pNewNode); + FREE_HASH_NODE(pNewNode); } - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWUnLockLatch(&pe->latch); - } + taosHashEntryWUnlock(pHashObj, pe); // enable resize - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); + taosHashRUnlock(pHashObj); - if (newAdded) { - *newAdded = false; - } - - return pHashObj->enableUpdate ? 0 : -2; + return pHashObj->enableUpdate ? 0 : -1; } } -int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { - return taosHashPutImpl(pHashObj, key, keyLen, data, size, NULL); -} - -int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) { - return taosHashPutImpl(pHashObj, key, keyLen, data, size, newAdded); -} +static void* taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void** d, int32_t* size, bool addRef); void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { - return taosHashGetClone(pHashObj, key, keyLen, NULL); + void* p = NULL; + return taosHashGetImpl(pHashObj, key, keyLen, &p, 0, false); } -// TODO(yihaoDeng), merge with taosHashGetClone -void *taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void **d, - size_t *sz) { - if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { +int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf) { + terrno = 0; + /*char* p = */taosHashGetImpl(pHashObj, key, keyLen, &destBuf, 0, false); + return terrno; +} + +int32_t taosHashGetDup_m(SHashObj *pHashObj, const void *key, size_t keyLen, void **destBuf, int32_t* size) { + terrno = 0; + + /*char* p = */taosHashGetImpl(pHashObj, key, keyLen, destBuf, size, false); + return terrno; +} + +void* taosHashGetImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void** d, int32_t* size, bool addRef) { + if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { return NULL; } uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // only add the read lock to disable the resize process - __rd_lock((void *)&pHashObj->lock, pHashObj->type); + taosHashRLock(pHashObj); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; // no data, return directly if (atomic_load_32(&pe->num) == 0) { - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); + taosHashRUnlock(pHashObj); return NULL; } char *data = NULL; + taosHashEntryRLock(pHashObj, pe); - // lock entry - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosRLockLatch(&pe->latch); - } - - if (pe->num > 0) { - assert(pe->next != NULL); - } else { - assert(pe->next == NULL); - } - - SHashNode *pNode = doSearchInEntryList(pHashObj, pe, key, keyLen, hashVal); - if (pNode != NULL) { - if (fp != NULL) { - fp(GET_HASH_NODE_DATA(pNode)); - } - - if (*d == NULL) { - *sz = pNode->dataLen + EXT_SIZE; - *d = calloc(1, *sz); - } else if (*sz < pNode->dataLen) { - *sz = pNode->dataLen + EXT_SIZE; - *d = realloc(*d, *sz); - } - memcpy((char *)(*d), GET_HASH_NODE_DATA(pNode), pNode->dataLen); - // just make runtime happy - if ((*sz) - pNode->dataLen > 0) { - memset((char *)(*d) + pNode->dataLen, 0, (*sz) - pNode->dataLen); - } - - data = GET_HASH_NODE_DATA(pNode); - } - - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosRUnLockLatch(&pe->latch); - } - - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); - return data; -} - -void *taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void *d, bool acquire) { - if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { - return NULL; - } - - uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); - - // only add the read lock to disable the resize process - __rd_lock((void *)&pHashObj->lock, pHashObj->type); - - int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); - SHashEntry *pe = pHashObj->hashList[slot]; - - // no data, return directly - if (atomic_load_32(&pe->num) == 0) { - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); - return NULL; - } - - char *data = NULL; - - // lock entry - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosRLockLatch(&pe->latch); - } - +#if 0 if (pe->num > 0) { assert(pe->next != NULL); } else { assert(pe->next == NULL); } +#endif SHashNode *pNode = doSearchInEntryList(pHashObj, pe, key, keyLen, hashVal); if (pNode != NULL) { @@ -417,108 +431,115 @@ void *taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v pHashObj->callbackFp(GET_HASH_NODE_DATA(pNode)); } - if (d != NULL) { - memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen); + if (size != NULL) { + if (*d == NULL) { + *size = pNode->dataLen; + *d = calloc(1, *size); + if (*d == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } else if (*size < pNode->dataLen) { + *size = pNode->dataLen; + char* tmp = realloc(*d, *size); + if (tmp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + *d = tmp; + } } - if (acquire) { - atomic_add_fetch_16(&pNode->count, 1); + if (addRef) { + atomic_add_fetch_16(&pNode->refCount, 1); + } + + if (*d != NULL) { + memcpy(*d, GET_HASH_NODE_DATA(pNode), pNode->dataLen); } data = GET_HASH_NODE_DATA(pNode); } - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosRUnLockLatch(&pe->latch); - } + taosHashEntryRUnlock(pHashObj, pe); + taosHashRUnlock(pHashObj); - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); return data; } -void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void *d) { - return taosHashGetCloneImpl(pHashObj, key, keyLen, d, false); -} - -void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) { - return taosHashGetCloneImpl(pHashObj, key, keyLen, NULL, true); -} - -int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen /*, void *data, size_t dsize*/) { - if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) { +int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) { + if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || key == NULL || keyLen == 0) { return -1; } uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); // disable the resize process - __rd_lock((void *)&pHashObj->lock, pHashObj->type); + taosHashRLock(pHashObj); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWLockLatch(&pe->latch); - } + taosHashEntryWLock(pHashObj, pe); // double check after locked if (pe->num == 0) { assert(pe->next == NULL); - taosWUnLockLatch(&pe->latch); - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); + taosHashEntryWUnlock(pHashObj, pe); + taosHashRUnlock(pHashObj); return -1; } - int32_t code = -1; + int code = -1; SHashNode *pNode = pe->next; SHashNode *prevNode = NULL; while (pNode) { - if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && - pNode->removed == 0) - break; + if ((pNode->keyLen == keyLen) && + ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && + pNode->removed == 0) { + code = 0; // it is found - prevNode = pNode; - pNode = pNode->next; - } + atomic_sub_fetch_32(&pNode->refCount, 1); + pNode->removed = 1; + if (pNode->refCount <= 0) { + if (prevNode == NULL) { + pe->next = pNode->next; + } else { + prevNode->next = pNode->next; + } - if (pNode) { - code = 0; // it is found + if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize); - pNode->count--; - pNode->removed = 1; - if (pNode->count <= 0) { - if (prevNode) { - prevNode->next = pNode->next; - } else { - pe->next = pNode->next; + pe->num--; + atomic_sub_fetch_64(&pHashObj->size, 1); + FREE_HASH_NODE(pNode); } - - // if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize); - - pe->num--; - atomic_sub_fetch_32(&pHashObj->size, 1); - FREE_HASH_NODE(pHashObj, pNode); + } else { + prevNode = pNode; + pNode = pNode->next; } } - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWUnLockLatch(&pe->latch); - } - - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); + taosHashEntryWUnlock(pHashObj, pe); + taosHashRUnlock(pHashObj); return code; } -int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) { - if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) { - return 0; +int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { + return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0); +} + +void taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) { + if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || fp == NULL) { + return; } // disable the resize process - __rd_lock((void *)&pHashObj->lock, pHashObj->type); + taosHashRLock(pHashObj); int32_t numOfEntries = (int32_t)pHashObj->capacity; for (int32_t i = 0; i < numOfEntries; ++i) { @@ -527,63 +548,32 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi continue; } - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWLockLatch(&pEntry->latch); - } + taosHashEntryWLock(pHashObj, pEntry); - // todo remove the first node - SHashNode *pNode = NULL; - while ((pNode = pEntry->next) != NULL) { - if (fp && (!fp(param, GET_HASH_NODE_DATA(pNode)))) { - pEntry->num -= 1; - atomic_sub_fetch_32(&pHashObj->size, 1); - - pEntry->next = pNode->next; - - if (pEntry->num == 0) { - assert(pEntry->next == NULL); - } else { - assert(pEntry->next != NULL); - } - - FREE_HASH_NODE(pHashObj, pNode); + SHashNode *pPrevNode = NULL; + SHashNode *pNode = pEntry->next; + while (pNode != NULL) { + if (fp(param, GET_HASH_NODE_DATA(pNode))) { + pPrevNode = pNode; + pNode = pNode->next; } else { - break; - } - } - - // handle the following node - if (pNode != NULL) { - assert(pNode == pEntry->next); - SHashNode *pNext = NULL; - - while ((pNext = pNode->next) != NULL) { - // not qualified, remove it - if (fp && (!fp(param, GET_HASH_NODE_DATA(pNext)))) { - pNode->next = pNext->next; - pEntry->num -= 1; - atomic_sub_fetch_32(&pHashObj->size, 1); - - if (pEntry->num == 0) { - assert(pEntry->next == NULL); - } else { - assert(pEntry->next != NULL); - } - - FREE_HASH_NODE(pHashObj, pNext); + if (pPrevNode == NULL) { + pEntry->next = pNode->next; } else { - pNode = pNext; + pPrevNode->next = pNode->next; } + pEntry->num -= 1; + atomic_sub_fetch_64(&pHashObj->size, 1); + SHashNode *next = pNode->next; + FREE_HASH_NODE(pNode); + pNode = next; } } - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWUnLockLatch(&pEntry->latch); - } + taosHashEntryWUnlock(pHashObj, pEntry); } - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); - return 0; + taosHashRUnlock(pHashObj); } void taosHashClear(SHashObj *pHashObj) { @@ -593,12 +583,12 @@ void taosHashClear(SHashObj *pHashObj) { SHashNode *pNode, *pNext; - __wr_lock((void *)&pHashObj->lock, pHashObj->type); + taosHashWLock(pHashObj); for (int32_t i = 0; i < pHashObj->capacity; ++i) { SHashEntry *pEntry = pHashObj->hashList[i]; if (pEntry->num == 0) { - assert(pEntry->next == 0); + assert(pEntry->next == NULL); continue; } @@ -607,7 +597,7 @@ void taosHashClear(SHashObj *pHashObj) { while (pNode) { pNext = pNode->next; - FREE_HASH_NODE(pHashObj, pNode); + FREE_HASH_NODE(pNode); pNode = pNext; } @@ -616,10 +606,11 @@ void taosHashClear(SHashObj *pHashObj) { pEntry->next = NULL; } - atomic_store_32(&pHashObj->size, 0); - __wr_unlock((void *)&pHashObj->lock, pHashObj->type); + pHashObj->size = 0; + taosHashWUnlock(pHashObj); } +// the input paras should be SHashObj **, so the origin input will be set by tfree(*pHashObj) void taosHashCleanup(SHashObj *pHashObj) { if (pHashObj == NULL) { return; @@ -636,26 +627,29 @@ void taosHashCleanup(SHashObj *pHashObj) { } taosArrayDestroy(pHashObj->pMemBlock); - - memset(pHashObj, 0, sizeof(SHashObj)); free(pHashObj); } // for profile only -int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) { +int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj){ if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) { return 0; } int32_t num = 0; + taosHashRLock((SHashObj*) pHashObj); for (int32_t i = 0; i < pHashObj->size; ++i) { SHashEntry *pEntry = pHashObj->hashList[i]; + + // fine grain per entry lock is not held since this is used + // for profiling only and doesn't need an accurate count. if (num < pEntry->num) { num = pEntry->num; } } + taosHashRUnlock((SHashObj*) pHashObj); return num; } @@ -664,28 +658,24 @@ void taosHashTableResize(SHashObj *pHashObj) { return; } - // double the original capacity - SHashNode *pNode = NULL; - SHashNode *pNext = NULL; - - int32_t newSize = (int32_t)(pHashObj->capacity << 1u); - if (newSize > HASH_MAX_CAPACITY) { - // uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", - // pHashObj->capacity, HASH_MAX_CAPACITY); + int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u); + if (newCapacity > HASH_MAX_CAPACITY) { +// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached", +// pHashObj->capacity, HASH_MAX_CAPACITY); return; } int64_t st = taosGetTimestampUs(); - void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newSize); - if (pNewEntryList == NULL) { // todo handle error - // uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); + void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newCapacity); + if (pNewEntryList == NULL) { +// uDebug("cache resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); return; } pHashObj->hashList = pNewEntryList; - size_t inc = newSize - pHashObj->capacity; - void *p = calloc(inc, sizeof(SHashEntry)); + size_t inc = newCapacity - pHashObj->capacity; + void * p = calloc(inc, sizeof(SHashEntry)); for (int32_t i = 0; i < inc; ++i) { pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry)); @@ -693,92 +683,62 @@ void taosHashTableResize(SHashObj *pHashObj) { taosArrayPush(pHashObj->pMemBlock, &p); - pHashObj->capacity = newSize; - for (int32_t i = 0; i < pHashObj->capacity; ++i) { - SHashEntry *pe = pHashObj->hashList[i]; - - if (pe->num == 0) { - assert(pe->next == NULL); - } else { - assert(pe->next != NULL); - } + pHashObj->capacity = newCapacity; + for (int32_t idx = 0; idx < pHashObj->capacity; ++idx) { + SHashEntry *pe = pHashObj->hashList[idx]; + SHashNode *pNode; + SHashNode *pNext; + SHashNode *pPrev = NULL; if (pe->num == 0) { assert(pe->next == NULL); continue; } - while ((pNode = pe->next) != NULL) { - int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity); - if (j != i) { - pe->num -= 1; - pe->next = pNode->next; + pNode = pe->next; - if (pe->num == 0) { - assert(pe->next == NULL); + assert(pNode != NULL); + + while (pNode != NULL) { + int32_t newIdx = HASH_INDEX(pNode->hashVal, pHashObj->capacity); + pNext = pNode->next; + if (newIdx != idx) { + pe->num -= 1; + if (pPrev == NULL) { + pe->next = pNext; } else { - assert(pe->next != NULL); + pPrev->next = pNext; } - SHashEntry *pNewEntry = pHashObj->hashList[j]; + SHashEntry *pNewEntry = pHashObj->hashList[newIdx]; pushfrontNodeInEntryList(pNewEntry, pNode); } else { - break; - } - } - - if (pNode != NULL) { - while ((pNext = pNode->next) != NULL) { - int32_t j = HASH_INDEX(pNext->hashVal, pHashObj->capacity); - if (j != i) { - pe->num -= 1; - - pNode->next = pNext->next; - pNext->next = NULL; - - // added into new slot - SHashEntry *pNewEntry = pHashObj->hashList[j]; - - if (pNewEntry->num == 0) { - assert(pNewEntry->next == NULL); - } else { - assert(pNewEntry->next != NULL); - } - - pushfrontNodeInEntryList(pNewEntry, pNext); - } else { - pNode = pNext; - } - } - - if (pe->num == 0) { - assert(pe->next == NULL); - } else { - assert(pe->next != NULL); + pPrev = pNode; } + pNode = pNext; } } int64_t et = taosGetTimestampUs(); - uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity, - ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); +// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity, +// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); } SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { SHashNode *pNewNode = malloc(sizeof(SHashNode) + keyLen + dsize); if (pNewNode == NULL) { - uError("failed to allocate memory, reason:%s", strerror(errno)); + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pNewNode->keyLen = (uint32_t)keyLen; + pNewNode->keyLen = (uint32_t)keyLen; pNewNode->hashVal = hashVal; pNewNode->dataLen = (uint32_t)dsize; - pNewNode->count = 1; + pNewNode->refCount= 1; pNewNode->removed = 0; - pNewNode->next = NULL; + pNewNode->next = NULL; memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen); @@ -800,51 +760,32 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) { return 0; } - return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + - sizeof(SHashObj); + return (pHashObj->capacity * (sizeof(SHashEntry) + sizeof(void*))) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj); } -FORCE_INLINE int32_t taosHashGetKey(void *data, void **key, size_t *keyLen) { - if (NULL == data || NULL == key) { - return -1; - } - - SHashNode *node = GET_HASH_PNODE(data); - *key = GET_HASH_NODE_KEY(node); - if (keyLen) { +void *taosHashGetKey(void *data, size_t* keyLen) { + SHashNode * node = GET_HASH_PNODE(data); + if (keyLen != NULL) { *keyLen = node->keyLen; } - return 0; -} - -FORCE_INLINE int32_t taosHashGetDataLen(void *data) { - SHashNode *node = GET_HASH_PNODE(data); - return node->keyLen; -} - -FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) { - SHashNode *node = GET_HASH_PNODE(data); - return node->keyLen; + return GET_HASH_NODE_KEY(node); } // release the pNode, return next pNode, and lock the current entry -static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int32_t *slot) { +static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) { SHashNode *pOld = (SHashNode *)GET_HASH_PNODE(p); SHashNode *prevNode = NULL; *slot = HASH_INDEX(pOld->hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[*slot]; - // lock entry - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWLockLatch(&pe->latch); - } + taosHashEntryWLock(pHashObj, pe); SHashNode *pNode = pe->next; - while (pNode) { - if (pNode == pOld) break; + if (pNode == pOld) + break; prevNode = pNode; pNode = pNode->next; @@ -857,8 +798,8 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int32_t *slot) { pNode = pNode->next; } - pOld->count--; - if (pOld->count <= 0) { + atomic_sub_fetch_32(&pOld->refCount, 1); + if (pOld->refCount <=0) { if (prevNode) { prevNode->next = pOld->next; } else { @@ -866,11 +807,11 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int32_t *slot) { } pe->num--; - atomic_sub_fetch_32(&pHashObj->size, 1); - FREE_HASH_NODE(pHashObj, pOld); + atomic_sub_fetch_64(&pHashObj->size, 1); + FREE_HASH_NODE(pOld); } } else { - uError("pNode:%p data:%p is not there!!!", pNode, p); +// uError("pNode:%p data:%p is not there!!!", pNode, p); } return pNode; @@ -879,20 +820,18 @@ static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int32_t *slot) { void *taosHashIterate(SHashObj *pHashObj, void *p) { if (pHashObj == NULL) return NULL; - int32_t slot = 0; + int slot = 0; char *data = NULL; // only add the read lock to disable the resize process - __rd_lock((void *)&pHashObj->lock, pHashObj->type); + taosHashRLock(pHashObj); SHashNode *pNode = NULL; if (p) { pNode = taosHashReleaseNode(pHashObj, p, &slot); if (pNode == NULL) { SHashEntry *pe = pHashObj->hashList[slot]; - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWUnLockLatch(&pe->latch); - } + taosHashEntryWUnlock(pHashObj, pe); slot = slot + 1; } @@ -902,10 +841,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { for (; slot < pHashObj->capacity; ++slot) { SHashEntry *pe = pHashObj->hashList[slot]; - // lock entry - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWLockLatch(&pe->latch); - } + taosHashEntryWLock(pHashObj, pe); pNode = pe->next; while (pNode) { @@ -915,23 +851,22 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { if (pNode) break; - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWUnLockLatch(&pe->latch); - } + taosHashEntryWUnlock(pHashObj, pe); } } if (pNode) { SHashEntry *pe = pHashObj->hashList[slot]; - uint16_t prevRef = atomic_load_16(&pNode->count); - uint16_t afterRef = atomic_add_fetch_16(&pNode->count, 1); + uint16_t prevRef = atomic_load_16(&pNode->refCount); + uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1); + ASSERT(prevRef < afterRef); // the reference count value is overflow, which will cause the delete node operation immediately. if (prevRef > afterRef) { uError("hash entry ref count overflow, prev ref:%d, current ref:%d", prevRef, afterRef); // restore the value - atomic_sub_fetch_16(&pNode->count, 1); + atomic_sub_fetch_16(&pNode->refCount, 1); data = NULL; } else { data = GET_HASH_NODE_DATA(pNode); @@ -941,12 +876,10 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { uWarn("hash entry ref count is abnormally high: %d", afterRef); } - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWUnLockLatch(&pe->latch); - } + taosHashEntryWUnlock(pHashObj, pe); } - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); + taosHashRUnlock(pHashObj); return data; } @@ -954,17 +887,20 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) { if (pHashObj == NULL || p == NULL) return; // only add the read lock to disable the resize process - __rd_lock((void *)&pHashObj->lock, pHashObj->type); + taosHashRLock(pHashObj); - int32_t slot; + int slot; taosHashReleaseNode(pHashObj, p, &slot); SHashEntry *pe = pHashObj->hashList[slot]; - if (pHashObj->type == HASH_ENTRY_LOCK) { - taosWUnLockLatch(&pe->latch); - } - __rd_unlock((void *)&pHashObj->lock, pHashObj->type); + taosHashEntryWUnlock(pHashObj, pe); + taosHashRUnlock(pHashObj); +} + +void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) { + void* p = NULL; + return taosHashGetImpl(pHashObj, key, keyLen, &p, 0, true); } void taosHashRelease(SHashObj *pHashObj, void *p) { taosHashCancelIterate(pHashObj, p); } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 45c38165a1..fe32afb2f4 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -42,8 +42,8 @@ struct SDiskbasedBuf { bool comp; // compressed before flushed to disk uint64_t nextPos; // next page flush position - uint64_t qId; // for debug purpose - bool printStatis; // Print statistics info when closing this buffer. + char* id; // for debug purpose + bool printStatis; // Print statistics info when closing this buffer. SDiskbasedBufStatis statis; }; @@ -356,7 +356,7 @@ static SPageInfo* getPageInfoFromPayload(void* page) { return ppi; } -int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, +int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, const char* dir) { *pBuf = calloc(1, sizeof(SDiskbasedBuf)); @@ -366,13 +366,13 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem } pPBuf->pageSize = pagesize; - pPBuf->numOfPages = 0; // all pages are in buffer in the first place + pPBuf->numOfPages = 0; // all pages are in buffer in the first place pPBuf->totalBufSize = 0; pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit. pPBuf->allocateId = -1; - pPBuf->comp = true; - pPBuf->pFile = NULL; - pPBuf->qId = qId; + pPBuf->comp = true; + pPBuf->pFile = NULL; + pPBuf->id = strdup(id); pPBuf->fileSize = 0; pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem)); pPBuf->freePgList = tdListNew(POINTER_BYTES); @@ -540,13 +540,13 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { if (pBuf->pFile != NULL) { uDebug( "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page " - "size:%.2f Kb, %" PRIx64 "\n", + "size:%.2f Kb, %s\n", pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, - listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId); + listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id); taosCloseFile(&pBuf->pFile); } else { - uDebug("Paged buffer closed, total:%.2f Kb, no file created, %" PRIx64, pBuf->totalBufSize / 1024.0, pBuf->qId); + uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id); } // print the statistics information @@ -584,6 +584,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { taosHashCleanup(pBuf->groupSet); taosHashCleanup(pBuf->all); + tfree(pBuf->id); tfree(pBuf->assistBuf); tfree(pBuf); } @@ -639,9 +640,9 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { printf( "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f " - "Kb, %" PRIx64 "\n", + "Kb, %s\n", pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, - listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId); + listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id); printf( "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n", diff --git a/source/util/test/pageBufferTest.cpp b/source/util/test/pageBufferTest.cpp index 4b9d8c5f51..8fbec31dd2 100644 --- a/source/util/test/pageBufferTest.cpp +++ b/source/util/test/pageBufferTest.cpp @@ -13,7 +13,7 @@ namespace { // simple test void simpleTest() { SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, 1, "/tmp/"); + int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, "", "/tmp/"); int32_t pageId = 0; int32_t groupId = 0; @@ -55,7 +55,7 @@ void simpleTest() { void writeDownTest() { SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/"); + int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, "1", "/tmp/"); int32_t pageId = 0; int32_t writePageId = 0; @@ -102,7 +102,7 @@ void writeDownTest() { void recyclePageTest() { SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/"); + int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, "1", "/tmp/"); int32_t pageId = 0; int32_t writePageId = 0;