Merge pull request #15396 from taosdata/feature/3.0_mhli
refactor(sync): add ref in log entry
This commit is contained in:
commit
5a7fbb9652
|
@ -26,6 +26,7 @@ extern "C" {
|
|||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "taosdef.h"
|
||||
#include "tref.h"
|
||||
#include "tskiplist.h"
|
||||
|
||||
typedef struct SSyncRaftEntry {
|
||||
|
@ -89,6 +90,7 @@ typedef struct SRaftEntryCache {
|
|||
SSkipList* pSkipList;
|
||||
int32_t maxCount;
|
||||
int32_t currentCount;
|
||||
int32_t refMgr;
|
||||
TdThreadMutex mutex;
|
||||
SSyncNode* pSyncNode;
|
||||
} SRaftEntryCache;
|
||||
|
|
|
@ -23,6 +23,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
|
|||
memset(pEntry, 0, bytes);
|
||||
pEntry->bytes = bytes;
|
||||
pEntry->dataLen = dataLen;
|
||||
pEntry->rid = -1;
|
||||
return pEntry;
|
||||
}
|
||||
|
||||
|
@ -451,6 +452,11 @@ static char* keyFn(const void* pData) {
|
|||
|
||||
static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
|
||||
|
||||
static void freeRaftEntry(void* param) {
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
||||
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||
SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
|
||||
if (pCache == NULL) {
|
||||
|
@ -466,6 +472,7 @@ SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
|||
}
|
||||
|
||||
taosThreadMutexInit(&(pCache->mutex), NULL);
|
||||
pCache->refMgr = taosOpenRef(10, freeRaftEntry);
|
||||
pCache->maxCount = maxCount;
|
||||
pCache->currentCount = 0;
|
||||
pCache->pSyncNode = pSyncNode;
|
||||
|
@ -477,6 +484,10 @@ void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
|
|||
if (pCache != NULL) {
|
||||
taosThreadMutexLock(&(pCache->mutex));
|
||||
tSkipListDestroy(pCache->pSkipList);
|
||||
if (pCache->refMgr != -1) {
|
||||
taosCloseRef(pCache->refMgr);
|
||||
pCache->refMgr = -1;
|
||||
}
|
||||
taosThreadMutexUnlock(&(pCache->mutex));
|
||||
taosThreadMutexDestroy(&(pCache->mutex));
|
||||
taosMemoryFree(pCache);
|
||||
|
@ -498,6 +509,9 @@ int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* p
|
|||
ASSERT(pSkipListNode != NULL);
|
||||
++(pCache->currentCount);
|
||||
|
||||
pEntry->rid = taosAddRef(pCache->refMgr, pEntry);
|
||||
ASSERT(pEntry->rid >= 0);
|
||||
|
||||
do {
|
||||
char eventLog[128];
|
||||
snprintf(eventLog, sizeof(eventLog), "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
|
||||
|
@ -520,6 +534,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index,
|
|||
if (code == 1) {
|
||||
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
||||
memcpy(*ppEntry, pEntry, pEntry->bytes);
|
||||
(*ppEntry)->rid = -1;
|
||||
} else {
|
||||
*ppEntry = NULL;
|
||||
}
|
||||
|
@ -541,6 +556,7 @@ int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index,
|
|||
SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
|
||||
ASSERT(*ppNode != NULL);
|
||||
*ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
|
||||
taosAcquireRef(pCache->refMgr, (*ppEntry)->rid);
|
||||
code = 1;
|
||||
|
||||
} else if (arraySize == 0) {
|
||||
|
@ -600,7 +616,9 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
|
|||
taosArrayPush(delNodeArray, &pNode);
|
||||
++returnCnt;
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||
syncEntryDestory(pEntry);
|
||||
|
||||
// syncEntryDestory(pEntry);
|
||||
taosRemoveRef(pCache->refMgr, pEntry->rid);
|
||||
}
|
||||
tSkipListDestroyIter(pIter);
|
||||
|
||||
|
|
|
@ -5,8 +5,8 @@
|
|||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tref.h"
|
||||
#include "tskiplist.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
|
@ -122,8 +122,6 @@ void test3() {
|
|||
raftEntryCacheLog2((char*)"==test3 write 10 entries==", pCache);
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void freeObj(void* param) {
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
|
||||
syncEntryLog2((char*)"freeObj: ", pEntry);
|
||||
|
@ -143,14 +141,36 @@ void test4() {
|
|||
SSyncRaftEntry* pAcquireEntry = (SSyncRaftEntry*)taosAcquireRef(testRefId, rid);
|
||||
syncEntryLog2((char*)"acquire: ", pAcquireEntry);
|
||||
|
||||
taosAcquireRef(testRefId, rid);
|
||||
taosAcquireRef(testRefId, rid);
|
||||
taosAcquireRef(testRefId, rid);
|
||||
|
||||
taosReleaseRef(testRefId, rid);
|
||||
// taosReleaseRef(testRefId, rid);
|
||||
// taosReleaseRef(testRefId, rid);
|
||||
} while (0);
|
||||
|
||||
taosRemoveRef(testRefId, rid);
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
sTrace("taosReleaseRef, %d", i);
|
||||
taosReleaseRef(testRefId, rid);
|
||||
}
|
||||
}
|
||||
|
||||
void test5() {
|
||||
int32_t testRefId = taosOpenRef(5, freeObj);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
SSyncRaftEntry* pEntry = createEntry(i);
|
||||
ASSERT(pEntry != NULL);
|
||||
|
||||
int64_t rid = taosAddRef(testRefId, pEntry);
|
||||
sTrace("rid: %ld", rid);
|
||||
}
|
||||
|
||||
for (int64_t rid = 2; rid < 101; rid++) {
|
||||
SSyncRaftEntry* pAcquireEntry = (SSyncRaftEntry*)taosAcquireRef(testRefId, rid);
|
||||
syncEntryLog2((char*)"taosAcquireRef: ", pAcquireEntry);
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -158,11 +178,13 @@ int main(int argc, char** argv) {
|
|||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG;
|
||||
|
||||
/*
|
||||
test1();
|
||||
test2();
|
||||
test3();
|
||||
|
||||
//test4();
|
||||
*/
|
||||
test4();
|
||||
// test5();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue