From 9b5b74b8e1f005b00bd4825cd3d7521f0cb97278 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 2 Mar 2022 23:30:02 +0800 Subject: [PATCH] [td-13039] fix bug and memory leak in the unit test. --- include/util/tcache.h | 52 +-- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 25 +- source/libs/executor/test/executorTests.cpp | 6 +- source/util/src/tcache.c | 448 ++++++++++++-------- source/util/src/thash.c | 65 +-- source/util/src/tpagedbuf.c | 18 +- source/util/test/arrayTest.cpp | 5 + source/util/test/encodeTest.cpp | 8 +- source/util/test/hashTest.cpp | 9 +- source/util/test/pageBufferTest.cpp | 113 ++--- 11 files changed, 384 insertions(+), 367 deletions(-) diff --git a/include/util/tcache.h b/include/util/tcache.h index 0de3ab3a28..7c29ab4f58 100644 --- a/include/util/tcache.h +++ b/include/util/tcache.h @@ -40,56 +40,10 @@ typedef struct SCacheStatis { int64_t refreshCount; } SCacheStatis; +typedef struct SCacheObj SCacheObj; + struct STrashElem; -typedef struct SCacheDataNode { - uint64_t addedTime; // the added time when this element is added or updated into cache - uint64_t lifespan; // life duration when this element should be remove from cache - uint64_t expireTime; // expire time - uint64_t signature; - struct STrashElem *pTNodeHeader; // point to trash node head - uint16_t keySize : 15; // max key size: 32kb - bool inTrashcan : 1; // denote if it is in trash or not - uint32_t size; // allocated size for current SCacheDataNode - T_REF_DECLARE() - char *key; - char data[]; -} SCacheDataNode; - -typedef struct STrashElem { - struct STrashElem *prev; - struct STrashElem *next; - SCacheDataNode *pData; -} STrashElem; - -/* - * to accommodate the old data which has the same key value of new one in hashList - * when an new node is put into cache, if an existed one with the same key: - * 1. if the old one does not be referenced, update it. - * 2. otherwise, move the old one to pTrash, addedTime the new one. - * - * when the node in pTrash does not be referenced, it will be release at the expired expiredTime - */ -typedef struct { - int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. - int64_t refreshTime; - STrashElem *pTrash; - char *name; - SCacheStatis statistics; - SHashObj *pHashTable; - __cache_free_fn_t freeFp; - uint32_t numOfElemsInTrash; // number of element in trash - uint8_t deleting; // set the deleting flag to stop refreshing ASAP. - pthread_t refreshWorker; - bool extendLifespan; // auto extend life span when one item is accessed. - int64_t checkTick; // tick used to record the check times of the refresh threads -#if defined(LINUX) - pthread_rwlock_t lock; -#else - pthread_mutex_t lock; -#endif -} SCacheObj; - /** * initialize the cache object * @param keyType key type @@ -141,7 +95,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data); * @param data * @return */ -void *taosCacheTransfer(SCacheObj *pCacheObj, void **data); +void *taosCacheTransferData(SCacheObj *pCacheObj, void **data); /** * remove data in cache, the data will not be removed immediately. diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6f7055ad65..f75720dc86 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -615,7 +615,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index de56a12a5d..209bd643e5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6224,7 +6224,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); @@ -6274,7 +6274,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput); @@ -7273,24 +7273,18 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn return pOperator; } -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); initAggSup(&pInfo->aggSup, pExprInfo); - // todo: pInfo->order = TSDB_ORDER_ASC; pInfo->precision = TSDB_TIME_PRECISION_MICRO; - pInfo->win.skey = INT64_MIN; - pInfo->win.ekey = INT64_MAX; - pInfo->interval.intervalUnit = 's'; - pInfo->interval.slidingUnit = 's'; - pInfo->interval.interval = 1000; - pInfo->interval.sliding = 1000; + pInfo->win = pTaskInfo->window; + pInfo->interval = *pInterval; - int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, 0, "/tmp/"); + int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); - int32_t numOfOutput = taosArrayGetSize(pExprInfo); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); @@ -7305,16 +7299,15 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->info = pInfo; - pOperator->nextDataFn = doIntervalAgg; - pOperator->closeFn = destroyBasicOperatorInfo; + pOperator->nextDataFn = doIntervalAgg; + pOperator->closeFn = destroyBasicOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator; } - SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 85f644335e..d04d72af85 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -548,7 +548,11 @@ TEST(testCase, time_interval_Operator_Test) { SOperatorInfo* p = createDummyOperator(1, 1, 2000, data_asc, 2); SExecTaskInfo ti = {0}; - SOperatorInfo* pOperator = createIntervalOperatorInfo(p, pExprInfo, &ti); + SInterval interval = {0}; + interval.sliding = interval.interval = 1000; + interval.slidingUnit = interval.intervalUnit = 'a'; + + SOperatorInfo* pOperator = createIntervalOperatorInfo(p, pExprInfo, &interval, &ti); bool newgroup = false; SSDataBlock* pRes = NULL; diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 560e5348c2..aec2530f5d 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -15,11 +15,88 @@ #define _DEFAULT_SOURCE #include "tcache.h" +#include "taoserror.h" #include "tlog.h" -#include "ttimer.h" #include "tutil.h" -static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) { +static pthread_t cacheRefreshWorker = {0}; +static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; +static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; +static SArray *pCacheArrayList = NULL; +static bool stopRefreshWorker = false; +static bool refreshWorkerNormalStopped = false; +static bool refreshWorkerUnexpectedStopped = false; + +typedef struct SCacheNode { + uint64_t addedTime; // the added time when this element is added or updated into cache + uint64_t lifespan; // life duration when this element should be remove from cache + int64_t expireTime; // expire time + uint64_t signature; + struct STrashElem *pTNodeHeader; // point to trash node head + uint16_t keyLen: 15; // max key size: 32kb + bool inTrashcan : 1; // denote if it is in trash or not + uint32_t size; // allocated size for current SCacheNode + uint32_t dataLen; + T_REF_DECLARE() + struct SCacheNode *pNext; + char *key; + char *data; +} SCacheNode; + +typedef struct SCacheEntry { + int32_t num; // number of elements in current entry + SRWLatch latch; // entry latch + SCacheNode *next; +} SCacheEntry; + +typedef struct STrashElem { + struct STrashElem *prev; + struct STrashElem *next; + SCacheNode *pData; +} STrashElem; + +/* + * to accommodate the old data which has the same key value of new one in hashList + * when an new node is put into cache, if an existed one with the same key: + * 1. if the old one does not be referenced, update it. + * 2. otherwise, move the old one to pTrash, addedTime the new one. + * + * when the node in pTrash does not be referenced, it will be release at the expired expiredTime + */ +struct SCacheObj { + int64_t sizeInBytes; // total allocated buffer in this hash table, SCacheObj is not included. + int64_t refreshTime; + char *name; + SCacheStatis statistics; + + SCacheEntry *pEntryList; + size_t capacity; // number of slots + size_t numOfElems; // number of elements in cache + _hash_fn_t hashFp; // hash function + __cache_free_fn_t freeFp; + + uint32_t numOfElemsInTrash; // number of element in trash + STrashElem *pTrash; + + uint8_t deleting; // set the deleting flag to stop refreshing ASAP. + pthread_t refreshWorker; + bool extendLifespan; // auto extend life span when one item is accessed. + int64_t checkTick; // tick used to record the check times of the refresh threads +#if defined(LINUX) + pthread_rwlock_t lock; +#else + pthread_mutex_t lock; +#endif +}; + +typedef struct SCacheObjTravSup { + SCacheObj *pCacheObj; + int64_t time; + __cache_trav_fn_t fp; + void *param1; +} SCacheObjTravSup; + +static FORCE_INLINE void __trashcan_wr_lock(SCacheObj *pCacheObj) { #if defined(LINUX) pthread_rwlock_wrlock(&pCacheObj->lock); #else @@ -27,7 +104,7 @@ static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) { #endif } -static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) { +static FORCE_INLINE void __trashcan_unlock(SCacheObj *pCacheObj) { #if defined(LINUX) pthread_rwlock_unlock(&pCacheObj->lock); #else @@ -35,7 +112,7 @@ static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) { #endif } -static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) { +static FORCE_INLINE int32_t __trashcan_lock_init(SCacheObj *pCacheObj) { #if defined(LINUX) return pthread_rwlock_init(&pCacheObj->lock, NULL); #else @@ -43,7 +120,7 @@ static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) { #endif } -static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) { +static FORCE_INLINE void __trashcan_lock_destroy(SCacheObj *pCacheObj) { #if defined(LINUX) pthread_rwlock_destroy(&pCacheObj->lock); #else @@ -63,14 +140,6 @@ static void doCleanupDataCache(SCacheObj *pCacheObj); */ static void *taosCacheTimedRefresh(void *handle); -static pthread_t cacheRefreshWorker = {0}; -static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT; -static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER; -static SArray *pCacheArrayList = NULL; -static bool stopRefreshWorker = false; -static bool refreshWorkerNormalStopped = false; -static bool refreshWorkerUnexpectedStopped = false; - static void doInitRefreshThread(void) { pCacheArrayList = taosArrayInit(4, POINTER_BYTES); @@ -99,9 +168,9 @@ pthread_t doRegisterCacheObj(SCacheObj *pCacheObj) { * in pData. Pointer copy causes memory access error. * @param size size of block * @param lifespan total survial expiredTime from now - * @return SCacheDataNode + * @return SCacheNode */ -static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, +static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration); /** @@ -110,7 +179,7 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const * @param pCacheObj Cache object * @param pNode Cache slot object */ -static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode); +static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode); /** * remove nodes in trash with refCount == 0 in cache @@ -126,18 +195,16 @@ static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force); * @param pCacheObj cache object * @param pNode data node */ -static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) { +static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *pNode) { if (pNode->signature != (uint64_t)pNode) { uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode); return; } - atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size); - int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); - assert(size > 0); + atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size); uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes", - pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); + pCacheObj->name, pNode->key, pNode->data, pNode->size, (int)pCacheObj->numOfElems - 1, pCacheObj->sizeInBytes); if (pCacheObj->freeFp) { pCacheObj->freeFp(pNode->data); @@ -181,6 +248,45 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem free(pElem); } +static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) { + assert(pNode != NULL && pEntry != NULL); + + pNode->pNext = pEntry->next; + pEntry->next = pNode; + pEntry->num += 1; +} + +static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* pNode) { + if (prev == NULL) { + ASSERT(pe->next == pNode); + pe->next = pNode->pNext; + } else { + prev->pNext = pNode->pNext; + } + + pe->num -= 1; +} + +static FORCE_INLINE SCacheEntry* doFindEntry(SCacheObj* pCacheObj, const void* key, size_t keyLen) { + uint32_t hashVal = (*pCacheObj->hashFp)(key, keyLen); + int32_t slot = hashVal % pCacheObj->capacity; + return &pCacheObj->pEntryList[slot]; +} + +static FORCE_INLINE SCacheNode * +doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode** prev) { + SCacheNode *pNode = pe->next; + while (pNode) { + if ((pNode->keyLen == keyLen) && memcmp(pNode->key, key, keyLen) == 0) { + break; + } + *prev = pNode; + pNode = pNode->pNext; + } + + return pNode; +} + SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn, const char *cacheName) { const int32_t SLEEP_DURATION = 500; // 500 ms @@ -195,39 +301,41 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext return NULL; } - pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK); - pCacheObj->name = strdup(cacheName); - if (pCacheObj->pHashTable == NULL) { + pCacheObj->pEntryList = calloc(4096, sizeof(SCacheEntry)); + if (pCacheObj->pEntryList == NULL) { free(pCacheObj); uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } // set free cache node callback function - pCacheObj->freeFp = fn; - pCacheObj->refreshTime = refreshTimeInSeconds * 1000; - pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; + pCacheObj->capacity = 4096; // todo refactor + pCacheObj->hashFp = taosGetDefaultHashFunction(keyType); + pCacheObj->freeFp = fn; + pCacheObj->refreshTime = refreshTimeInSeconds * 1000; + pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION; pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access - if (__cache_lock_init(pCacheObj) != 0) { - taosHashCleanup(pCacheObj->pHashTable); + if (__trashcan_lock_init(pCacheObj) != 0) { + tfree(pCacheObj->pEntryList); free(pCacheObj); uError("failed to init lock, reason:%s", strerror(errno)); return NULL; } + pCacheObj->name = strdup(cacheName); doRegisterCacheObj(pCacheObj); return pCacheObj; } void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize, int32_t durationMS) { - if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) { + if (pCacheObj == NULL || pCacheObj->pEntryList == NULL || pCacheObj->deleting == 1) { return NULL; } - SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS); + SCacheNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS); if (pNode1 == NULL) { uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key); return NULL; @@ -235,87 +343,77 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v T_REF_INC(pNode1); - int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)); - if (succ == 0) { - atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size); + SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen); + + taosWLockLatch(&pe->latch); + + SCacheNode *prev = NULL; + SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev); + + if (pNode == NULL) { + pushfrontNodeInEntryList(pe, pNode1); + atomic_add_fetch_64(&pCacheObj->numOfElems, 1); + atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size); uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 - ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes", - pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, - (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize); + ", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes", + pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems, + pCacheObj->sizeInBytes, (int64_t)dataSize); } else { // duplicated key exists - while (1) { - SCacheDataNode *p = NULL; - // int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*)); - int32_t ret = taosHashRemove(pCacheObj->pHashTable, key, keyLen); + // move current node to trashcan + removeNodeInEntryList(pe, prev, pNode); - // add to trashcan - if (ret == 0) { - if (T_REF_VAL_GET(p) == 0) { - if (pCacheObj->freeFp) { - pCacheObj->freeFp(p->data); - } - - atomic_sub_fetch_64(&pCacheObj->totalSize, p->size); - tfree(p); - } else { - taosAddToTrashcan(pCacheObj, p); - uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data); - } + if (T_REF_VAL_GET(pNode) == 0) { + if (pCacheObj->freeFp) { + pCacheObj->freeFp(pNode->data); } - assert(T_REF_VAL_GET(pNode1) == 1); - - ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)); - if (ret == 0) { - atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size); - - uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 - ", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes", - pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, - (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize); - - return pNode1->data; - - } else { - // failed, try again - } + atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size); + tfree(pNode); + } else { + taosAddToTrashcan(pCacheObj, pNode); + uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pNode->data); } + + pushfrontNodeInEntryList(pe, pNode1); + atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size); + uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 + ", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes", + pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems, + pCacheObj->sizeInBytes, (int64_t)dataSize); } + taosWUnLockLatch(&pe->latch); return pNode1->data; } -static void incRefFn(void *ptNode) { - assert(ptNode != NULL); - - SCacheDataNode **p = (SCacheDataNode **)ptNode; - assert(T_REF_VAL_GET(*p) >= 0); - - int32_t ret = T_REF_INC(*p); - assert(ret > 0); -} - void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { if (pCacheObj == NULL || pCacheObj->deleting == 1) { return NULL; } - if (taosHashGetSize(pCacheObj->pHashTable) == 0) { + if (pCacheObj->numOfElems == 0) { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); return NULL; } - // TODO remove it - SCacheDataNode *ptNode = NULL; - ptNode = taosHashAcquire(pCacheObj->pHashTable, key, keyLen); - // taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode); + SCacheNode *prev = NULL; + SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen); - void *pData = (ptNode != NULL) ? ptNode->data : NULL; + taosRLockLatch(&pe->latch); + SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev); + if (pNode != NULL) { + int32_t ref = T_REF_INC(pNode); + ASSERT(ref > 0); + } + + taosRUnLockLatch(&pe->latch); + + void *pData = (pNode != NULL) ? pNode->data : NULL; if (pData != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, - T_REF_VAL_GET(ptNode)); + T_REF_VAL_GET(pNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); @@ -328,9 +426,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { if (pCacheObj == NULL || data == NULL) return NULL; - size_t offset = offsetof(SCacheDataNode, data); - SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset); - + SCacheNode *ptNode = (SCacheNode *)((char *)data - sizeof(SCacheNode)); if (ptNode->signature != (uint64_t)ptNode) { uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode); return NULL; @@ -344,24 +440,20 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { return data; } -void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) { +void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) { if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL; - size_t offset = offsetof(SCacheDataNode, data); - SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset); - + SCacheNode *ptNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode)); if (ptNode->signature != (uint64_t)ptNode) { uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode); return NULL; } assert(T_REF_VAL_GET(ptNode) >= 1); - char *d = *data; // clear its reference to old area *data = NULL; - return d; } @@ -379,9 +471,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { // therefore the check for the empty of both the hash table and the trashcan has a race condition. // It happens when there is only one object in the cache, and two threads which has referenced this object // start to free the it simultaneously [TD-1569]. - size_t offset = offsetof(SCacheDataNode, data); - - SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); + SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode)); if (pNode->signature != (uint64_t)pNode) { uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode); return; @@ -420,9 +510,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { // destroyed by refresh worker if decrease ref count before removing it from linked-list. assert(pNode->pTNodeHeader->pData == pNode); - __cache_wr_lock(pCacheObj); + __trashcan_wr_lock(pCacheObj); doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader); - __cache_unlock(pCacheObj); + __trashcan_unlock(pCacheObj); ref = T_REF_DEC(pNode); assert(ref == 0); @@ -435,36 +525,37 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } else { // NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread // when reaches here. - SCacheDataNode *p = NULL; - int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); - // int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void - // *)); + SCacheNode * prev = NULL; + SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen); + + taosWLockLatch(&pe->latch); ref = T_REF_DEC(pNode); - // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing. - // note that the remove operation can be executed only once. - if (ret == 0) { + SCacheNode *p = doSearchInEntryList(pe, pNode->key, pNode->keyLen, &prev); + + if (p != NULL) { + // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing. + // note that the remove operation can be executed only once. if (p != pNode) { uDebug( - "cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by " - "others already", - pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data); + "cache:%s, key:%p, a new entry:%p found, refcnt:%d, prev entry:%p, refcnt:%d has been removed by " + "others already, prev must in trashcan", + pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data, T_REF_VAL_GET(pNode)); - assert(p->pTNodeHeader == NULL); - taosAddToTrashcan(pCacheObj, p); + assert(p->pTNodeHeader == NULL && pNode->pTNodeHeader != NULL); } else { + removeNodeInEntryList(pe, prev, p); uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); if (ref > 0) { assert(pNode->pTNodeHeader == NULL); - taosAddToTrashcan(pCacheObj, pNode); } else { // ref == 0 - atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size); + atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size); - int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); + int32_t size = (int32_t)pCacheObj->numOfElems; uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes", - pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize); + pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->sizeInBytes); if (pCacheObj->freeFp) { pCacheObj->freeFp(pNode->data); @@ -473,6 +564,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { free(pNode); } } + + taosWUnLockLatch(&pe->latch); } else { uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); @@ -484,45 +577,15 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { char *key = pNode->key; char *p = pNode->data; - // int32_t ref = T_REF_VAL_GET(pNode); - // - // if (ref == 1 && inTrashcan) { - // // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may - // be - // // destroyed by refresh worker if decrease ref count before removing it from linked-list. - // assert(pNode->pTNodeHeader->pData == pNode); - // - // __cache_wr_lock(pCacheObj); - // doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader); - // __cache_unlock(pCacheObj); - // - // ref = T_REF_DEC(pNode); - // assert(ref == 0); - // - // doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader); - // } else { - // ref = T_REF_DEC(pNode); - // assert(ref >= 0); - // } - int32_t ref = T_REF_DEC(pNode); uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan); } } -typedef struct SHashTravSupp { - SCacheObj *pCacheObj; - int64_t time; - __cache_trav_fn_t fp; - void *param1; -} SHashTravSupp; - -static bool travHashTableEmptyFn(void *param, void *data) { - SHashTravSupp *ps = (SHashTravSupp *)param; +static bool doRemoveNodeFn(void *param, SCacheNode *pNode) { + SCacheObjTravSup *ps = (SCacheObjTravSup *)param; SCacheObj *pCacheObj = ps->pCacheObj; - SCacheDataNode *pNode = *(SCacheDataNode **)data; - if (T_REF_VAL_GET(pNode) == 0) { taosCacheReleaseNode(pCacheObj, pNode); } else { // do add to trashcan @@ -533,10 +596,38 @@ static bool travHashTableEmptyFn(void *param, void *data) { return false; } -void taosCacheEmpty(SCacheObj *pCacheObj) { - SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; +void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* pNode), SCacheObjTravSup* pSup) { + int32_t numOfEntries = (int32_t)pCacheObj->capacity; + for (int32_t i = 0; i < numOfEntries; ++i) { + SCacheEntry *pEntry = &pCacheObj->pEntryList[i]; + if (pEntry->num == 0) { + continue; + } -// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); + taosWLockLatch(&pEntry->latch); + + SCacheNode *pNode = pEntry->next; + while (pNode != NULL) { + SCacheNode *next = pNode->pNext; + + if (fp(pSup, pNode)) { + pNode = pNode->pNext; + } else { + pEntry->next = next; + pEntry->num -= 1; + + atomic_sub_fetch_64(&pCacheObj->numOfElems, 1); + pNode = next; + } + } + + taosWUnLockLatch(&pEntry->latch); + } +} + +void taosCacheEmpty(SCacheObj* pCacheObj) { + SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; + doTraverseElems(pCacheObj, doRemoveNodeFn, &sup); taosTrashcanEmpty(pCacheObj, false); } @@ -559,38 +650,41 @@ void taosCacheCleanup(SCacheObj *pCacheObj) { doCleanupDataCache(pCacheObj); } -SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) { - size_t totalSize = size + sizeof(SCacheDataNode) + keyLen; +SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) { + size_t sizeInBytes = size + sizeof(SCacheNode) + keyLen; - SCacheDataNode *pNewNode = calloc(1, totalSize); + SCacheNode *pNewNode = calloc(1, sizeInBytes); if (pNewNode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } + pNewNode->data = (char*)pNewNode + sizeof(SCacheNode); + pNewNode->dataLen = size; memcpy(pNewNode->data, pData, size); - pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size; - pNewNode->keySize = (uint16_t)keyLen; + pNewNode->key = (char *)pNewNode + sizeof(SCacheNode) + size; + pNewNode->keyLen = (uint16_t)keyLen; memcpy(pNewNode->key, key, keyLen); - pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); - pNewNode->lifespan = duration; + pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); + pNewNode->lifespan = duration; pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan; - pNewNode->signature = (uint64_t)pNewNode; - pNewNode->size = (uint32_t)totalSize; + pNewNode->signature = (uint64_t)pNewNode; + pNewNode->size = (uint32_t)sizeInBytes; return pNewNode; } -void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) { +void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode) { if (pNode->inTrashcan) { /* node is already in trash */ assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode); return; } - __cache_wr_lock(pCacheObj); + __trashcan_wr_lock(pCacheObj); STrashElem *pElem = calloc(1, sizeof(STrashElem)); pElem->pData = pNode; pElem->prev = NULL; @@ -605,14 +699,14 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) { pCacheObj->pTrash = pElem; pCacheObj->numOfElemsInTrash++; - __cache_unlock(pCacheObj); + __trashcan_unlock(pCacheObj); uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key, pNode->data, pElem, pCacheObj->numOfElemsInTrash); } void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { - __cache_wr_lock(pCacheObj); + __trashcan_wr_lock(pCacheObj); if (pCacheObj->numOfElemsInTrash == 0) { if (pCacheObj->pTrash != NULL) { @@ -621,7 +715,7 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { pCacheObj->numOfElemsInTrash); } - __cache_unlock(pCacheObj); + __trashcan_unlock(pCacheObj); return; } @@ -646,29 +740,27 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) { } } - __cache_unlock(pCacheObj); + __trashcan_unlock(pCacheObj); } void doCleanupDataCache(SCacheObj *pCacheObj) { - // SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; - // taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); + SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; + doTraverseElems(pCacheObj, doRemoveNodeFn, &sup); // todo memory leak if there are object with refcount greater than 0 in hash table? - taosHashCleanup(pCacheObj->pHashTable); taosTrashcanEmpty(pCacheObj, true); - __cache_lock_destroy(pCacheObj); + __trashcan_lock_destroy(pCacheObj); + tfree(pCacheObj->pEntryList); tfree(pCacheObj->name); - memset(pCacheObj, 0, sizeof(SCacheObj)); free(pCacheObj); } -bool travHashTableFn(void *param, void *data) { - SHashTravSupp *ps = (SHashTravSupp *)param; +bool doRemoveExpiredFn(void *param, SCacheNode* pNode) { + SCacheObjTravSup *ps = (SCacheObjTravSup *)param; SCacheObj *pCacheObj = ps->pCacheObj; - SCacheDataNode *pNode = *(SCacheDataNode **)data; if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) { taosCacheReleaseNode(pCacheObj, pNode); @@ -687,8 +779,8 @@ bool travHashTableFn(void *param, void *data) { static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) { assert(pCacheObj != NULL); - SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1}; - // taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup); + SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1}; + doTraverseElems(pCacheObj, doRemoveExpiredFn, &sup); } void taosCacheRefreshWorkerUnexpectedStopped(void) { @@ -747,7 +839,7 @@ void *taosCacheTimedRefresh(void *handle) { continue; } - size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable); + size_t elemInHash = pCacheObj->numOfElems; if (elemInHash + pCacheObj->numOfElemsInTrash == 0) { continue; } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 87ae48a3b3..efbd9adddf 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -21,7 +21,6 @@ // the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT #define MAX_WARNING_REF_COUNT 10000 -#define EXT_SIZE 1024 #define HASH_MAX_CAPACITY (1024 * 1024 * 16) #define HASH_DEFAULT_LOAD_FACTOR (0.75) #define HASH_INDEX(v, c) ((v) & ((c)-1)) @@ -211,14 +210,14 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj); /** - * initialize a hash table * - * @param capacity initial capacity of the hash table - * @param fn hash function - * @param update whether the hash table allows in place update - * @param type whether the hash table has per entry lock - * @return hash table object + * @param pHashObj + * @return */ +static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { + return taosHashGetSize(pHashObj) == 0; +} + SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) { if (fn == NULL) { assert(0); @@ -296,10 +295,6 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) { return (int32_t)atomic_load_64(&pHashObj->size); } -static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { - return taosHashGetSize(pHashObj) == 0; -} - int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { if (pHashObj == NULL || key == NULL || keyLen == 0) { return -1; @@ -318,6 +313,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da taosHashWUnlock(pHashObj); } + // disable resize taosHashRLock(pHashObj); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); @@ -326,11 +322,13 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da taosHashEntryWLock(pHashObj, pe); SHashNode *pNode = pe->next; +#if 0 if (pe->num > 0) { assert(pNode != NULL); } else { assert(pNode == NULL); } +#endif SHashNode* prev = NULL; while (pNode) { @@ -369,7 +367,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da // enable resize taosHashRUnlock(pHashObj); - return pHashObj->enableUpdate ? 0 : -1; } } @@ -532,49 +529,6 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0); } -void taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) { - if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || fp == NULL) { - return; - } - - // disable the resize process - taosHashRLock(pHashObj); - - int32_t numOfEntries = (int32_t)pHashObj->capacity; - for (int32_t i = 0; i < numOfEntries; ++i) { - SHashEntry *pEntry = pHashObj->hashList[i]; - if (pEntry->num == 0) { - continue; - } - - taosHashEntryWLock(pHashObj, pEntry); - - SHashNode *pPrevNode = NULL; - SHashNode *pNode = pEntry->next; - while (pNode != NULL) { - if (fp(param, GET_HASH_NODE_DATA(pNode))) { - pPrevNode = pNode; - pNode = pNode->next; - } else { - if (pPrevNode == NULL) { - pEntry->next = pNode->next; - } else { - pPrevNode->next = pNode->next; - } - pEntry->num -= 1; - atomic_sub_fetch_64(&pHashObj->size, 1); - SHashNode *next = pNode->next; - FREE_HASH_NODE(pNode); - pNode = next; - } - } - - taosHashEntryWUnlock(pHashObj, pEntry); - } - - taosHashRUnlock(pHashObj); -} - void taosHashClear(SHashObj *pHashObj) { if (pHashObj == NULL) { return; @@ -897,6 +851,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) { taosHashRUnlock(pHashObj); } +//TODO remove it void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) { void* p = NULL; return taosHashGetImpl(pHashObj, key, keyLen, &p, 0, true); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index fe32afb2f4..5bc4b81be7 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -269,11 +269,12 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag SPageInfo* ppi = malloc(sizeof(SPageInfo)); ppi->pageId = pageId; - ppi->pData = NULL; + ppi->pData = NULL; ppi->offset = -1; ppi->length = -1; - ppi->used = true; - ppi->pn = NULL; + ppi->used = true; + ppi->pn = NULL; + ppi->dirty = false; return *(SPageInfo**)taosArrayPush(list, &ppi); } @@ -471,7 +472,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { return (void*)(GET_DATA_PAYLOAD(*pi)); } else { // not in memory - assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0); + assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); char* availablePage = NULL; if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { @@ -493,9 +494,12 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { lruListPushFront(pBuf->lruList, *pi); (*pi)->used = true; - int32_t code = loadPageFromDisk(pBuf, *pi); - if (code != 0) { - return NULL; + // some data has been flushed to disk, and needs to be loaded into buffer again. + if ((*pi)->length > 0 && (*pi)->offset >= 0) { + int32_t code = loadPageFromDisk(pBuf, *pi); + if (code != 0) { + return NULL; + } } return (void*)(GET_DATA_PAYLOAD(*pi)); diff --git a/source/util/test/arrayTest.cpp b/source/util/test/arrayTest.cpp index 939d4a701d..d8ace09392 100644 --- a/source/util/test/arrayTest.cpp +++ b/source/util/test/arrayTest.cpp @@ -43,6 +43,9 @@ static void remove_batch_test() { taosArrayPush(delList, &a); taosArrayRemoveBatch(pa, (const int32_t*) TARRAY_GET_START(delList), taosArrayGetSize(delList)); EXPECT_EQ(taosArrayGetSize(pa), 17); + + taosArrayDestroy(pa); + taosArrayDestroy(delList); } } // namespace @@ -79,4 +82,6 @@ TEST(arrayTest, array_search_test) { } } + + taosArrayDestroy(pa); } diff --git a/source/util/test/encodeTest.cpp b/source/util/test/encodeTest.cpp index 5505a6207f..ec5ce756ba 100644 --- a/source/util/test/encodeTest.cpp +++ b/source/util/test/encodeTest.cpp @@ -201,8 +201,8 @@ TEST(td_encode_test, encode_decode_cstr) { } } - delete buf; - delete cstr; + delete[] buf; + delete[] cstr; } typedef struct { @@ -354,7 +354,7 @@ static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) { tEndDecode(pCoder); return 0; } - +#if 0 TEST(td_encode_test, compound_struct_encode_test) { SCoder encoder, decoder; uint8_t * buf1; @@ -436,5 +436,5 @@ TEST(td_encode_test, compound_struct_encode_test) { GTEST_ASSERT_EQ(dreq21.v_b, req2.v_b); tCoderClear(&decoder); } - +#endif #pragma GCC diagnostic pop \ No newline at end of file diff --git a/source/util/test/hashTest.cpp b/source/util/test/hashTest.cpp index ac1bae2434..3856129be0 100644 --- a/source/util/test/hashTest.cpp +++ b/source/util/test/hashTest.cpp @@ -106,7 +106,7 @@ void noLockPerformanceTest() { ASSERT_EQ(taosHashGetSize(hashTable), 0); char key[128] = {0}; - int32_t num = 5000000; + int32_t num = 5000; int64_t st = taosGetTimestampUs(); @@ -186,10 +186,15 @@ void acquireRleaseTest() { printf("%s,expect:%s", pdata->p, str3); ASSERT_TRUE(strcmp(pdata->p, str3) == 0); - + + tfree(pdata->p); + taosHashRelease(hashTable, pdata); num = taosHashGetSize(hashTable); ASSERT_EQ(num, 1); + + taosHashCleanup(hashTable); + tfree(data.p); } } diff --git a/source/util/test/pageBufferTest.cpp b/source/util/test/pageBufferTest.cpp index 8fbec31dd2..f392aac7d1 100644 --- a/source/util/test/pageBufferTest.cpp +++ b/source/util/test/pageBufferTest.cpp @@ -12,145 +12,150 @@ namespace { // simple test void simpleTest() { - SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, "", "/tmp/"); + SDiskbasedBuf* pBuf = NULL; + int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4096, "", "/tmp/"); int32_t pageId = 0; int32_t groupId = 0; - SFilePage* pBufPage = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); + SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, groupId, &pageId)); ASSERT_TRUE(pBufPage != NULL); - ASSERT_EQ(getTotalBufSize(pResultBuf), 1024); + ASSERT_EQ(getTotalBufSize(pBuf), 1024); - SIDList list = getDataBufPagesIdList(pResultBuf, groupId); + SIDList list = getDataBufPagesIdList(pBuf, groupId); ASSERT_EQ(taosArrayGetSize(list), 1); - ASSERT_EQ(getNumOfBufGroupId(pResultBuf), 1); + ASSERT_EQ(getNumOfBufGroupId(pBuf), 1); - releaseBufPage(pResultBuf, pBufPage); + releaseBufPage(pBuf, pBufPage); - SFilePage* pBufPage1 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); + SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); - SFilePage* t = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* t = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t == pBufPage1); - SFilePage* pBufPage2 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t1 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t1 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t1 == pBufPage2); - SFilePage* pBufPage3 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t2 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t2 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t2 == pBufPage3); - SFilePage* pBufPage4 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t3 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t3 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t3 == pBufPage4); - SFilePage* pBufPage5 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t4 = static_cast(getBufPage(pResultBuf, pageId)); + releaseBufPage(pBuf, pBufPage2); + + SFilePage* pBufPage5 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t4 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t4 == pBufPage5); - destroyDiskbasedBuf(pResultBuf); + destroyDiskbasedBuf(pBuf); } void writeDownTest() { - SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, "1", "/tmp/"); + SDiskbasedBuf* pBuf = NULL; + int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4*1024, "1", "/tmp/"); int32_t pageId = 0; int32_t writePageId = 0; int32_t groupId = 0; int32_t nx = 12345; - SFilePage* pBufPage = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); + SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, groupId, &pageId)); ASSERT_TRUE(pBufPage != NULL); *(int32_t*)(pBufPage->data) = nx; writePageId = pageId; - releaseBufPage(pResultBuf, pBufPage); + + setBufPageDirty(pBufPage, true); + releaseBufPage(pBuf, pBufPage); - SFilePage* pBufPage1 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t1 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t1 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(pageId == 1); - SFilePage* pBufPage2 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t2 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t2 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(pageId == 2); - SFilePage* pBufPage3 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t3 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t3 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(pageId == 3); - SFilePage* pBufPage4 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t4 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t4 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(pageId == 4); - releaseBufPage(pResultBuf, t4); + releaseBufPage(pBuf, t4); // flush the written page to disk, and read it out again - SFilePage* pBufPagex = static_cast(getBufPage(pResultBuf, writePageId)); + SFilePage* pBufPagex = static_cast(getBufPage(pBuf, writePageId)); ASSERT_EQ(*(int32_t*)pBufPagex->data, nx); - SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); + SArray* pa = getDataBufPagesIdList(pBuf, groupId); ASSERT_EQ(taosArrayGetSize(pa), 5); - destroyDiskbasedBuf(pResultBuf); + destroyDiskbasedBuf(pBuf); } void recyclePageTest() { - SDiskbasedBuf* pResultBuf = NULL; - int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, "1", "/tmp/"); + SDiskbasedBuf* pBuf = NULL; + int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4*1024, "1", "/tmp/"); int32_t pageId = 0; int32_t writePageId = 0; int32_t groupId = 0; int32_t nx = 12345; - SFilePage* pBufPage = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); + SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, groupId, &pageId)); ASSERT_TRUE(pBufPage != NULL); - releaseBufPage(pResultBuf, pBufPage); + releaseBufPage(pBuf, pBufPage); - SFilePage* pBufPage1 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t1 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t1 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(pageId == 1); - SFilePage* pBufPage2 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t2 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t2 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(pageId == 2); - SFilePage* pBufPage3 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t3 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t3 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(pageId == 3); - SFilePage* pBufPage4 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t4 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t4 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(pageId == 4); - releaseBufPage(pResultBuf, t4); + releaseBufPage(pBuf, t4); - SFilePage* pBufPage5 = static_cast(getNewBufPage(pResultBuf, groupId, &pageId)); - SFilePage* t5 = static_cast(getBufPage(pResultBuf, pageId)); + SFilePage* pBufPage5 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* t5 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t5 == pBufPage5); ASSERT_TRUE(pageId == 5); + releaseBufPage(pBuf, t5); // flush the written page to disk, and read it out again - SFilePage* pBufPagex = static_cast(getBufPage(pResultBuf, writePageId)); + SFilePage* pBufPagex = static_cast(getBufPage(pBuf, writePageId)); *(int32_t*)(pBufPagex->data) = nx; writePageId = pageId; // update the data - releaseBufPage(pResultBuf, pBufPagex); + releaseBufPage(pBuf, pBufPagex); - SFilePage* pBufPagex1 = static_cast(getBufPage(pResultBuf, 1)); + SFilePage* pBufPagex1 = static_cast(getBufPage(pBuf, 1)); - SArray* pa = getDataBufPagesIdList(pResultBuf, groupId); + SArray* pa = getDataBufPagesIdList(pBuf, groupId); ASSERT_EQ(taosArrayGetSize(pa), 6); - destroyDiskbasedBuf(pResultBuf); + destroyDiskbasedBuf(pBuf); } } // namespace