[td-13039] fix bug and memory leak in the unit test.
This commit is contained in:
parent
1bb958732d
commit
9b5b74b8e1
|
@ -40,56 +40,10 @@ typedef struct SCacheStatis {
|
|||
int64_t refreshCount;
|
||||
} SCacheStatis;
|
||||
|
||||
typedef struct SCacheObj SCacheObj;
|
||||
|
||||
struct STrashElem;
|
||||
|
||||
typedef struct SCacheDataNode {
|
||||
uint64_t addedTime; // the added time when this element is added or updated into cache
|
||||
uint64_t lifespan; // life duration when this element should be remove from cache
|
||||
uint64_t expireTime; // expire time
|
||||
uint64_t signature;
|
||||
struct STrashElem *pTNodeHeader; // point to trash node head
|
||||
uint16_t keySize : 15; // max key size: 32kb
|
||||
bool inTrashcan : 1; // denote if it is in trash or not
|
||||
uint32_t size; // allocated size for current SCacheDataNode
|
||||
T_REF_DECLARE()
|
||||
char *key;
|
||||
char data[];
|
||||
} SCacheDataNode;
|
||||
|
||||
typedef struct STrashElem {
|
||||
struct STrashElem *prev;
|
||||
struct STrashElem *next;
|
||||
SCacheDataNode *pData;
|
||||
} STrashElem;
|
||||
|
||||
/*
|
||||
* to accommodate the old data which has the same key value of new one in hashList
|
||||
* when an new node is put into cache, if an existed one with the same key:
|
||||
* 1. if the old one does not be referenced, update it.
|
||||
* 2. otherwise, move the old one to pTrash, addedTime the new one.
|
||||
*
|
||||
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
|
||||
*/
|
||||
typedef struct {
|
||||
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
|
||||
int64_t refreshTime;
|
||||
STrashElem *pTrash;
|
||||
char *name;
|
||||
SCacheStatis statistics;
|
||||
SHashObj *pHashTable;
|
||||
__cache_free_fn_t freeFp;
|
||||
uint32_t numOfElemsInTrash; // number of element in trash
|
||||
uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
|
||||
pthread_t refreshWorker;
|
||||
bool extendLifespan; // auto extend life span when one item is accessed.
|
||||
int64_t checkTick; // tick used to record the check times of the refresh threads
|
||||
#if defined(LINUX)
|
||||
pthread_rwlock_t lock;
|
||||
#else
|
||||
pthread_mutex_t lock;
|
||||
#endif
|
||||
} SCacheObj;
|
||||
|
||||
/**
|
||||
* initialize the cache object
|
||||
* @param keyType key type
|
||||
|
@ -141,7 +95,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data);
|
|||
* @param data
|
||||
* @return
|
||||
*/
|
||||
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data);
|
||||
void *taosCacheTransferData(SCacheObj *pCacheObj, void **data);
|
||||
|
||||
/**
|
||||
* remove data in cache, the data will not be removed immediately.
|
||||
|
|
|
@ -615,7 +615,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray
|
|||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
|
||||
SExprInfo* pExpr, int32_t numOfOutput);
|
||||
|
|
|
@ -6224,7 +6224,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
|
|||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
|
||||
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows);
|
||||
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
|
||||
|
||||
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
|
||||
|
@ -6274,7 +6274,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
|
|||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
|
||||
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows);
|
||||
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows);
|
||||
|
||||
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
|
||||
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput);
|
||||
|
@ -7273,24 +7273,18 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn
|
|||
return pOperator;
|
||||
}
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo) {
|
||||
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
||||
|
||||
initAggSup(&pInfo->aggSup, pExprInfo);
|
||||
|
||||
// todo:
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->precision = TSDB_TIME_PRECISION_MICRO;
|
||||
pInfo->win.skey = INT64_MIN;
|
||||
pInfo->win.ekey = INT64_MAX;
|
||||
pInfo->interval.intervalUnit = 's';
|
||||
pInfo->interval.slidingUnit = 's';
|
||||
pInfo->interval.interval = 1000;
|
||||
pInfo->interval.sliding = 1000;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->interval = *pInterval;
|
||||
|
||||
int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, 0, "/tmp/");
|
||||
int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
|
||||
|
||||
int32_t numOfOutput = taosArrayGetSize(pExprInfo);
|
||||
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize);
|
||||
pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity);
|
||||
|
||||
|
@ -7305,16 +7299,15 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx
|
|||
pOperator->pExpr = exprArrayDup(pExprInfo);
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->numOfOutput = taosArrayGetSize(pExprInfo);
|
||||
pOperator->info = pInfo;
|
||||
pOperator->nextDataFn = doIntervalAgg;
|
||||
pOperator->closeFn = destroyBasicOperatorInfo;
|
||||
pOperator->nextDataFn = doIntervalAgg;
|
||||
pOperator->closeFn = destroyBasicOperatorInfo;
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
|
||||
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
||||
|
||||
|
|
|
@ -548,7 +548,11 @@ TEST(testCase, time_interval_Operator_Test) {
|
|||
SOperatorInfo* p = createDummyOperator(1, 1, 2000, data_asc, 2);
|
||||
|
||||
SExecTaskInfo ti = {0};
|
||||
SOperatorInfo* pOperator = createIntervalOperatorInfo(p, pExprInfo, &ti);
|
||||
SInterval interval = {0};
|
||||
interval.sliding = interval.interval = 1000;
|
||||
interval.slidingUnit = interval.intervalUnit = 'a';
|
||||
|
||||
SOperatorInfo* pOperator = createIntervalOperatorInfo(p, pExprInfo, &interval, &ti);
|
||||
|
||||
bool newgroup = false;
|
||||
SSDataBlock* pRes = NULL;
|
||||
|
|
|
@ -15,11 +15,88 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tcache.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
|
||||
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
|
||||
static pthread_t cacheRefreshWorker = {0};
|
||||
static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
|
||||
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
|
||||
static SArray *pCacheArrayList = NULL;
|
||||
static bool stopRefreshWorker = false;
|
||||
static bool refreshWorkerNormalStopped = false;
|
||||
static bool refreshWorkerUnexpectedStopped = false;
|
||||
|
||||
typedef struct SCacheNode {
|
||||
uint64_t addedTime; // the added time when this element is added or updated into cache
|
||||
uint64_t lifespan; // life duration when this element should be remove from cache
|
||||
int64_t expireTime; // expire time
|
||||
uint64_t signature;
|
||||
struct STrashElem *pTNodeHeader; // point to trash node head
|
||||
uint16_t keyLen: 15; // max key size: 32kb
|
||||
bool inTrashcan : 1; // denote if it is in trash or not
|
||||
uint32_t size; // allocated size for current SCacheNode
|
||||
uint32_t dataLen;
|
||||
T_REF_DECLARE()
|
||||
struct SCacheNode *pNext;
|
||||
char *key;
|
||||
char *data;
|
||||
} SCacheNode;
|
||||
|
||||
typedef struct SCacheEntry {
|
||||
int32_t num; // number of elements in current entry
|
||||
SRWLatch latch; // entry latch
|
||||
SCacheNode *next;
|
||||
} SCacheEntry;
|
||||
|
||||
typedef struct STrashElem {
|
||||
struct STrashElem *prev;
|
||||
struct STrashElem *next;
|
||||
SCacheNode *pData;
|
||||
} STrashElem;
|
||||
|
||||
/*
|
||||
* to accommodate the old data which has the same key value of new one in hashList
|
||||
* when an new node is put into cache, if an existed one with the same key:
|
||||
* 1. if the old one does not be referenced, update it.
|
||||
* 2. otherwise, move the old one to pTrash, addedTime the new one.
|
||||
*
|
||||
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
|
||||
*/
|
||||
struct SCacheObj {
|
||||
int64_t sizeInBytes; // total allocated buffer in this hash table, SCacheObj is not included.
|
||||
int64_t refreshTime;
|
||||
char *name;
|
||||
SCacheStatis statistics;
|
||||
|
||||
SCacheEntry *pEntryList;
|
||||
size_t capacity; // number of slots
|
||||
size_t numOfElems; // number of elements in cache
|
||||
_hash_fn_t hashFp; // hash function
|
||||
__cache_free_fn_t freeFp;
|
||||
|
||||
uint32_t numOfElemsInTrash; // number of element in trash
|
||||
STrashElem *pTrash;
|
||||
|
||||
uint8_t deleting; // set the deleting flag to stop refreshing ASAP.
|
||||
pthread_t refreshWorker;
|
||||
bool extendLifespan; // auto extend life span when one item is accessed.
|
||||
int64_t checkTick; // tick used to record the check times of the refresh threads
|
||||
#if defined(LINUX)
|
||||
pthread_rwlock_t lock;
|
||||
#else
|
||||
pthread_mutex_t lock;
|
||||
#endif
|
||||
};
|
||||
|
||||
typedef struct SCacheObjTravSup {
|
||||
SCacheObj *pCacheObj;
|
||||
int64_t time;
|
||||
__cache_trav_fn_t fp;
|
||||
void *param1;
|
||||
} SCacheObjTravSup;
|
||||
|
||||
static FORCE_INLINE void __trashcan_wr_lock(SCacheObj *pCacheObj) {
|
||||
#if defined(LINUX)
|
||||
pthread_rwlock_wrlock(&pCacheObj->lock);
|
||||
#else
|
||||
|
@ -27,7 +104,7 @@ static FORCE_INLINE void __cache_wr_lock(SCacheObj *pCacheObj) {
|
|||
#endif
|
||||
}
|
||||
|
||||
static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
|
||||
static FORCE_INLINE void __trashcan_unlock(SCacheObj *pCacheObj) {
|
||||
#if defined(LINUX)
|
||||
pthread_rwlock_unlock(&pCacheObj->lock);
|
||||
#else
|
||||
|
@ -35,7 +112,7 @@ static FORCE_INLINE void __cache_unlock(SCacheObj *pCacheObj) {
|
|||
#endif
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
|
||||
static FORCE_INLINE int32_t __trashcan_lock_init(SCacheObj *pCacheObj) {
|
||||
#if defined(LINUX)
|
||||
return pthread_rwlock_init(&pCacheObj->lock, NULL);
|
||||
#else
|
||||
|
@ -43,7 +120,7 @@ static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pCacheObj) {
|
|||
#endif
|
||||
}
|
||||
|
||||
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) {
|
||||
static FORCE_INLINE void __trashcan_lock_destroy(SCacheObj *pCacheObj) {
|
||||
#if defined(LINUX)
|
||||
pthread_rwlock_destroy(&pCacheObj->lock);
|
||||
#else
|
||||
|
@ -63,14 +140,6 @@ static void doCleanupDataCache(SCacheObj *pCacheObj);
|
|||
*/
|
||||
static void *taosCacheTimedRefresh(void *handle);
|
||||
|
||||
static pthread_t cacheRefreshWorker = {0};
|
||||
static pthread_once_t cacheThreadInit = PTHREAD_ONCE_INIT;
|
||||
static pthread_mutex_t guard = PTHREAD_MUTEX_INITIALIZER;
|
||||
static SArray *pCacheArrayList = NULL;
|
||||
static bool stopRefreshWorker = false;
|
||||
static bool refreshWorkerNormalStopped = false;
|
||||
static bool refreshWorkerUnexpectedStopped = false;
|
||||
|
||||
static void doInitRefreshThread(void) {
|
||||
pCacheArrayList = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
|
@ -99,9 +168,9 @@ pthread_t doRegisterCacheObj(SCacheObj *pCacheObj) {
|
|||
* in pData. Pointer copy causes memory access error.
|
||||
* @param size size of block
|
||||
* @param lifespan total survial expiredTime from now
|
||||
* @return SCacheDataNode
|
||||
* @return SCacheNode
|
||||
*/
|
||||
static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
|
||||
static SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
|
||||
uint64_t duration);
|
||||
|
||||
/**
|
||||
|
@ -110,7 +179,7 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const
|
|||
* @param pCacheObj Cache object
|
||||
* @param pNode Cache slot object
|
||||
*/
|
||||
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode);
|
||||
static void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode);
|
||||
|
||||
/**
|
||||
* remove nodes in trash with refCount == 0 in cache
|
||||
|
@ -126,18 +195,16 @@ static void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force);
|
|||
* @param pCacheObj cache object
|
||||
* @param pNode data node
|
||||
*/
|
||||
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
||||
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheNode *pNode) {
|
||||
if (pNode->signature != (uint64_t)pNode) {
|
||||
uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
|
||||
return;
|
||||
}
|
||||
|
||||
atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
|
||||
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
|
||||
assert(size > 0);
|
||||
atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
|
||||
|
||||
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, total num:%d size:%" PRId64 "bytes",
|
||||
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
|
||||
pCacheObj->name, pNode->key, pNode->data, pNode->size, (int)pCacheObj->numOfElems - 1, pCacheObj->sizeInBytes);
|
||||
|
||||
if (pCacheObj->freeFp) {
|
||||
pCacheObj->freeFp(pNode->data);
|
||||
|
@ -181,6 +248,45 @@ static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj *pCacheObj, STrashElem
|
|||
free(pElem);
|
||||
}
|
||||
|
||||
static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) {
|
||||
assert(pNode != NULL && pEntry != NULL);
|
||||
|
||||
pNode->pNext = pEntry->next;
|
||||
pEntry->next = pNode;
|
||||
pEntry->num += 1;
|
||||
}
|
||||
|
||||
static void removeNodeInEntryList(SCacheEntry* pe, SCacheNode* prev, SCacheNode* pNode) {
|
||||
if (prev == NULL) {
|
||||
ASSERT(pe->next == pNode);
|
||||
pe->next = pNode->pNext;
|
||||
} else {
|
||||
prev->pNext = pNode->pNext;
|
||||
}
|
||||
|
||||
pe->num -= 1;
|
||||
}
|
||||
|
||||
static FORCE_INLINE SCacheEntry* doFindEntry(SCacheObj* pCacheObj, const void* key, size_t keyLen) {
|
||||
uint32_t hashVal = (*pCacheObj->hashFp)(key, keyLen);
|
||||
int32_t slot = hashVal % pCacheObj->capacity;
|
||||
return &pCacheObj->pEntryList[slot];
|
||||
}
|
||||
|
||||
static FORCE_INLINE SCacheNode *
|
||||
doSearchInEntryList(SCacheEntry *pe, const void *key, size_t keyLen, SCacheNode** prev) {
|
||||
SCacheNode *pNode = pe->next;
|
||||
while (pNode) {
|
||||
if ((pNode->keyLen == keyLen) && memcmp(pNode->key, key, keyLen) == 0) {
|
||||
break;
|
||||
}
|
||||
*prev = pNode;
|
||||
pNode = pNode->pNext;
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool extendLifespan, __cache_free_fn_t fn,
|
||||
const char *cacheName) {
|
||||
const int32_t SLEEP_DURATION = 500; // 500 ms
|
||||
|
@ -195,39 +301,41 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pCacheObj->pHashTable = taosHashInit(4096, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
|
||||
pCacheObj->name = strdup(cacheName);
|
||||
if (pCacheObj->pHashTable == NULL) {
|
||||
pCacheObj->pEntryList = calloc(4096, sizeof(SCacheEntry));
|
||||
if (pCacheObj->pEntryList == NULL) {
|
||||
free(pCacheObj);
|
||||
uError("failed to allocate memory, reason:%s", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// set free cache node callback function
|
||||
pCacheObj->freeFp = fn;
|
||||
pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
|
||||
pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
|
||||
pCacheObj->capacity = 4096; // todo refactor
|
||||
pCacheObj->hashFp = taosGetDefaultHashFunction(keyType);
|
||||
pCacheObj->freeFp = fn;
|
||||
pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
|
||||
pCacheObj->checkTick = pCacheObj->refreshTime / SLEEP_DURATION;
|
||||
pCacheObj->extendLifespan = extendLifespan; // the TTL after the last access
|
||||
|
||||
if (__cache_lock_init(pCacheObj) != 0) {
|
||||
taosHashCleanup(pCacheObj->pHashTable);
|
||||
if (__trashcan_lock_init(pCacheObj) != 0) {
|
||||
tfree(pCacheObj->pEntryList);
|
||||
free(pCacheObj);
|
||||
|
||||
uError("failed to init lock, reason:%s", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCacheObj->name = strdup(cacheName);
|
||||
doRegisterCacheObj(pCacheObj);
|
||||
return pCacheObj;
|
||||
}
|
||||
|
||||
void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const void *pData, size_t dataSize,
|
||||
int32_t durationMS) {
|
||||
if (pCacheObj == NULL || pCacheObj->pHashTable == NULL || pCacheObj->deleting == 1) {
|
||||
if (pCacheObj == NULL || pCacheObj->pEntryList == NULL || pCacheObj->deleting == 1) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SCacheDataNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
|
||||
SCacheNode *pNode1 = taosCreateCacheNode(key, keyLen, pData, dataSize, durationMS);
|
||||
if (pNode1 == NULL) {
|
||||
uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
|
||||
return NULL;
|
||||
|
@ -235,87 +343,77 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
|
|||
|
||||
T_REF_INC(pNode1);
|
||||
|
||||
int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
|
||||
if (succ == 0) {
|
||||
atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
|
||||
SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);
|
||||
|
||||
taosWLockLatch(&pe->latch);
|
||||
|
||||
SCacheNode *prev = NULL;
|
||||
SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev);
|
||||
|
||||
if (pNode == NULL) {
|
||||
pushfrontNodeInEntryList(pe, pNode1);
|
||||
atomic_add_fetch_64(&pCacheObj->numOfElems, 1);
|
||||
atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size);
|
||||
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
|
||||
", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
|
||||
pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
|
||||
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
|
||||
", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
|
||||
pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems,
|
||||
pCacheObj->sizeInBytes, (int64_t)dataSize);
|
||||
} else { // duplicated key exists
|
||||
while (1) {
|
||||
SCacheDataNode *p = NULL;
|
||||
// int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));
|
||||
int32_t ret = taosHashRemove(pCacheObj->pHashTable, key, keyLen);
|
||||
// move current node to trashcan
|
||||
removeNodeInEntryList(pe, prev, pNode);
|
||||
|
||||
// add to trashcan
|
||||
if (ret == 0) {
|
||||
if (T_REF_VAL_GET(p) == 0) {
|
||||
if (pCacheObj->freeFp) {
|
||||
pCacheObj->freeFp(p->data);
|
||||
}
|
||||
|
||||
atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);
|
||||
tfree(p);
|
||||
} else {
|
||||
taosAddToTrashcan(pCacheObj, p);
|
||||
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p->data);
|
||||
}
|
||||
if (T_REF_VAL_GET(pNode) == 0) {
|
||||
if (pCacheObj->freeFp) {
|
||||
pCacheObj->freeFp(pNode->data);
|
||||
}
|
||||
|
||||
assert(T_REF_VAL_GET(pNode1) == 1);
|
||||
|
||||
ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
|
||||
if (ret == 0) {
|
||||
atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
|
||||
|
||||
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
|
||||
", totalNum:%d totalSize:%" PRId64 "bytes size:%" PRId64 "bytes",
|
||||
pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime,
|
||||
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
|
||||
|
||||
return pNode1->data;
|
||||
|
||||
} else {
|
||||
// failed, try again
|
||||
}
|
||||
atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
|
||||
tfree(pNode);
|
||||
} else {
|
||||
taosAddToTrashcan(pCacheObj, pNode);
|
||||
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pNode->data);
|
||||
}
|
||||
|
||||
pushfrontNodeInEntryList(pe, pNode1);
|
||||
atomic_add_fetch_64(&pCacheObj->sizeInBytes, pNode1->size);
|
||||
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64
|
||||
", totalNum:%d sizeInBytes:%" PRId64 "bytes size:%" PRId64 "bytes",
|
||||
pCacheObj->name, key, pNode1->data, pNode1->addedTime, pNode1->expireTime, (int32_t)pCacheObj->numOfElems,
|
||||
pCacheObj->sizeInBytes, (int64_t)dataSize);
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pe->latch);
|
||||
return pNode1->data;
|
||||
}
|
||||
|
||||
static void incRefFn(void *ptNode) {
|
||||
assert(ptNode != NULL);
|
||||
|
||||
SCacheDataNode **p = (SCacheDataNode **)ptNode;
|
||||
assert(T_REF_VAL_GET(*p) >= 0);
|
||||
|
||||
int32_t ret = T_REF_INC(*p);
|
||||
assert(ret > 0);
|
||||
}
|
||||
|
||||
void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) {
|
||||
if (pCacheObj == NULL || pCacheObj->deleting == 1) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (taosHashGetSize(pCacheObj->pHashTable) == 0) {
|
||||
if (pCacheObj->numOfElems == 0) {
|
||||
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// TODO remove it
|
||||
SCacheDataNode *ptNode = NULL;
|
||||
ptNode = taosHashAcquire(pCacheObj->pHashTable, key, keyLen);
|
||||
// taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode);
|
||||
SCacheNode *prev = NULL;
|
||||
SCacheEntry *pe = doFindEntry(pCacheObj, key, keyLen);
|
||||
|
||||
void *pData = (ptNode != NULL) ? ptNode->data : NULL;
|
||||
taosRLockLatch(&pe->latch);
|
||||
|
||||
SCacheNode* pNode = doSearchInEntryList(pe, key, keyLen, &prev);
|
||||
if (pNode != NULL) {
|
||||
int32_t ref = T_REF_INC(pNode);
|
||||
ASSERT(ref > 0);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&pe->latch);
|
||||
|
||||
void *pData = (pNode != NULL) ? pNode->data : NULL;
|
||||
if (pData != NULL) {
|
||||
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
|
||||
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData,
|
||||
T_REF_VAL_GET(ptNode));
|
||||
T_REF_VAL_GET(pNode));
|
||||
} else {
|
||||
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
|
||||
uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
|
||||
|
@ -328,9 +426,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
|
|||
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
||||
if (pCacheObj == NULL || data == NULL) return NULL;
|
||||
|
||||
size_t offset = offsetof(SCacheDataNode, data);
|
||||
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
|
||||
|
||||
SCacheNode *ptNode = (SCacheNode *)((char *)data - sizeof(SCacheNode));
|
||||
if (ptNode->signature != (uint64_t)ptNode) {
|
||||
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
|
||||
return NULL;
|
||||
|
@ -344,24 +440,20 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
|||
return data;
|
||||
}
|
||||
|
||||
void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
|
||||
void *taosCacheTransferData(SCacheObj *pCacheObj, void **data) {
|
||||
if (pCacheObj == NULL || data == NULL || (*data) == NULL) return NULL;
|
||||
|
||||
size_t offset = offsetof(SCacheDataNode, data);
|
||||
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
|
||||
|
||||
SCacheNode *ptNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
|
||||
if (ptNode->signature != (uint64_t)ptNode) {
|
||||
uError("cache:%s, key: %p the data from cache is invalid", pCacheObj->name, ptNode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
assert(T_REF_VAL_GET(ptNode) >= 1);
|
||||
|
||||
char *d = *data;
|
||||
|
||||
// clear its reference to old area
|
||||
*data = NULL;
|
||||
|
||||
return d;
|
||||
}
|
||||
|
||||
|
@ -379,9 +471,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
// therefore the check for the empty of both the hash table and the trashcan has a race condition.
|
||||
// It happens when there is only one object in the cache, and two threads which has referenced this object
|
||||
// start to free the it simultaneously [TD-1569].
|
||||
size_t offset = offsetof(SCacheDataNode, data);
|
||||
|
||||
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
|
||||
SCacheNode *pNode = (SCacheNode *)((char *)(*data) - sizeof(SCacheNode));
|
||||
if (pNode->signature != (uint64_t)pNode) {
|
||||
uError("cache:%s, %p, release invalid cache data", pCacheObj->name, pNode);
|
||||
return;
|
||||
|
@ -420,9 +510,9 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
// destroyed by refresh worker if decrease ref count before removing it from linked-list.
|
||||
assert(pNode->pTNodeHeader->pData == pNode);
|
||||
|
||||
__cache_wr_lock(pCacheObj);
|
||||
__trashcan_wr_lock(pCacheObj);
|
||||
doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
|
||||
__cache_unlock(pCacheObj);
|
||||
__trashcan_unlock(pCacheObj);
|
||||
|
||||
ref = T_REF_DEC(pNode);
|
||||
assert(ref == 0);
|
||||
|
@ -435,36 +525,37 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
} else {
|
||||
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
|
||||
// when reaches here.
|
||||
SCacheDataNode *p = NULL;
|
||||
int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
|
||||
// int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, pNode->key, pNode->keySize, &p, sizeof(void
|
||||
// *));
|
||||
SCacheNode * prev = NULL;
|
||||
SCacheEntry *pe = doFindEntry(pCacheObj, pNode->key, pNode->keyLen);
|
||||
|
||||
taosWLockLatch(&pe->latch);
|
||||
ref = T_REF_DEC(pNode);
|
||||
|
||||
// successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
|
||||
// note that the remove operation can be executed only once.
|
||||
if (ret == 0) {
|
||||
SCacheNode *p = doSearchInEntryList(pe, pNode->key, pNode->keyLen, &prev);
|
||||
|
||||
if (p != NULL) {
|
||||
// successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
|
||||
// note that the remove operation can be executed only once.
|
||||
if (p != pNode) {
|
||||
uDebug(
|
||||
"cache:%s, key:%p, successfully removed a new entry:%p, refcnt:%d, prev entry:%p has been removed by "
|
||||
"others already",
|
||||
pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data);
|
||||
"cache:%s, key:%p, a new entry:%p found, refcnt:%d, prev entry:%p, refcnt:%d has been removed by "
|
||||
"others already, prev must in trashcan",
|
||||
pCacheObj->name, pNode->key, p->data, T_REF_VAL_GET(p), pNode->data, T_REF_VAL_GET(pNode));
|
||||
|
||||
assert(p->pTNodeHeader == NULL);
|
||||
taosAddToTrashcan(pCacheObj, p);
|
||||
assert(p->pTNodeHeader == NULL && pNode->pTNodeHeader != NULL);
|
||||
} else {
|
||||
removeNodeInEntryList(pe, prev, p);
|
||||
uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key,
|
||||
pNode->data, ref);
|
||||
if (ref > 0) {
|
||||
assert(pNode->pTNodeHeader == NULL);
|
||||
|
||||
taosAddToTrashcan(pCacheObj, pNode);
|
||||
} else { // ref == 0
|
||||
atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size);
|
||||
atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size);
|
||||
|
||||
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
|
||||
int32_t size = (int32_t)pCacheObj->numOfElems;
|
||||
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
|
||||
pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
|
||||
pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->sizeInBytes);
|
||||
|
||||
if (pCacheObj->freeFp) {
|
||||
pCacheObj->freeFp(pNode->data);
|
||||
|
@ -473,6 +564,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
free(pNode);
|
||||
}
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pe->latch);
|
||||
} else {
|
||||
uDebug("cache:%s, key:%p, %p has been removed from hash table by others already, refcnt:%d", pCacheObj->name,
|
||||
pNode->key, pNode->data, ref);
|
||||
|
@ -484,45 +577,15 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
char *key = pNode->key;
|
||||
char *p = pNode->data;
|
||||
|
||||
// int32_t ref = T_REF_VAL_GET(pNode);
|
||||
//
|
||||
// if (ref == 1 && inTrashcan) {
|
||||
// // If it is the last ref, remove it from trashcan linked-list first, and then destroy it.Otherwise, it may
|
||||
// be
|
||||
// // destroyed by refresh worker if decrease ref count before removing it from linked-list.
|
||||
// assert(pNode->pTNodeHeader->pData == pNode);
|
||||
//
|
||||
// __cache_wr_lock(pCacheObj);
|
||||
// doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader);
|
||||
// __cache_unlock(pCacheObj);
|
||||
//
|
||||
// ref = T_REF_DEC(pNode);
|
||||
// assert(ref == 0);
|
||||
//
|
||||
// doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
|
||||
// } else {
|
||||
// ref = T_REF_DEC(pNode);
|
||||
// assert(ref >= 0);
|
||||
// }
|
||||
|
||||
int32_t ref = T_REF_DEC(pNode);
|
||||
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trashcan:%d", pCacheObj->name, key, p, ref, inTrashcan);
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct SHashTravSupp {
|
||||
SCacheObj *pCacheObj;
|
||||
int64_t time;
|
||||
__cache_trav_fn_t fp;
|
||||
void *param1;
|
||||
} SHashTravSupp;
|
||||
|
||||
static bool travHashTableEmptyFn(void *param, void *data) {
|
||||
SHashTravSupp *ps = (SHashTravSupp *)param;
|
||||
static bool doRemoveNodeFn(void *param, SCacheNode *pNode) {
|
||||
SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
|
||||
SCacheObj *pCacheObj = ps->pCacheObj;
|
||||
|
||||
SCacheDataNode *pNode = *(SCacheDataNode **)data;
|
||||
|
||||
if (T_REF_VAL_GET(pNode) == 0) {
|
||||
taosCacheReleaseNode(pCacheObj, pNode);
|
||||
} else { // do add to trashcan
|
||||
|
@ -533,10 +596,38 @@ static bool travHashTableEmptyFn(void *param, void *data) {
|
|||
return false;
|
||||
}
|
||||
|
||||
void taosCacheEmpty(SCacheObj *pCacheObj) {
|
||||
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
||||
void doTraverseElems(SCacheObj* pCacheObj, bool (*fp)(void *param, SCacheNode* pNode), SCacheObjTravSup* pSup) {
|
||||
int32_t numOfEntries = (int32_t)pCacheObj->capacity;
|
||||
for (int32_t i = 0; i < numOfEntries; ++i) {
|
||||
SCacheEntry *pEntry = &pCacheObj->pEntryList[i];
|
||||
if (pEntry->num == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
|
||||
taosWLockLatch(&pEntry->latch);
|
||||
|
||||
SCacheNode *pNode = pEntry->next;
|
||||
while (pNode != NULL) {
|
||||
SCacheNode *next = pNode->pNext;
|
||||
|
||||
if (fp(pSup, pNode)) {
|
||||
pNode = pNode->pNext;
|
||||
} else {
|
||||
pEntry->next = next;
|
||||
pEntry->num -= 1;
|
||||
|
||||
atomic_sub_fetch_64(&pCacheObj->numOfElems, 1);
|
||||
pNode = next;
|
||||
}
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pEntry->latch);
|
||||
}
|
||||
}
|
||||
|
||||
void taosCacheEmpty(SCacheObj* pCacheObj) {
|
||||
SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
||||
doTraverseElems(pCacheObj, doRemoveNodeFn, &sup);
|
||||
taosTrashcanEmpty(pCacheObj, false);
|
||||
}
|
||||
|
||||
|
@ -559,38 +650,41 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
|
|||
doCleanupDataCache(pCacheObj);
|
||||
}
|
||||
|
||||
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
|
||||
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
|
||||
SCacheNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size, uint64_t duration) {
|
||||
size_t sizeInBytes = size + sizeof(SCacheNode) + keyLen;
|
||||
|
||||
SCacheDataNode *pNewNode = calloc(1, totalSize);
|
||||
SCacheNode *pNewNode = calloc(1, sizeInBytes);
|
||||
if (pNewNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
uError("failed to allocate memory, reason:%s", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pNewNode->data = (char*)pNewNode + sizeof(SCacheNode);
|
||||
pNewNode->dataLen = size;
|
||||
memcpy(pNewNode->data, pData, size);
|
||||
|
||||
pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
|
||||
pNewNode->keySize = (uint16_t)keyLen;
|
||||
pNewNode->key = (char *)pNewNode + sizeof(SCacheNode) + size;
|
||||
pNewNode->keyLen = (uint16_t)keyLen;
|
||||
|
||||
memcpy(pNewNode->key, key, keyLen);
|
||||
|
||||
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
|
||||
pNewNode->lifespan = duration;
|
||||
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
|
||||
pNewNode->lifespan = duration;
|
||||
pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
|
||||
pNewNode->signature = (uint64_t)pNewNode;
|
||||
pNewNode->size = (uint32_t)totalSize;
|
||||
pNewNode->signature = (uint64_t)pNewNode;
|
||||
pNewNode->size = (uint32_t)sizeInBytes;
|
||||
|
||||
return pNewNode;
|
||||
}
|
||||
|
||||
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
||||
void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheNode *pNode) {
|
||||
if (pNode->inTrashcan) { /* node is already in trash */
|
||||
assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
|
||||
return;
|
||||
}
|
||||
|
||||
__cache_wr_lock(pCacheObj);
|
||||
__trashcan_wr_lock(pCacheObj);
|
||||
STrashElem *pElem = calloc(1, sizeof(STrashElem));
|
||||
pElem->pData = pNode;
|
||||
pElem->prev = NULL;
|
||||
|
@ -605,14 +699,14 @@ void taosAddToTrashcan(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
|||
|
||||
pCacheObj->pTrash = pElem;
|
||||
pCacheObj->numOfElemsInTrash++;
|
||||
__cache_unlock(pCacheObj);
|
||||
__trashcan_unlock(pCacheObj);
|
||||
|
||||
uDebug("cache:%s key:%p, %p move to trashcan, pTrashElem:%p, numOfElem in trashcan:%d", pCacheObj->name, pNode->key,
|
||||
pNode->data, pElem, pCacheObj->numOfElemsInTrash);
|
||||
}
|
||||
|
||||
void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
|
||||
__cache_wr_lock(pCacheObj);
|
||||
__trashcan_wr_lock(pCacheObj);
|
||||
|
||||
if (pCacheObj->numOfElemsInTrash == 0) {
|
||||
if (pCacheObj->pTrash != NULL) {
|
||||
|
@ -621,7 +715,7 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
|
|||
pCacheObj->numOfElemsInTrash);
|
||||
}
|
||||
|
||||
__cache_unlock(pCacheObj);
|
||||
__trashcan_unlock(pCacheObj);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -646,29 +740,27 @@ void taosTrashcanEmpty(SCacheObj *pCacheObj, bool force) {
|
|||
}
|
||||
}
|
||||
|
||||
__cache_unlock(pCacheObj);
|
||||
__trashcan_unlock(pCacheObj);
|
||||
}
|
||||
|
||||
void doCleanupDataCache(SCacheObj *pCacheObj) {
|
||||
// SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
||||
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup);
|
||||
SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()};
|
||||
doTraverseElems(pCacheObj, doRemoveNodeFn, &sup);
|
||||
|
||||
// todo memory leak if there are object with refcount greater than 0 in hash table?
|
||||
taosHashCleanup(pCacheObj->pHashTable);
|
||||
taosTrashcanEmpty(pCacheObj, true);
|
||||
|
||||
__cache_lock_destroy(pCacheObj);
|
||||
__trashcan_lock_destroy(pCacheObj);
|
||||
|
||||
tfree(pCacheObj->pEntryList);
|
||||
tfree(pCacheObj->name);
|
||||
memset(pCacheObj, 0, sizeof(SCacheObj));
|
||||
free(pCacheObj);
|
||||
}
|
||||
|
||||
bool travHashTableFn(void *param, void *data) {
|
||||
SHashTravSupp *ps = (SHashTravSupp *)param;
|
||||
bool doRemoveExpiredFn(void *param, SCacheNode* pNode) {
|
||||
SCacheObjTravSup *ps = (SCacheObjTravSup *)param;
|
||||
SCacheObj *pCacheObj = ps->pCacheObj;
|
||||
|
||||
SCacheDataNode *pNode = *(SCacheDataNode **)data;
|
||||
if ((int64_t)pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) {
|
||||
taosCacheReleaseNode(pCacheObj, pNode);
|
||||
|
||||
|
@ -687,8 +779,8 @@ bool travHashTableFn(void *param, void *data) {
|
|||
static void doCacheRefresh(SCacheObj *pCacheObj, int64_t time, __cache_trav_fn_t fp, void *param1) {
|
||||
assert(pCacheObj != NULL);
|
||||
|
||||
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
|
||||
// taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup);
|
||||
SCacheObjTravSup sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
|
||||
doTraverseElems(pCacheObj, doRemoveExpiredFn, &sup);
|
||||
}
|
||||
|
||||
void taosCacheRefreshWorkerUnexpectedStopped(void) {
|
||||
|
@ -747,7 +839,7 @@ void *taosCacheTimedRefresh(void *handle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
size_t elemInHash = taosHashGetSize(pCacheObj->pHashTable);
|
||||
size_t elemInHash = pCacheObj->numOfElems;
|
||||
if (elemInHash + pCacheObj->numOfElemsInTrash == 0) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
|
||||
// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT
|
||||
#define MAX_WARNING_REF_COUNT 10000
|
||||
#define EXT_SIZE 1024
|
||||
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
||||
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
|
||||
#define HASH_INDEX(v, c) ((v) & ((c)-1))
|
||||
|
@ -211,14 +210,14 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode);
|
|||
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj);
|
||||
|
||||
/**
|
||||
* initialize a hash table
|
||||
*
|
||||
* @param capacity initial capacity of the hash table
|
||||
* @param fn hash function
|
||||
* @param update whether the hash table allows in place update
|
||||
* @param type whether the hash table has per entry lock
|
||||
* @return hash table object
|
||||
* @param pHashObj
|
||||
* @return
|
||||
*/
|
||||
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
|
||||
return taosHashGetSize(pHashObj) == 0;
|
||||
}
|
||||
|
||||
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
|
||||
if (fn == NULL) {
|
||||
assert(0);
|
||||
|
@ -296,10 +295,6 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) {
|
|||
return (int32_t)atomic_load_64(&pHashObj->size);
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
|
||||
return taosHashGetSize(pHashObj) == 0;
|
||||
}
|
||||
|
||||
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
|
||||
if (pHashObj == NULL || key == NULL || keyLen == 0) {
|
||||
return -1;
|
||||
|
@ -318,6 +313,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
|||
taosHashWUnlock(pHashObj);
|
||||
}
|
||||
|
||||
// disable resize
|
||||
taosHashRLock(pHashObj);
|
||||
|
||||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
|
@ -326,11 +322,13 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
|||
taosHashEntryWLock(pHashObj, pe);
|
||||
|
||||
SHashNode *pNode = pe->next;
|
||||
#if 0
|
||||
if (pe->num > 0) {
|
||||
assert(pNode != NULL);
|
||||
} else {
|
||||
assert(pNode == NULL);
|
||||
}
|
||||
#endif
|
||||
|
||||
SHashNode* prev = NULL;
|
||||
while (pNode) {
|
||||
|
@ -369,7 +367,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
|
|||
|
||||
// enable resize
|
||||
taosHashRUnlock(pHashObj);
|
||||
|
||||
return pHashObj->enableUpdate ? 0 : -1;
|
||||
}
|
||||
}
|
||||
|
@ -532,49 +529,6 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
|||
return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0);
|
||||
}
|
||||
|
||||
void taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) {
|
||||
if (pHashObj == NULL || taosHashTableEmpty(pHashObj) || fp == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// disable the resize process
|
||||
taosHashRLock(pHashObj);
|
||||
|
||||
int32_t numOfEntries = (int32_t)pHashObj->capacity;
|
||||
for (int32_t i = 0; i < numOfEntries; ++i) {
|
||||
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||
if (pEntry->num == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
taosHashEntryWLock(pHashObj, pEntry);
|
||||
|
||||
SHashNode *pPrevNode = NULL;
|
||||
SHashNode *pNode = pEntry->next;
|
||||
while (pNode != NULL) {
|
||||
if (fp(param, GET_HASH_NODE_DATA(pNode))) {
|
||||
pPrevNode = pNode;
|
||||
pNode = pNode->next;
|
||||
} else {
|
||||
if (pPrevNode == NULL) {
|
||||
pEntry->next = pNode->next;
|
||||
} else {
|
||||
pPrevNode->next = pNode->next;
|
||||
}
|
||||
pEntry->num -= 1;
|
||||
atomic_sub_fetch_64(&pHashObj->size, 1);
|
||||
SHashNode *next = pNode->next;
|
||||
FREE_HASH_NODE(pNode);
|
||||
pNode = next;
|
||||
}
|
||||
}
|
||||
|
||||
taosHashEntryWUnlock(pHashObj, pEntry);
|
||||
}
|
||||
|
||||
taosHashRUnlock(pHashObj);
|
||||
}
|
||||
|
||||
void taosHashClear(SHashObj *pHashObj) {
|
||||
if (pHashObj == NULL) {
|
||||
return;
|
||||
|
@ -897,6 +851,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
|
|||
taosHashRUnlock(pHashObj);
|
||||
}
|
||||
|
||||
//TODO remove it
|
||||
void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen) {
|
||||
void* p = NULL;
|
||||
return taosHashGetImpl(pHashObj, key, keyLen, &p, 0, true);
|
||||
|
|
|
@ -269,11 +269,12 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
|
|||
SPageInfo* ppi = malloc(sizeof(SPageInfo));
|
||||
|
||||
ppi->pageId = pageId;
|
||||
ppi->pData = NULL;
|
||||
ppi->pData = NULL;
|
||||
ppi->offset = -1;
|
||||
ppi->length = -1;
|
||||
ppi->used = true;
|
||||
ppi->pn = NULL;
|
||||
ppi->used = true;
|
||||
ppi->pn = NULL;
|
||||
ppi->dirty = false;
|
||||
|
||||
return *(SPageInfo**)taosArrayPush(list, &ppi);
|
||||
}
|
||||
|
@ -471,7 +472,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
|||
|
||||
return (void*)(GET_DATA_PAYLOAD(*pi));
|
||||
} else { // not in memory
|
||||
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0);
|
||||
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
|
||||
|
||||
char* availablePage = NULL;
|
||||
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
|
||||
|
@ -493,9 +494,12 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
|
|||
lruListPushFront(pBuf->lruList, *pi);
|
||||
(*pi)->used = true;
|
||||
|
||||
int32_t code = loadPageFromDisk(pBuf, *pi);
|
||||
if (code != 0) {
|
||||
return NULL;
|
||||
// some data has been flushed to disk, and needs to be loaded into buffer again.
|
||||
if ((*pi)->length > 0 && (*pi)->offset >= 0) {
|
||||
int32_t code = loadPageFromDisk(pBuf, *pi);
|
||||
if (code != 0) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return (void*)(GET_DATA_PAYLOAD(*pi));
|
||||
|
|
|
@ -43,6 +43,9 @@ static void remove_batch_test() {
|
|||
taosArrayPush(delList, &a);
|
||||
taosArrayRemoveBatch(pa, (const int32_t*) TARRAY_GET_START(delList), taosArrayGetSize(delList));
|
||||
EXPECT_EQ(taosArrayGetSize(pa), 17);
|
||||
|
||||
taosArrayDestroy(pa);
|
||||
taosArrayDestroy(delList);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
|
@ -79,4 +82,6 @@ TEST(arrayTest, array_search_test) {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
taosArrayDestroy(pa);
|
||||
}
|
||||
|
|
|
@ -201,8 +201,8 @@ TEST(td_encode_test, encode_decode_cstr) {
|
|||
}
|
||||
}
|
||||
|
||||
delete buf;
|
||||
delete cstr;
|
||||
delete[] buf;
|
||||
delete[] cstr;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
|
@ -354,7 +354,7 @@ static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) {
|
|||
tEndDecode(pCoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST(td_encode_test, compound_struct_encode_test) {
|
||||
SCoder encoder, decoder;
|
||||
uint8_t * buf1;
|
||||
|
@ -436,5 +436,5 @@ TEST(td_encode_test, compound_struct_encode_test) {
|
|||
GTEST_ASSERT_EQ(dreq21.v_b, req2.v_b);
|
||||
tCoderClear(&decoder);
|
||||
}
|
||||
|
||||
#endif
|
||||
#pragma GCC diagnostic pop
|
|
@ -106,7 +106,7 @@ void noLockPerformanceTest() {
|
|||
ASSERT_EQ(taosHashGetSize(hashTable), 0);
|
||||
|
||||
char key[128] = {0};
|
||||
int32_t num = 5000000;
|
||||
int32_t num = 5000;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
|
@ -186,10 +186,15 @@ void acquireRleaseTest() {
|
|||
|
||||
printf("%s,expect:%s", pdata->p, str3);
|
||||
ASSERT_TRUE(strcmp(pdata->p, str3) == 0);
|
||||
|
||||
|
||||
tfree(pdata->p);
|
||||
|
||||
taosHashRelease(hashTable, pdata);
|
||||
num = taosHashGetSize(hashTable);
|
||||
ASSERT_EQ(num, 1);
|
||||
|
||||
taosHashCleanup(hashTable);
|
||||
tfree(data.p);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -12,145 +12,150 @@
|
|||
namespace {
|
||||
// simple test
|
||||
void simpleTest() {
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, "", "/tmp/");
|
||||
SDiskbasedBuf* pBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4096, "", "/tmp/");
|
||||
|
||||
int32_t pageId = 0;
|
||||
int32_t groupId = 0;
|
||||
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
ASSERT_TRUE(pBufPage != NULL);
|
||||
|
||||
ASSERT_EQ(getTotalBufSize(pResultBuf), 1024);
|
||||
ASSERT_EQ(getTotalBufSize(pBuf), 1024);
|
||||
|
||||
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
|
||||
SIDList list = getDataBufPagesIdList(pBuf, groupId);
|
||||
ASSERT_EQ(taosArrayGetSize(list), 1);
|
||||
ASSERT_EQ(getNumOfBufGroupId(pResultBuf), 1);
|
||||
ASSERT_EQ(getNumOfBufGroupId(pBuf), 1);
|
||||
|
||||
releaseBufPage(pResultBuf, pBufPage);
|
||||
releaseBufPage(pBuf, pBufPage);
|
||||
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
|
||||
SFilePage* t = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* t = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t == pBufPage1);
|
||||
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t1 == pBufPage2);
|
||||
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t2 == pBufPage3);
|
||||
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t3 == pBufPage4);
|
||||
|
||||
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
releaseBufPage(pBuf, pBufPage2);
|
||||
|
||||
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t4 == pBufPage5);
|
||||
|
||||
destroyDiskbasedBuf(pResultBuf);
|
||||
destroyDiskbasedBuf(pBuf);
|
||||
}
|
||||
|
||||
void writeDownTest() {
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, "1", "/tmp/");
|
||||
SDiskbasedBuf* pBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4*1024, "1", "/tmp/");
|
||||
|
||||
int32_t pageId = 0;
|
||||
int32_t writePageId = 0;
|
||||
int32_t groupId = 0;
|
||||
int32_t nx = 12345;
|
||||
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
ASSERT_TRUE(pBufPage != NULL);
|
||||
|
||||
*(int32_t*)(pBufPage->data) = nx;
|
||||
writePageId = pageId;
|
||||
releaseBufPage(pResultBuf, pBufPage);
|
||||
|
||||
setBufPageDirty(pBufPage, true);
|
||||
releaseBufPage(pBuf, pBufPage);
|
||||
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t1 == pBufPage1);
|
||||
ASSERT_TRUE(pageId == 1);
|
||||
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t2 == pBufPage2);
|
||||
ASSERT_TRUE(pageId == 2);
|
||||
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t3 == pBufPage3);
|
||||
ASSERT_TRUE(pageId == 3);
|
||||
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t4 == pBufPage4);
|
||||
ASSERT_TRUE(pageId == 4);
|
||||
releaseBufPage(pResultBuf, t4);
|
||||
releaseBufPage(pBuf, t4);
|
||||
|
||||
// flush the written page to disk, and read it out again
|
||||
SFilePage* pBufPagex = static_cast<SFilePage*>(getBufPage(pResultBuf, writePageId));
|
||||
SFilePage* pBufPagex = static_cast<SFilePage*>(getBufPage(pBuf, writePageId));
|
||||
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
|
||||
|
||||
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
|
||||
SArray* pa = getDataBufPagesIdList(pBuf, groupId);
|
||||
ASSERT_EQ(taosArrayGetSize(pa), 5);
|
||||
|
||||
destroyDiskbasedBuf(pResultBuf);
|
||||
destroyDiskbasedBuf(pBuf);
|
||||
}
|
||||
|
||||
void recyclePageTest() {
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, "1", "/tmp/");
|
||||
SDiskbasedBuf* pBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pBuf, 1024, 4*1024, "1", "/tmp/");
|
||||
|
||||
int32_t pageId = 0;
|
||||
int32_t writePageId = 0;
|
||||
int32_t groupId = 0;
|
||||
int32_t nx = 12345;
|
||||
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
ASSERT_TRUE(pBufPage != NULL);
|
||||
releaseBufPage(pResultBuf, pBufPage);
|
||||
releaseBufPage(pBuf, pBufPage);
|
||||
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t1 == pBufPage1);
|
||||
ASSERT_TRUE(pageId == 1);
|
||||
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t2 == pBufPage2);
|
||||
ASSERT_TRUE(pageId == 2);
|
||||
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t3 == pBufPage3);
|
||||
ASSERT_TRUE(pageId == 3);
|
||||
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t4 == pBufPage4);
|
||||
ASSERT_TRUE(pageId == 4);
|
||||
releaseBufPage(pResultBuf, t4);
|
||||
releaseBufPage(pBuf, t4);
|
||||
|
||||
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pResultBuf, groupId, &pageId));
|
||||
SFilePage* t5 = static_cast<SFilePage*>(getBufPage(pResultBuf, pageId));
|
||||
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* t5 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t5 == pBufPage5);
|
||||
ASSERT_TRUE(pageId == 5);
|
||||
releaseBufPage(pBuf, t5);
|
||||
|
||||
// flush the written page to disk, and read it out again
|
||||
SFilePage* pBufPagex = static_cast<SFilePage*>(getBufPage(pResultBuf, writePageId));
|
||||
SFilePage* pBufPagex = static_cast<SFilePage*>(getBufPage(pBuf, writePageId));
|
||||
*(int32_t*)(pBufPagex->data) = nx;
|
||||
writePageId = pageId; // update the data
|
||||
releaseBufPage(pResultBuf, pBufPagex);
|
||||
releaseBufPage(pBuf, pBufPagex);
|
||||
|
||||
SFilePage* pBufPagex1 = static_cast<SFilePage*>(getBufPage(pResultBuf, 1));
|
||||
SFilePage* pBufPagex1 = static_cast<SFilePage*>(getBufPage(pBuf, 1));
|
||||
|
||||
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
|
||||
SArray* pa = getDataBufPagesIdList(pBuf, groupId);
|
||||
ASSERT_EQ(taosArrayGetSize(pa), 6);
|
||||
|
||||
destroyDiskbasedBuf(pResultBuf);
|
||||
destroyDiskbasedBuf(pBuf);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
|
|
Loading…
Reference in New Issue