refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-30 09:17:02 +08:00
parent 3730e43f0e
commit 11c9c7d936
4 changed files with 101 additions and 105 deletions

View File

@ -26,9 +26,17 @@ extern "C" {
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_VER_NUMBER 4
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
typedef struct SStreamTransInfo {
int64_t startTime;
int64_t streamUid;
int64_t streamId;
const char *name;
int32_t transId;
} SStreamTransInfo;
@ -41,7 +49,7 @@ typedef struct SVgroupChangeInfo {
// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard
// to avoid too many checkpoints for a taskk in the waiting list
typedef struct SCheckpointCandEntry {
char * pName;
char *pName;
int64_t streamId;
int64_t checkpointTs;
int64_t checkpointId;
@ -62,6 +70,9 @@ typedef struct SStreamExecInfo {
SHashObj *pTransferStateStreams;
} SStreamExecInfo;
extern SStreamExecInfo execInfo;
typedef struct SStreamTaskIter SStreamTaskIter;
typedef struct SNodeEntry {
int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
@ -69,15 +80,11 @@ typedef struct SNodeEntry {
int64_t hbTimestamp; // second
} SNodeEntry;
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
extern SStreamExecInfo execInfo;
typedef struct SOrphanTask {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SOrphanTask;
int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode);
@ -85,49 +92,38 @@ SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
typedef struct SOrphanTask {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SOrphanTask;
// for sma
// TODO refactor
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
typedef struct SStreamTaskIter SStreamTaskIter;
SStreamTaskIter *createTaskIter(SStreamObj *pStream);
bool taskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *taskIterGetCurrent(SStreamTaskIter *pIter);
void destroyTaskIter(SStreamTaskIter *pIter);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter);
#ifdef __cplusplus
}

View File

@ -476,16 +476,16 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
}
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return -1;
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
// persistent stream task for already stored ts data
if (pStream->conf.fillHistory) {
@ -1506,9 +1506,9 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
}
// add row for each task
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
if (code == TSDB_CODE_SUCCESS) {
@ -1516,7 +1516,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
taosRUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
@ -1859,16 +1859,16 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
taosWLockLatch(&pStream->lock);
SStreamTaskIter *pTaskIter = createTaskIter(pStream);
while (taskIterNextTask(pTaskIter)) {
SStreamTask *pTask = taskIterGetCurrent(pTaskIter);
SStreamTaskIter *pTaskIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pTaskIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pTaskIter);
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
epsetAssign(&entry.epset, &pTask->info.epSet);
taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
}
destroyTaskIter(pTaskIter);
destroyStreamTaskIter(pTaskIter);
taosWUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
@ -2056,9 +2056,9 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
}
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
@ -2073,13 +2073,13 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
@ -2099,7 +2099,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}

View File

@ -23,10 +23,10 @@ typedef struct SKeyInfo {
static int32_t clearFinishedTrans(SMnode* pMnode);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid) {
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamId) {
SStreamTransInfo info = {
.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamUid = streamUid};
taosHashPut(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid), &info, sizeof(SStreamTransInfo));
.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo));
return 0;
}
@ -65,7 +65,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
return 0;
}
bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* pTransName, bool lock) {
bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* pTransName, bool lock) {
if (lock) {
taosThreadMutexLock(&execInfo.lock);
}
@ -80,7 +80,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
clearFinishedTrans(pMnode);
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid));
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId));
if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry;
@ -90,7 +90,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
return true;
@ -99,13 +99,13 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
}
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
return true;
}
} else {
mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamUid);
mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamId);
}
if (lock) {

View File

@ -26,7 +26,7 @@ struct SStreamTaskIter {
SStreamTask *pTask;
};
SStreamTaskIter* createTaskIter(SStreamObj* pStream) {
SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) {
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
if (pIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -42,7 +42,7 @@ SStreamTaskIter* createTaskIter(SStreamObj* pStream) {
return pIter;
}
bool taskIterNextTask(SStreamTaskIter* pIter) {
bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
if (pIter->level >= pIter->totalLevel) {
pIter->pTask = NULL;
return false;
@ -70,11 +70,11 @@ bool taskIterNextTask(SStreamTaskIter* pIter) {
return false;
}
SStreamTask* taskIterGetCurrent(SStreamTaskIter* pIter) {
SStreamTask* streamTaskIterGetCurrent(SStreamTaskIter* pIter) {
return pIter->pTask;
}
void destroyTaskIter(SStreamTaskIter* pIter) {
void destroyStreamTaskIter(SStreamTaskIter* pIter) {
taosMemoryFree(pIter);
}
@ -235,16 +235,16 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
}
SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (pTask->id.taskId == pId->taskId) {
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return pTask;
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return NULL;
}
@ -259,12 +259,12 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
}
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
SStreamTaskIter *pIter = createTaskIter(pStream);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return -1;
}
@ -272,7 +272,7 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return 0;
}
@ -308,12 +308,12 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
}
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createTaskIter(pStream);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return -1;
}
@ -323,7 +323,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return 0;
}
@ -357,16 +357,16 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas
}
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createTaskIter(pStream);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while(taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
while(streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
if (doSetDropAction(pMnode, pTrans, pTask) < 0) {
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return -1;
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
return 0;
}
@ -480,18 +480,18 @@ int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pI
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo);
if (code != TSDB_CODE_SUCCESS) {
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}
@ -528,18 +528,18 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter);
int32_t code = doSetResetAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) {
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
destroyTaskIter(pIter);
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}