add trigger checkpoint

This commit is contained in:
yihaoDeng 2023-06-09 11:00:07 +00:00
parent 404c43d1de
commit 74797f5ba6
1 changed files with 48 additions and 50 deletions

View File

@ -144,7 +144,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
pCfg->syncCfg.replicaNum++; pCfg->syncCfg.replicaNum++;
} }
if(pCreate->selfIndex != -1){ if (pCreate->selfIndex != -1) {
pCfg->syncCfg.myIndex = pCreate->selfIndex; pCfg->syncCfg.myIndex = pCreate->selfIndex;
} }
for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) { for (int32_t i = pCfg->syncCfg.replicaNum; i < pCreate->replica + pCreate->learnerReplica; ++i) {
@ -157,7 +157,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->syncCfg.totalReplicaNum++; pCfg->syncCfg.totalReplicaNum++;
} }
pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum; pCfg->syncCfg.totalReplicaNum += pCfg->syncCfg.replicaNum;
if(pCreate->learnerSelfIndex != -1){ if (pCreate->learnerSelfIndex != -1) {
pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex; pCfg->syncCfg.myIndex = pCreate->replica + pCreate->learnerSelfIndex;
} }
} }
@ -201,38 +201,37 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if(req.learnerReplica == 0) if (req.learnerReplica == 0) {
{
req.learnerSelfIndex = -1; req.learnerSelfIndex = -1;
} }
dInfo("vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64 dInfo(
", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64 "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d" "szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64 ", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d " ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
"learnerReplica:%d learnerSelfIndex:%d strict:%d", ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, "learnerReplica:%d learnerSelfIndex:%d strict:%d",
(uint64_t)req.buffer * 1024 * 1024, req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid, (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression, req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize, req.isTsma, req.precision, req.compression, req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel,
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.walRetentionPeriod, req.walRetentionSize, req.walRollPeriod, req.walSegmentSize, req.hashMethod,
req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict); req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica,
req.learnerSelfIndex, req.strict);
for (int32_t i = 0; i < req.replica; ++i) { for (int32_t i = 0; i < req.replica; ++i) {
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port, dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
req.replicas[i].id); req.replicas[i].id);
} }
for (int32_t i = 0; i < req.learnerReplica; ++i) { for (int32_t i = 0; i < req.learnerReplica; ++i) {
dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn, dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
req.learnerReplicas[i].port, req.replicas[i].id); req.learnerReplicas[i].port, req.replicas[i].id);
} }
SReplica *pReplica = NULL; SReplica *pReplica = NULL;
if(req.selfIndex != -1){ if (req.selfIndex != -1) {
pReplica = &req.replicas[req.selfIndex]; pReplica = &req.replicas[req.selfIndex];
} } else {
else{
pReplica = &req.learnerReplicas[req.learnerSelfIndex]; pReplica = &req.learnerReplicas[req.learnerSelfIndex];
} }
if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort || if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
@ -279,7 +278,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
} }
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&req); tFreeSCreateVnodeReq(&req);
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
code = terrno; code = terrno;
@ -326,8 +325,8 @@ _OVER:
vnodeClose(pImpl); vnodeClose(pImpl);
vnodeDestroy(path, pMgmt->pTfs); vnodeDestroy(path, pMgmt->pTfs);
} else { } else {
dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
req.vgId, TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
} }
tFreeSCreateVnodeReq(&req); tFreeSCreateVnodeReq(&req);
@ -342,12 +341,12 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if(req.learnerReplicas == 0){ if (req.learnerReplicas == 0) {
req.learnerSelfIndex = -1; req.learnerSelfIndex = -1;
} }
dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
req.vgId, TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
@ -358,7 +357,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
ESyncRole role = vnodeGetRole(pVnode->pImpl); ESyncRole role = vnodeGetRole(pVnode->pImpl);
dInfo("vgId:%d, checking node role:%d", req.vgId, role); dInfo("vgId:%d, checking node role:%d", req.vgId, role);
if(role == TAOS_SYNC_ROLE_VOTER){ if (role == TAOS_SYNC_ROLE_VOTER) {
dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role); dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
terrno = TSDB_CODE_VND_ALREADY_IS_VOTER; terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
@ -366,7 +365,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
dInfo("vgId:%d, checking node catch up", req.vgId); dInfo("vgId:%d, checking node catch up", req.vgId);
if(vnodeIsCatchUp(pVnode->pImpl) != 1){ if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
terrno = TSDB_CODE_VND_NOT_CATCH_UP; terrno = TSDB_CODE_VND_NOT_CATCH_UP;
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
return -1; return -1;
@ -386,9 +385,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id); dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
} }
if (req.replica <= 0 || if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
(req.selfIndex < 0 && req.learnerSelfIndex <0)|| req.learnerSelfIndex >= req.learnerReplica) {
req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId); dError("vgId:%d, failed to alter replica since invalid msg", vgId);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
@ -396,10 +394,9 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
SReplica *pReplica = NULL; SReplica *pReplica = NULL;
if(req.selfIndex != -1){ if (req.selfIndex != -1) {
pReplica = &req.replicas[req.selfIndex]; pReplica = &req.replicas[req.selfIndex];
} } else {
else{
pReplica = &req.learnerReplicas[req.learnerSelfIndex]; pReplica = &req.learnerReplicas[req.learnerSelfIndex];
} }
@ -448,7 +445,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered", dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
req.vgId, TMSG_INFO(pMsg->msgType)); req.vgId, TMSG_INFO(pMsg->msgType));
return 0; return 0;
} }
@ -546,15 +543,16 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if(alterReq.learnerReplica == 0){ if (alterReq.learnerReplica == 0) {
alterReq.learnerSelfIndex = -1; alterReq.learnerSelfIndex = -1;
} }
int32_t vgId = alterReq.vgId; int32_t vgId = alterReq.vgId;
dInfo("vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d " dInfo(
"learnerSelfIndex:%d strict:%d", "vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, "learnerSelfIndex:%d strict:%d",
alterReq.learnerSelfIndex, alterReq.strict); vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
for (int32_t i = 0; i < alterReq.replica; ++i) { for (int32_t i = 0; i < alterReq.replica; ++i) {
SReplica *pReplica = &alterReq.replicas[i]; SReplica *pReplica = &alterReq.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
@ -564,8 +562,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
} }
if (alterReq.replica <= 0 || if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
(alterReq.selfIndex < 0 && alterReq.learnerSelfIndex <0)||
alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) { alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId); dError("vgId:%d, failed to alter replica since invalid msg", vgId);
@ -573,10 +570,9 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
SReplica *pReplica = NULL; SReplica *pReplica = NULL;
if(alterReq.selfIndex != -1){ if (alterReq.selfIndex != -1) {
pReplica = &alterReq.replicas[alterReq.selfIndex]; pReplica = &alterReq.replicas[alterReq.selfIndex];
} } else {
else{
pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex]; pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
} }
@ -630,10 +626,11 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
dInfo("vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d " dInfo(
"learnerSelfIndex:%d strict:%d", "vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, "learnerSelfIndex:%d strict:%d",
alterReq.learnerSelfIndex, alterReq.strict); vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
return 0; return 0;
} }
@ -733,6 +730,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;