Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-31899
This commit is contained in:
commit
478414c32d
|
@ -94,6 +94,7 @@ extern int32_t tsElectInterval;
|
||||||
extern int32_t tsHeartbeatInterval;
|
extern int32_t tsHeartbeatInterval;
|
||||||
extern int32_t tsHeartbeatTimeout;
|
extern int32_t tsHeartbeatTimeout;
|
||||||
extern int32_t tsSnapReplMaxWaitN;
|
extern int32_t tsSnapReplMaxWaitN;
|
||||||
|
extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode
|
||||||
|
|
||||||
// arbitrator
|
// arbitrator
|
||||||
extern int32_t tsArbHeartBeatIntervalSec;
|
extern int32_t tsArbHeartBeatIntervalSec;
|
||||||
|
|
|
@ -332,6 +332,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_MAX_LEARNER_REPLICA 10
|
#define TSDB_MAX_LEARNER_REPLICA 10
|
||||||
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
|
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
|
||||||
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
|
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
|
||||||
|
#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5)
|
||||||
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
|
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
|
||||||
#define TSDB_SYNC_NEGOTIATION_WIN 512
|
#define TSDB_SYNC_NEGOTIATION_WIN 512
|
||||||
|
|
||||||
|
|
|
@ -81,6 +81,7 @@ int32_t tsElectInterval = 25 * 1000;
|
||||||
int32_t tsHeartbeatInterval = 1000;
|
int32_t tsHeartbeatInterval = 1000;
|
||||||
int32_t tsHeartbeatTimeout = 20 * 1000;
|
int32_t tsHeartbeatTimeout = 20 * 1000;
|
||||||
int32_t tsSnapReplMaxWaitN = 128;
|
int32_t tsSnapReplMaxWaitN = 128;
|
||||||
|
int64_t tsLogBufferMemoryAllowed = 0; // bytes
|
||||||
|
|
||||||
// mnode
|
// mnode
|
||||||
int64_t tsMndSdbWriteDelta = 200;
|
int64_t tsMndSdbWriteDelta = 200;
|
||||||
|
@ -613,7 +614,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
|
cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
|
||||||
|
|
||||||
tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000);
|
tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000);
|
||||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH));
|
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_NONE));
|
||||||
|
|
||||||
tsNumOfTaskQueueThreads = tsNumOfCores * 2;
|
tsNumOfTaskQueueThreads = tsNumOfCores * 2;
|
||||||
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 16);
|
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 16);
|
||||||
|
@ -702,6 +703,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||||
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||||
|
|
||||||
|
tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||||
|
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||||
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||||
|
@ -736,6 +740,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||||
|
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||||
|
|
||||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||||
|
@ -970,6 +975,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
|
||||||
pItem->stype = stype;
|
pItem->stype = stype;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pItem = cfgGetItem(tsCfg, "syncLogBufferMemoryAllowed");
|
||||||
|
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||||
|
tsLogBufferMemoryAllowed = totalMemoryKB * 1024 * 0.1;
|
||||||
|
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||||
|
pItem->i64 = tsLogBufferMemoryAllowed;
|
||||||
|
pItem->stype = stype;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1520,6 +1533,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncSnapReplMaxWaitN");
|
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncSnapReplMaxWaitN");
|
||||||
tsSnapReplMaxWaitN = pItem->i32;
|
tsSnapReplMaxWaitN = pItem->i32;
|
||||||
|
|
||||||
|
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncLogBufferMemoryAllowed");
|
||||||
|
tsLogBufferMemoryAllowed = pItem->i64;
|
||||||
|
|
||||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec");
|
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec");
|
||||||
tsArbHeartBeatIntervalSec = pItem->i32;
|
tsArbHeartBeatIntervalSec = pItem->i32;
|
||||||
|
|
||||||
|
@ -1954,6 +1970,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
|
||||||
{"randErrorChance", &tsRandErrChance},
|
{"randErrorChance", &tsRandErrChance},
|
||||||
{"randErrorDivisor", &tsRandErrDivisor},
|
{"randErrorDivisor", &tsRandErrDivisor},
|
||||||
{"randErrorScope", &tsRandErrScope},
|
{"randErrorScope", &tsRandErrScope},
|
||||||
|
{"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed},
|
||||||
|
|
||||||
{"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold},
|
{"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold},
|
||||||
{"checkpointInterval", &tsStreamCheckpointInterval},
|
{"checkpointInterval", &tsStreamCheckpointInterval},
|
||||||
|
|
|
@ -21,7 +21,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "taosdef.h"
|
#include "tglobal.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,7 @@ typedef struct SSyncLogBuffer {
|
||||||
int64_t matchIndex;
|
int64_t matchIndex;
|
||||||
int64_t endIndex;
|
int64_t endIndex;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
|
int64_t bytes;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
TdThreadMutexAttr attr;
|
TdThreadMutexAttr attr;
|
||||||
int64_t totalIndex;
|
int64_t totalIndex;
|
||||||
|
|
|
@ -28,6 +28,8 @@
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "syncVoteMgr.h"
|
#include "syncVoteMgr.h"
|
||||||
|
|
||||||
|
static int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer
|
||||||
|
|
||||||
static bool syncIsMsgBlock(tmsg_t type) {
|
static bool syncIsMsgBlock(tmsg_t type) {
|
||||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
|
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
|
||||||
|
@ -101,6 +103,10 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
|
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
|
||||||
pBuf->entries[index % pBuf->size] = tmp;
|
pBuf->entries[index % pBuf->size] = tmp;
|
||||||
pBuf->endIndex = index + 1;
|
pBuf->endIndex = index + 1;
|
||||||
|
if (pNode->vgId > 1) {
|
||||||
|
pBuf->bytes += pEntry->bytes;
|
||||||
|
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||||
|
@ -260,6 +266,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
|
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
|
||||||
pBuf->entries[index % pBuf->size] = tmp;
|
pBuf->entries[index % pBuf->size] = tmp;
|
||||||
taken = true;
|
taken = true;
|
||||||
|
if (pNode->vgId > 1) {
|
||||||
|
pBuf->bytes += pEntry->bytes;
|
||||||
|
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index < toIndex) {
|
if (index < toIndex) {
|
||||||
|
@ -286,6 +296,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
}
|
}
|
||||||
SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
|
SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
|
||||||
pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
|
pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
|
||||||
|
if (pNode->vgId > 1) {
|
||||||
|
pBuf->bytes += pDummy->bytes;
|
||||||
|
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
if (index < toIndex) {
|
if (index < toIndex) {
|
||||||
pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
|
pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
|
||||||
|
@ -330,6 +344,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||||
}
|
}
|
||||||
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
|
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
|
||||||
|
pBuf->bytes = 0;
|
||||||
int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
|
int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
|
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
|
||||||
|
@ -470,8 +485,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
|
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
|
||||||
pEntry = NULL;
|
|
||||||
pBuf->entries[index % pBuf->size] = tmp;
|
pBuf->entries[index % pBuf->size] = tmp;
|
||||||
|
if (pNode->vgId > 1) {
|
||||||
|
pBuf->bytes += pEntry->bytes;
|
||||||
|
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||||
|
}
|
||||||
|
pEntry = NULL;
|
||||||
|
|
||||||
// update end index
|
// update end index
|
||||||
pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);
|
pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);
|
||||||
|
@ -846,14 +865,34 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
||||||
}
|
}
|
||||||
|
|
||||||
// recycle
|
// recycle
|
||||||
|
bool isVnode = pNode->vgId > 1;
|
||||||
SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
|
SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
|
||||||
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
|
do {
|
||||||
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
if ((pBuf->startIndex >= pBuf->commitIndex) ||
|
||||||
if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
|
||||||
|
atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
|
||||||
|
if (pEntry == NULL) {
|
||||||
|
sError("vgId:%d, invalid log entry to recycle. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
|
||||||
|
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64,
|
||||||
|
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term);
|
||||||
|
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
if (isVnode) {
|
||||||
|
pBuf->bytes -= pEntry->bytes;
|
||||||
|
(void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||||
|
}
|
||||||
|
sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
|
||||||
|
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
|
||||||
|
", used:%" PRId64 ", allowed:%" PRId64,
|
||||||
|
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term,
|
||||||
|
pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
|
||||||
syncEntryDestroy(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
(void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||||
pBuf->startIndex = index + 1;
|
++pBuf->startIndex;
|
||||||
}
|
} while (true);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_out:
|
_out:
|
||||||
|
@ -1324,6 +1363,7 @@ void syncLogBufferClear(SSyncLogBuffer* pBuf) {
|
||||||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||||
}
|
}
|
||||||
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
|
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
|
||||||
|
pBuf->bytes = 0;
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue