refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-08-02 11:38:30 +08:00
parent e4ccf44302
commit 42c11e9e36
1 changed files with 91 additions and 54 deletions

View File

@ -293,10 +293,13 @@ static int32_t createSchemaByFields(const SArray *pFields, SSchemaWrapper *pWrap
return terrno;
}
SNode *pNode;
int32_t index = 0;
for (int32_t i = 0; i < pWrapper->nCols; i++) {
SField *pField = (SField *)taosArrayGet(pFields, i);
if (pField == NULL) {
return terrno;
}
if (TSDB_DATA_TYPE_NULL == pField->type) {
pWrapper->pSchema[index].type = TSDB_DATA_TYPE_VARCHAR;
pWrapper->pSchema[index].bytes = VARSTR_HEADER_SIZE;
@ -609,6 +612,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
// build fields
for (int32_t i = 0; i < createReq.numOfColumns; i++) {
SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i);
TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
tstrncpy(pField->name, pStream->outputSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
pField->flags = pStream->outputSchema.pSchema[i].flags;
pField->type = pStream->outputSchema.pSchema[i].type;
@ -623,6 +628,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
// build tags
SField *pField = taosArrayGet(createReq.pTags, 0);
TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno);
strcpy(pField->name, "group_id");
pField->type = TSDB_DATA_TYPE_UBIGINT;
pField->flags = 0;
@ -634,6 +641,10 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
for (int32_t i = 0; i < createReq.numOfTags; i++) {
SField *pField = taosArrayGet(createReq.pTags, i);
if (pField == NULL) {
continue;
}
pField->bytes = pStream->tagSchema.pSchema[i].bytes;
pField->flags = pStream->tagSchema.pSchema[i].flags;
pField->type = pStream->tagSchema.pSchema[i].type;
@ -920,10 +931,9 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
if (p == NULL || pEntry == NULL) {
continue;
}
@ -967,8 +977,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t tlen = sizeof(SMsgHead) + blen;
@ -1002,18 +1011,20 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
int8_t mndTrigger) {
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
taosMemoryFree(buf);
return -1;
}
int32_t code = 0;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if ((code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger)) < 0) {
taosMemoryFree(buf);
return code;
}
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(buf);
return -1;
return code;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
@ -1130,6 +1141,10 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry == NULL) {
continue;
}
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
streamMutexUnlock(&execInfo.lock);
@ -1170,7 +1185,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true;
if (taskNodeIsUpdated(pMnode)) {
return -1;
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
}
streamMutexLock(&execInfo.lock);
@ -1240,7 +1255,7 @@ int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
STaskId *p = taosArrayGet(pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL || pEntry->id.streamId != streamId) {
if (p == NULL || pEntry == NULL || pEntry->id.streamId != streamId) {
continue;
}
@ -1278,13 +1293,12 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t numOfCheckpointTrans = 0;
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
terrno = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
return -1;
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
}
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));
if (pList == NULL) {
return -1;
return terrno;
}
int64_t now = taosGetTimestampMs();
@ -1353,6 +1367,9 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
for (int32_t i = 0; i < numOfQual; ++i) {
SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
if (pCheckpointInfo == NULL) {
continue;
}
SStreamObj *p = NULL;
code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
@ -1521,8 +1538,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
}
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
int32_t code = 0;
while (1) {
SStreamObj *pStream = NULL;
@ -1535,18 +1553,8 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
sdbCancelFetch(pSdb, pIter);
mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
return -1;
TAOS_RETURN(TSDB_CODE_MND_STREAM_MUST_BE_DELETED);
} else {
#if 0
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1;
}
#endif
// kill the related checkpoint trans
int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
if (transId != 0) {
@ -1557,10 +1565,11 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
// drop the stream obj in execInfo
removeStreamTasksInBuf(pStream, &execInfo);
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) {
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
return -1;
return code;
}
}
}
@ -1575,8 +1584,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
}
int32_t numOfStreams = 0;
@ -1704,8 +1712,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
SMPauseStreamReq pauseReq = {0};
if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
}
code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
@ -1715,8 +1722,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return 0;
} else {
mError("stream:%s not exist, failed to pause stream", pauseReq.name);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
}
}
@ -1736,14 +1742,14 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT);
}
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
sdbRelease(pMnode->pSdb, pStream);
return -1;
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
}
{ // check for tasks, if tasks are not ready, not allowed to pause
@ -1753,6 +1759,9 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
if (p == NULL) {
continue;
}
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
@ -1776,13 +1785,13 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
if (!found) {
mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream);
return -1;
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
}
if (!readyToPause) {
mError("stream:%s task not ready for pause yet", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream);
return -1;
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
}
}
@ -1843,13 +1852,12 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
int32_t code = 0;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
return -1;
return terrno;
}
SMResumeStreamReq resumeReq = {0};
if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
TAOS_RETURN(TSDB_CODE_INVALID_MSG);
}
code = mndAcquireStream(pMnode, resumeReq.name, &pStream);
@ -1860,8 +1868,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return 0;
} else {
mError("stream:%s not exist, failed to resume stream", resumeReq.name);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
}
}
@ -1956,10 +1963,16 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
if (pPrevEntry == NULL) {
continue;
}
int32_t num = taosArrayGetSize(pNodeList);
for (int32_t j = 0; j < num; ++j) {
SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
if(pCurrent == NULL) {
continue;
}
if (pCurrent->nodeId == pPrevEntry->nodeId) {
if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
@ -2302,7 +2315,7 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode)
bool exist = false;
for (int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) {
SNodeEntry *pEntry = taosArrayGet(pExecNode->pNodeList, j);
if (pEntry->nodeId == pTask->info.nodeId) {
if ((pEntry != NULL) && (pEntry->nodeId == pTask->info.nodeId)) {
exist = true;
break;
}
@ -2329,14 +2342,17 @@ static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numO
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
int32_t *pId = taosArrayGet(pList, i);
if (pId == NULL) {
continue;
}
if (taskId == *pId) {
return;
}
}
void* p = taosArrayPush(pList, &taskId);
int32_t numOfTasks = taosArrayGetSize(pList);
void *p = taosArrayPush(pList, &taskId);
if (p) {
mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", uid, numOfTasks, numOfTotal - numOfTasks);
} else {
@ -2445,6 +2461,10 @@ static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pRepor
bool existed = false;
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
STaskChkptInfo *p = taosArrayGet(pList, i);
if (p == NULL) {
continue;
}
if (p->taskId == pReport->taskId) {
existed = true;
break;
@ -2554,6 +2574,10 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pEx
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId* p = taosArrayGet(execInfo.pTaskList, i);
if (p == NULL) {
continue;
}
if (p->streamId != streamId) {
continue;
}
@ -2634,6 +2658,10 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
for (int32_t j = 0; j < num; ++j) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
if (pe == NULL) {
continue;
}
streamId = pe->req.streamId;
int32_t existed = 0;
@ -2670,9 +2698,13 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
if (taosArrayGetSize(pList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
int32_t *taskId = taosArrayGet(pList, i);
if (taskId == NULL) {
continue;
}
for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
if (pe->req.taskId == *taskId) {
if ((pe != NULL) && (pe->req.taskId == *taskId)) {
taosArrayRemove(pInfo->pTaskList, k);
break;
}
@ -2694,6 +2726,10 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
if (pStreamId == NULL) {
continue;
}
code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
}
@ -2828,7 +2864,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
if (conflict) {
return -1;
TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT);
}
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
@ -2841,6 +2877,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
if (code) {
mndTransDrop(pTrans);
return code;
}