refact: adjust sync resp mgr
This commit is contained in:
parent
4112f1c1f0
commit
a4e96ca833
|
@ -41,13 +41,12 @@ typedef struct SSyncRespMgr {
|
||||||
|
|
||||||
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl);
|
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl);
|
||||||
void syncRespMgrDestroy(SSyncRespMgr *pObj);
|
void syncRespMgrDestroy(SSyncRespMgr *pObj);
|
||||||
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub);
|
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub);
|
||||||
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index);
|
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq);
|
||||||
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
|
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub);
|
||||||
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
|
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo);
|
||||||
void syncRespClean(SSyncRespMgr *pObj);
|
void syncRespClean(SSyncRespMgr *pObj);
|
||||||
void syncRespCleanRsp(SSyncRespMgr *pObj);
|
void syncRespCleanRsp(SSyncRespMgr *pObj);
|
||||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,9 +26,6 @@ typedef struct SRaftId {
|
||||||
SyncGroupId vgId;
|
SyncGroupId vgId;
|
||||||
} SRaftId;
|
} SRaftId;
|
||||||
|
|
||||||
// for compatibility, the same as syncPropose
|
|
||||||
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
|
||||||
|
|
||||||
// ------------------ for debug -------------------
|
// ------------------ for debug -------------------
|
||||||
void syncRpcMsgPrint(SRpcMsg* pMsg);
|
void syncRpcMsgPrint(SRpcMsg* pMsg);
|
||||||
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
|
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -452,11 +452,6 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
|
||||||
int32_t ret = syncPropose(rid, pMsg, isWeak);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSyncState syncGetState(int64_t rid) {
|
SSyncState syncGetState(int64_t rid) {
|
||||||
SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
|
SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
|
||||||
|
|
||||||
|
@ -558,109 +553,27 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
|
||||||
return lastIndex;
|
return lastIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
SyncTerm syncGetMyTerm(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return TAOS_SYNC_STATE_ERROR;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
SyncTerm term = pSyncNode->pRaftStore->currentTerm;
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return term;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncIndex syncGetLastIndex(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return SYNC_INDEX_INVALID;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return lastIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncIndex syncGetCommitIndex(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return SYNC_INDEX_INVALID;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
SyncIndex cmtIndex = pSyncNode->commitIndex;
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return cmtIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncGroupId syncGetVgId(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return TAOS_SYNC_STATE_ERROR;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
SyncGroupId vgId = pSyncNode->vgId;
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
return vgId;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
|
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
memset(pEpSet, 0, sizeof(*pEpSet));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
pEpSet->numOfEps = 0;
|
|
||||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
|
||||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
|
||||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
|
||||||
(pEpSet->numOfEps)++;
|
|
||||||
sInfo("vgId:%d, sync get epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
|
||||||
}
|
|
||||||
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
|
|
||||||
sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
memset(pEpSet, 0, sizeof(*pEpSet));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pEpSet->numOfEps = 0;
|
pEpSet->numOfEps = 0;
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
|
if (pSyncNode == NULL) return;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
SEp* pEp = &pEpSet->eps[i];
|
||||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
tstrncpy(pEp->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
|
||||||
(pEpSet->numOfEps)++;
|
pEp->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||||
sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
|
pEpSet->numOfEps++;
|
||||||
pEpSet->eps[i].port);
|
sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
|
||||||
}
|
}
|
||||||
if (pEpSet->numOfEps > 0) {
|
if (pEpSet->numOfEps > 0) {
|
||||||
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
|
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
|
||||||
}
|
}
|
||||||
sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
|
||||||
|
|
||||||
|
sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
|
|
||||||
SRespStub stub;
|
|
||||||
int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
|
|
||||||
if (ret == 1) {
|
|
||||||
*pInfo = stub.rpcMsg.info;
|
|
||||||
}
|
|
||||||
|
|
||||||
sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
@ -2759,7 +2672,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
||||||
.flag = flag,
|
.flag = flag,
|
||||||
};
|
};
|
||||||
|
|
||||||
syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info);
|
syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
||||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
|
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,21 +13,22 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncRespMgr.h"
|
#include "syncRespMgr.h"
|
||||||
#include "syncRaftEntry.h"
|
#include "syncRaftEntry.h"
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
|
|
||||||
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
||||||
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
|
SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr));
|
||||||
if (pObj == NULL) {
|
if (pObj == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memset(pObj, 0, sizeof(SSyncRespMgr));
|
|
||||||
|
|
||||||
pObj->pRespHash =
|
pObj->pRespHash =
|
||||||
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
ASSERT(pObj->pRespHash != NULL);
|
if (pObj->pRespHash == NULL) return NULL;
|
||||||
|
|
||||||
pObj->ttl = ttl;
|
pObj->ttl = ttl;
|
||||||
pObj->data = data;
|
pObj->data = data;
|
||||||
pObj->seqNum = 0;
|
pObj->seqNum = 0;
|
||||||
|
@ -38,93 +39,84 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
||||||
|
|
||||||
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
||||||
if (pObj != NULL) {
|
if (pObj != NULL) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
taosHashCleanup(pObj->pRespHash);
|
taosHashCleanup(pObj->pRespHash);
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
taosThreadMutexDestroy(&(pObj->mutex));
|
taosThreadMutexDestroy(&(pObj->mutex));
|
||||||
taosMemoryFree(pObj);
|
taosMemoryFree(pObj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
|
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
|
||||||
uint64_t keyCode = ++(pObj->seqNum);
|
uint64_t seq = ++(pObj->seqNum);
|
||||||
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
|
int32_t code = taosHashPut(pObj->pRespHash, &seq, sizeof(uint64_t), pStub, sizeof(SRespStub));
|
||||||
|
sNTrace(pObj->data, "save message handle:%p, type:%s seq:%" PRIu64 " code:0x%x", pStub->rpcMsg.info.handle,
|
||||||
|
TMSG_INFO(pStub->rpcMsg.msgType), seq, code);
|
||||||
|
|
||||||
sNTrace(pObj->data, "save message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType),
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
keyCode, pStub->rpcMsg.info.handle);
|
return seq;
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
return keyCode;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index) {
|
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
|
||||||
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
|
int32_t code = taosHashRemove(pObj->pRespHash, &seq, sizeof(seq));
|
||||||
|
sNTrace(pObj->data, "remove message handle, seq:%" PRIu64 " code:%d", seq, code);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
|
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
|
||||||
void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
|
SRespStub *pTmp = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t));
|
||||||
if (pTmp != NULL) {
|
if (pTmp != NULL) {
|
||||||
memcpy(pStub, pTmp, sizeof(SRespStub));
|
memcpy(pStub, pTmp, sizeof(SRespStub));
|
||||||
|
sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType), seq,
|
||||||
|
pStub->rpcMsg.info.handle);
|
||||||
|
|
||||||
sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType),
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
index, pStub->rpcMsg.info.handle);
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
return 1; // get one object
|
return 1; // get one object
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
return 0; // get none object
|
return 0; // get none object
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
|
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
|
||||||
void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
|
SRespStub *pStub = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t));
|
||||||
if (pTmp != NULL) {
|
if (pStub != NULL) {
|
||||||
memcpy(pStub, pTmp, sizeof(SRespStub));
|
*pInfo = pStub->rpcMsg.info;
|
||||||
|
sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pStub->rpcMsg.info.handle,
|
||||||
|
TMSG_INFO(pStub->rpcMsg.msgType), seq);
|
||||||
|
taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t));
|
||||||
|
|
||||||
sNTrace(pObj->data, "get-and-del message handle, type:%s seq:%" PRIu64 " handle:%p",
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
|
|
||||||
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
return 1; // get one object
|
return 1; // get one object
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
return 0; // get none object
|
return 0; // get none object
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
|
||||||
syncRespCleanByTTL(pObj, -1, true);
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRespClean(SSyncRespMgr *pObj) {
|
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
|
||||||
syncRespCleanByTTL(pObj, pObj->ttl, false);
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
|
||||||
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
|
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
|
||||||
int cnt = 0;
|
int cnt = 0;
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
SSyncNode *pSyncNode = pObj->data;
|
SSyncNode *pSyncNode = pObj->data;
|
||||||
|
|
||||||
SArray *delIndexArray = taosArrayInit(0, sizeof(uint64_t));
|
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
|
||||||
ASSERT(delIndexArray != NULL);
|
if (delIndexArray == NULL) return;
|
||||||
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
|
|
||||||
|
|
||||||
|
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
|
||||||
while (pStub) {
|
while (pStub) {
|
||||||
size_t len;
|
size_t len;
|
||||||
void * key = taosHashGetKey(pStub, &len);
|
void *key = taosHashGetKey(pStub, &len);
|
||||||
uint64_t *pSeqNum = (uint64_t *)key;
|
uint64_t *pSeqNum = (uint64_t *)key;
|
||||||
sum++;
|
sum++;
|
||||||
|
|
||||||
|
@ -149,15 +141,15 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
pStub->rpcMsg.contLen = 0;
|
pStub->rpcMsg.contLen = 0;
|
||||||
|
|
||||||
// TODO: and make rpcMsg body, call commit cb
|
// TODO: and make rpcMsg body, call commit cb
|
||||||
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
|
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
|
||||||
|
|
||||||
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
|
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
if (pStub->rpcMsg.info.handle != NULL) {
|
if (pStub->rpcMsg.info.handle != NULL) {
|
||||||
tmsgSendRsp(&(pStub->rpcMsg));
|
tmsgSendRsp(&pStub->rpcMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
|
pStub = taosHashIterate(pObj->pRespHash, pStub);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t arraySize = taosArrayGetSize(delIndexArray);
|
int32_t arraySize = taosArrayGetSize(delIndexArray);
|
||||||
|
@ -170,3 +162,15 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(delIndexArray);
|
taosArrayDestroy(delIndexArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
||||||
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
syncRespCleanByTTL(pObj, -1, true);
|
||||||
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRespClean(SSyncRespMgr *pObj) {
|
||||||
|
taosThreadMutexLock(&pObj->mutex);
|
||||||
|
syncRespCleanByTTL(pObj, pObj->ttl, false);
|
||||||
|
taosThreadMutexUnlock(&pObj->mutex);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue