refactor stream backend

This commit is contained in:
yihaoDeng 2023-10-23 16:45:17 +08:00
parent 9d210ec957
commit ca1ffd584d
1 changed files with 25 additions and 25 deletions

View File

@ -2450,32 +2450,34 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t mndFindTransByName(SMnode *pMnode, char *key, int32_t *transId) {
SSdb *pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void *pIter = NULL;
bool found = false;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) {
break;
}
if (strncmp(pTrans->opername, key, tListLen(pTrans->opername) - 1) == 0) {
*transId = pTrans->id;
found = true;
sdbRelease(pSdb, pTrans);
sdbCancelFetch(pSdb, pIter);
break;
}
sdbRelease(pSdb, pTrans);
}
return found ? 0 : -1;
}
int32_t mndResetFromCheckpoint(SMnode *pMnode) {
// find the checkpoint trans id
int32_t transId = 0;
{
SSdb *pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) {
break;
}
if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) {
transId = pTrans->id;
sdbRelease(pSdb, pTrans);
sdbCancelFetch(pSdb, pIter);
break;
}
sdbRelease(pSdb, pTrans);
}
}
if (transId == 0) {
int32_t code = mndFindTransByName(pMnode, MND_STREAM_CHECKPOINT_NAME, &transId);
if (code == -1) {
mError("failed to find the checkpoint trans, reset not executed");
return TSDB_CODE_SUCCESS;
}
@ -2492,7 +2494,6 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) {
if (pIter == NULL) {
break;
}
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid);
int32_t code = createStreamResetStatusTrans(pMnode, pStream);
if (code != TSDB_CODE_SUCCESS) {
@ -2500,7 +2501,6 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) {
return code;
}
}
return 0;
}