refactor: adjust transaction code
This commit is contained in:
parent
71021334fb
commit
2f42c2e793
|
@ -121,6 +121,10 @@ extern char tsCompressor[];
|
||||||
extern int32_t tsDiskCfgNum;
|
extern int32_t tsDiskCfgNum;
|
||||||
extern SDiskCfg tsDiskCfg[];
|
extern SDiskCfg tsDiskCfg[];
|
||||||
|
|
||||||
|
// internal
|
||||||
|
extern int32_t tsTransPullupMs;
|
||||||
|
extern int32_t tsMaRebalanceMs;
|
||||||
|
|
||||||
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
|
||||||
|
|
||||||
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile,
|
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile,
|
||||||
|
|
|
@ -169,6 +169,10 @@ uint32_t tsMaxRange = 500; // max range
|
||||||
uint32_t tsCurRange = 100; // range
|
uint32_t tsCurRange = 100; // range
|
||||||
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
|
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
|
||||||
|
|
||||||
|
// internal
|
||||||
|
int32_t tsTransPullupMs = 6000;
|
||||||
|
int32_t tsMaRebalanceMs = 2000;
|
||||||
|
|
||||||
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
|
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
|
||||||
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
|
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
|
||||||
tsDiskCfg[index].level = level;
|
tsDiskCfg[index].level = level;
|
||||||
|
|
|
@ -15,37 +15,83 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
|
||||||
static SMsgCb tsDefaultMsgCb;
|
static SMsgCb tsDefaultMsgCb;
|
||||||
|
|
||||||
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
|
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
|
||||||
|
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
|
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
|
||||||
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
|
PutToQueueFp fp = pMsgCb->queueFps[qtype];
|
||||||
|
if (fp != NULL) {
|
||||||
|
return (*fp)(pMsgCb->pWrapper, pReq);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
||||||
return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype);
|
GetQueueSizeFp fp = pMsgCb->qsizeFp;
|
||||||
|
if (fp != NULL) {
|
||||||
|
return (*fp)(pMsgCb->pWrapper, vgId, qtype);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
|
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
|
||||||
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
|
SendReqFp fp = pMsgCb->sendReqFp;
|
||||||
|
if (fp != NULL) {
|
||||||
|
return (*fp)(pMsgCb->pWrapper, epSet, pReq);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); }
|
void tmsgSendRsp(const SRpcMsg* pRsp) {
|
||||||
|
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
|
||||||
|
if (fp != NULL) {
|
||||||
|
return (*fp)(tsDefaultMsgCb.pWrapper, pRsp);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
|
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
|
||||||
return (*tsDefaultMsgCb.sendRedirectRspFp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
|
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
|
||||||
|
if (fp != NULL) {
|
||||||
|
(*fp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
|
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
|
||||||
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
|
RegisterBrokenLinkArgFp fp = pMsgCb->registerBrokenLinkArgFp;
|
||||||
|
if (fp != NULL) {
|
||||||
|
(*fp)(pMsgCb->pWrapper, pMsg);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgReleaseHandle(void* handle, int8_t type) {
|
void tmsgReleaseHandle(void* handle, int8_t type) {
|
||||||
(*tsDefaultMsgCb.releaseHandleFp)(tsDefaultMsgCb.pWrapper, handle, type);
|
ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp;
|
||||||
|
if (fp != NULL) {
|
||||||
|
(*fp)(tsDefaultMsgCb.pWrapper, handle, type);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgReportStartup(const char* name, const char* desc) {
|
void tmsgReportStartup(const char* name, const char* desc) {
|
||||||
(*tsDefaultMsgCb.reportStartupFp)(tsDefaultMsgCb.pWrapper, name, desc);
|
ReportStartup fp = tsDefaultMsgCb.reportStartupFp;
|
||||||
|
if (fp != NULL && tsDefaultMsgCb.pWrapper != NULL) {
|
||||||
|
(*fp)(tsDefaultMsgCb.pWrapper, name, desc);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -1126,7 +1126,7 @@ static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
} else {
|
} else {
|
||||||
pTrans->code = terrno;
|
pTrans->code = terrno;
|
||||||
pTrans->failedTimes++;
|
pTrans->failedTimes++;
|
||||||
mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr());
|
mError("trans:%d, stage keep on commitLog since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
|
||||||
continueExec = false;
|
continueExec = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1162,7 +1162,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
continueExec = false;
|
continueExec = false;
|
||||||
} else {
|
} else {
|
||||||
pTrans->failedTimes++;
|
pTrans->failedTimes++;
|
||||||
mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr());
|
mError("trans:%d, stage keep on undoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
|
||||||
continueExec = false;
|
continueExec = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1179,7 +1179,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
continueExec = true;
|
continueExec = true;
|
||||||
} else {
|
} else {
|
||||||
pTrans->failedTimes++;
|
pTrans->failedTimes++;
|
||||||
mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
|
mError("trans:%d, stage keep on rollback since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
|
||||||
continueExec = false;
|
continueExec = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,9 +43,6 @@
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
|
||||||
#define MQ_TIMER_MS 2000
|
|
||||||
#define TRNAS_TIMER_MS 6000
|
|
||||||
|
|
||||||
static void *mndBuildTimerMsg(int32_t *pContLen) {
|
static void *mndBuildTimerMsg(int32_t *pContLen) {
|
||||||
SMTimerReq timerReq = {0};
|
SMTimerReq timerReq = {0};
|
||||||
|
|
||||||
|
@ -68,7 +65,7 @@ static void mndPullupTrans(void *param, void *tmrId) {
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
|
taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndCalMqRebalance(void *param, void *tmrId) {
|
static void mndCalMqRebalance(void *param, void *tmrId) {
|
||||||
|
@ -84,7 +81,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
|
taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndPullupTelem(void *param, void *tmrId) {
|
static void mndPullupTelem(void *param, void *tmrId) {
|
||||||
|
@ -106,12 +103,12 @@ static int32_t mndInitTimer(SMnode *pMnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
|
if (taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer)) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) {
|
if (taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer)) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,9 +58,12 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
strcpy(opt.replicas[0].fqdn, "localhost");
|
strcpy(opt.replicas[0].fqdn, "localhost");
|
||||||
opt.msgCb = msgCb;
|
opt.msgCb = msgCb;
|
||||||
|
|
||||||
|
tsTransPullupMs = 1000;
|
||||||
|
|
||||||
const char *mnodepath = "/tmp/mnode_test_trans";
|
const char *mnodepath = "/tmp/mnode_test_trans";
|
||||||
taosRemoveDir(mnodepath);
|
taosRemoveDir(mnodepath);
|
||||||
pMnode = mndOpen(mnodepath, &opt);
|
pMnode = mndOpen(mnodepath, &opt);
|
||||||
|
mndStart(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void SetUpTestSuite() {
|
static void SetUpTestSuite() {
|
||||||
|
@ -70,6 +73,7 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void TearDownTestSuite() {
|
static void TearDownTestSuite() {
|
||||||
|
mndStop(pMnode);
|
||||||
mndClose(pMnode);
|
mndClose(pMnode);
|
||||||
walCleanUp();
|
walCleanUp();
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
@ -128,4 +132,6 @@ TEST_F(MndTestTrans2, 01_CbFunc) {
|
||||||
EXPECT_EQ(CreateUser(acct_invalid, user2), 0);
|
EXPECT_EQ(CreateUser(acct_invalid, user2), 0);
|
||||||
pUser2 = mndAcquireUser(pMnode, user2);
|
pUser2 = mndAcquireUser(pMnode, user2);
|
||||||
ASSERT_EQ(pUser2, nullptr);
|
ASSERT_EQ(pUser2, nullptr);
|
||||||
|
|
||||||
|
mndTransPullup(pMnode);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue