add trigger checkpoint

This commit is contained in:
yihaoDeng 2023-06-09 10:59:55 +00:00
parent a0c2897124
commit 404c43d1de
3 changed files with 18 additions and 12 deletions

View File

@ -33,10 +33,12 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return -1;
}
SMnodeOpt option = {.deploy = true, .numOfReplicas = createReq.replica,
.numOfTotalReplicas = createReq.replica + createReq.learnerReplica,
.selfIndex = -1, .lastIndex = createReq.lastIndex};
SMnodeOpt option = {.deploy = true,
.numOfReplicas = createReq.replica,
.numOfTotalReplicas = createReq.replica + createReq.learnerReplica,
.selfIndex = -1,
.lastIndex = createReq.lastIndex};
memcpy(option.replicas, createReq.replicas, sizeof(createReq.replicas));
for (int32_t i = 0; i < createReq.replica; ++i) {
if (createReq.replicas[i].id == pInput->pData->dnodeId) {
@ -200,6 +202,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -76,6 +76,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -822,7 +822,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
return -1;
}
if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
@ -957,7 +957,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
return -1;
}
if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) {
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
@ -1188,8 +1188,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags){
if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags) {
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
@ -1473,7 +1473,8 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj
SSchema *pTag = pNew->pTags + tag;
if (!(pTag->type == TSDB_DATA_TYPE_BINARY || pTag->type == TSDB_DATA_TYPE_NCHAR || pTag->type == TSDB_DATA_TYPE_GEOMETRY)) {
if (!(pTag->type == TSDB_DATA_TYPE_BINARY || pTag->type == TSDB_DATA_TYPE_NCHAR ||
pTag->type == TSDB_DATA_TYPE_GEOMETRY)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
@ -1501,7 +1502,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return -1;
}
if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols){
if (pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols) {
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
@ -1593,7 +1594,8 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO
}
SSchema *pCol = pNew->pColumns + col;
if (!(pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR || pCol->type == TSDB_DATA_TYPE_GEOMETRY)) {
if (!(pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR ||
pCol->type == TSDB_DATA_TYPE_GEOMETRY)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
@ -3177,7 +3179,7 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
if (pShow->pIter == NULL) break;
} else {
fetch = true;
void *pKey = taosHashGetKey(pShow->pIter, NULL);
void *pKey = taosHashGetKey(pShow->pIter, NULL);
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
}