refactor(sync): add resp syncRespCleanByTTL
This commit is contained in:
parent
eae44cd9df
commit
6437d20cda
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "syncRespMgr.h"
|
||||
#include "syncRaftEntry.h"
|
||||
#include "syncRaftStore.h"
|
||||
|
||||
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
||||
|
@ -116,4 +117,59 @@ void syncRespClean(SSyncRespMgr *pObj) {
|
|||
taosThreadMutexUnlock(&(pObj->mutex));
|
||||
}
|
||||
|
||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {}
|
||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
|
||||
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
|
||||
int cnt = 0;
|
||||
SSyncNode *pSyncNode = pObj->data;
|
||||
|
||||
SArray *delIndexArray = taosArrayInit(0, sizeof(SyncIndex));
|
||||
ASSERT(delIndexArray != NULL);
|
||||
|
||||
while (pStub) {
|
||||
size_t len;
|
||||
void *key = taosHashGetKey(pStub, &len);
|
||||
SyncIndex *pIndex = (SyncIndex *)key;
|
||||
|
||||
int64_t nowMS = taosGetTimestampMs();
|
||||
if (nowMS - pStub->createTime > ttl) {
|
||||
taosArrayPush(delIndexArray, pIndex);
|
||||
cnt++;
|
||||
|
||||
SSyncRaftEntry *pEntry = NULL;
|
||||
int32_t code = 0;
|
||||
if (pSyncNode->pLogStore != NULL) {
|
||||
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, *pIndex, &pEntry);
|
||||
if (code == 0 && pEntry != NULL) {
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, cbMeta.index);
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
cbMeta.code = TSDB_CODE_SYN_TIMEOUT;
|
||||
cbMeta.state = pSyncNode->state;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
||||
cbMeta.flag = 0;
|
||||
|
||||
SRpcMsg rpcMsg = pStub->rpcMsg;
|
||||
rpcMsg.pCont = rpcMallocCont(pEntry->dataLen);
|
||||
memcpy(rpcMsg.pCont, pEntry->data, pEntry->dataLen);
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
|
||||
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
|
||||
}
|
||||
|
||||
int32_t arraySize = taosArrayGetSize(delIndexArray);
|
||||
sDebug("vgId:%d, resp clean by ttl, cnt:%d, array-size:%d", pSyncNode->vgId, cnt, arraySize);
|
||||
|
||||
for (int32_t i = 0; i < arraySize; ++i) {
|
||||
SyncIndex *pIndex = taosArrayGet(delIndexArray, i);
|
||||
taosHashRemove(pObj->pRespHash, pIndex, sizeof(SyncIndex));
|
||||
}
|
||||
taosArrayDestroy(delIndexArray);
|
||||
}
|
||||
|
|
|
@ -8,7 +8,13 @@ void print(SHashObj *pNextIndex) {
|
|||
printf("----------------\n");
|
||||
uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL);
|
||||
while (p) {
|
||||
printf("%lu \n", *p);
|
||||
|
||||
size_t len;
|
||||
void* key = taosHashGetKey(p, &len);
|
||||
|
||||
SRaftId *pRaftId = (SRaftId*)key;
|
||||
|
||||
printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p);
|
||||
p = (uint64_t *)taosHashIterate(pNextIndex, p);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,9 +73,15 @@ void syncRespMgrGetAndDelTest(uint64_t i) {
|
|||
}
|
||||
}
|
||||
|
||||
SSyncNode *createSyncNode() {
|
||||
SSyncNode *pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
|
||||
memset(pSyncNode, 0, sizeof(SSyncNode));
|
||||
return pSyncNode;
|
||||
}
|
||||
|
||||
void test1() {
|
||||
printf("------- test1 ---------\n");
|
||||
pMgr = syncRespMgrCreate(NULL, 0);
|
||||
pMgr = syncRespMgrCreate(createSyncNode(), 0);
|
||||
assert(pMgr != NULL);
|
||||
|
||||
syncRespMgrInsert(10);
|
||||
|
@ -100,7 +106,7 @@ void test1() {
|
|||
|
||||
void test2() {
|
||||
printf("------- test2 ---------\n");
|
||||
pMgr = syncRespMgrCreate(NULL, 0);
|
||||
pMgr = syncRespMgrCreate(createSyncNode(), 0);
|
||||
assert(pMgr != NULL);
|
||||
|
||||
syncRespMgrInsert(10);
|
||||
|
@ -117,7 +123,7 @@ void test2() {
|
|||
|
||||
void test3() {
|
||||
printf("------- test3 ---------\n");
|
||||
pMgr = syncRespMgrCreate(NULL, 0);
|
||||
pMgr = syncRespMgrCreate(createSyncNode(), 0);
|
||||
assert(pMgr != NULL);
|
||||
|
||||
syncRespMgrInsert(10);
|
||||
|
@ -132,13 +138,34 @@ void test3() {
|
|||
syncRespMgrDestroy(pMgr);
|
||||
}
|
||||
|
||||
void test4() {
|
||||
printf("------- test4 ---------\n");
|
||||
pMgr = syncRespMgrCreate(createSyncNode(), 2);
|
||||
assert(pMgr != NULL);
|
||||
|
||||
syncRespMgrInsert(5);
|
||||
syncRespMgrPrint();
|
||||
|
||||
taosMsleep(3000);
|
||||
|
||||
syncRespMgrInsert(3);
|
||||
syncRespMgrPrint();
|
||||
|
||||
printf("====== after clean ttl \n");
|
||||
syncRespClean(pMgr);
|
||||
syncRespMgrPrint();
|
||||
|
||||
syncRespMgrDestroy(pMgr);
|
||||
}
|
||||
|
||||
int main() {
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||
sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||
logTest();
|
||||
test1();
|
||||
test2();
|
||||
test3();
|
||||
test4();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue