fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-07-22 13:31:57 +08:00
parent ebe1d09d74
commit be162fb62f
7 changed files with 655 additions and 345 deletions

View File

@ -113,38 +113,39 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode, int32_t acceptCode); int32_t retryCode, int32_t acceptCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg); int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
const char *pMsg, STrans **pTrans1);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq); int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t extractStreamNodeList(SMnode *pMnode); int32_t extractStreamNodeList(SMnode *pMnode);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream); int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
int64_t ts); int64_t ts);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter);
void destroyStreamTaskIter(SStreamTaskIter *pIter); void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask);
void mndInitExecInfo(); int32_t mndInitExecInfo();
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo); int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo);

View File

@ -134,17 +134,18 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
mndInitExecInfo(); int32_t code = mndInitExecInfo();
if (code) {
if (sdbSetTable(pMnode->pSdb, table) != 0) { return code;
return -1;
} }
if (sdbSetTable(pMnode->pSdb, tableSeq) != 0) { code = sdbSetTable(pMnode->pSdb, table);
return -1; if (code) {
return terrno;
} }
return 0; code = sdbSetTable(pMnode->pSdb, tableSeq);
return code;
} }
void mndCleanupStream(SMnode *pMnode) { void mndCleanupStream(SMnode *pMnode) {
@ -252,6 +253,8 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
} }
int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) { int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) {
terrno = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
(*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName); (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName);
if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
@ -530,9 +533,21 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
} }
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create task iter for stream:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) { if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
return -1; return -1;
@ -727,7 +742,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
} }
code = mndAcquireStream(pMnode, createReq.name, &pStream); code = mndAcquireStream(pMnode, createReq.name, &pStream);
if (pStream != NULL || code != 0) { if (pStream != NULL || code == 0) {
if (createReq.igExists) { if (createReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createReq.name); mInfo("stream:%s, already exist, ignore exist is set", createReq.name);
goto _OVER; goto _OVER;
@ -760,8 +775,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg); STrans *pTrans = NULL;
if (pTrans == NULL) { code = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg, &pTrans);
if (pTrans == NULL || code) {
goto _OVER; goto _OVER;
} }
@ -802,11 +818,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// add into buffer firstly // add into buffer firstly
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name); mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo); saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
// mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid); streamMutexUnlock(&execInfo.lock);
taosThreadMutexUnlock(&execInfo.lock);
// execute creation // execute creation
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
@ -867,7 +882,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
{ // check the max checkpoint id from all vnodes. { // check the max checkpoint id from all vnodes.
int64_t maxCheckpointId = -1; int64_t maxCheckpointId = -1;
if (lock) { if (lock) {
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
} }
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
@ -888,7 +903,7 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
} }
if (lock) { if (lock) {
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
} }
if (maxCheckpointId > maxChkptId) { if (maxCheckpointId > maxChkptId) {
@ -989,11 +1004,13 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME, STrans *pTrans = NULL;
"gen checkpoint for stream"); code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHECKPOINT_NAME,
if (pTrans == NULL) { "gen checkpoint for stream", &pTrans);
if (pTrans == NULL || code) {
code = TSDB_CODE_MND_TRANS_CONFLICT;
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); tstrerror(code));
goto _ERR; goto _ERR;
} }
@ -1033,7 +1050,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) { if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
return code; goto _ERR;
} }
if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) { if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) {
@ -1057,13 +1074,13 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
static bool taskNodeIsUpdated(SMnode *pMnode) { static bool taskNodeIsUpdated(SMnode *pMnode) {
// check if the node update happens or not // check if the node update happens or not
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
int32_t numOfNodes = extractStreamNodeList(pMnode); int32_t numOfNodes = extractStreamNodeList(pMnode);
if (numOfNodes == 0) { if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing"); mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec(); execInfo.ts = taosGetTimestampSec();
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
return false; return false;
} }
@ -1071,7 +1088,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->stageUpdated) { if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued"); mDebug("stream task not ready due to node update detected, checkpoint not issued");
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
return true; return true;
} }
} }
@ -1086,7 +1103,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
if (!allReady) { if (!allReady) {
mWarn("not all vnodes ready, quit from vnodes status check"); mWarn("not all vnodes ready, quit from vnodes status check");
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
return true; return true;
} }
@ -1102,7 +1119,7 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
mDebug("stream tasks not ready due to node update"); mDebug("stream tasks not ready due to node update");
} }
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
return nodeUpdated; return nodeUpdated;
} }
@ -1112,7 +1129,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
return -1; return -1;
} }
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
if (taosArrayGetSize(execInfo.pNodeList) == 0) { if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing"); mDebug("stream task node change checking done, no vgroups exist, do nothing");
ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0); ASSERT(taosArrayGetSize(execInfo.pTaskList) == 0);
@ -1157,7 +1174,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
removeTasksInBuf(pInvalidList, &execInfo); removeTasksInBuf(pInvalidList, &execInfo);
taosArrayDestroy(pInvalidList); taosArrayDestroy(pInvalidList);
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
return ready ? 0 : -1; return ready ? 0 : -1;
} }
@ -1220,14 +1237,14 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
continue; continue;
} }
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) { if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
continue; continue;
} }
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration}; SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
taosArrayPush(pList, &in); taosArrayPush(pList, &in);
@ -1270,8 +1287,9 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
for (int32_t i = 0; i < numOfQual; ++i) { for (int32_t i = 0; i < numOfQual; ++i) {
SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i); SCheckpointInterval *pCheckpointInfo = taosArrayGet(pList, i);
SStreamObj *p = mndGetStreamObj(pMnode, pCheckpointInfo->streamId); SStreamObj *p = NULL;
if (p != NULL) { code = mndGetStreamObj(pMnode, pCheckpointInfo->streamId, &p);
if (p != NULL || code != 0) {
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true); code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
sdbRelease(pSdb, p); sdbRelease(pSdb, p);
@ -1362,8 +1380,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); STrans *pTrans = NULL;
if (pTrans == NULL) { code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
if (pTrans == NULL || code) {
mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, terrstr()); mError("stream:%s uid:0x%" PRIx64 " failed to drop since %s", dropReq.name, pStream->uid, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq); tFreeMDropStreamReq(&dropReq);
@ -1863,9 +1882,9 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
int32_t numOfRows = 0; int32_t numOfRows = 0;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
mndInitStreamExecInfo(pMnode, &execInfo); mndInitStreamExecInfo(pMnode, &execInfo);
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
while (numOfRows < rowsCapacity) { while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
@ -1882,11 +1901,24 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
} }
// add row for each task // add row for each task
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = NULL;
while (streamTaskIterNextTask(pIter)) { int32_t code = createStreamTaskIter(pStream, &pIter);
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); if (code) {
taosRUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
mError("failed to create task iter for stream:%s", pStream->name);
continue;
}
int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
break;
}
code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
numOfRows++; numOfRows++;
} }
@ -1961,7 +1993,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
{ // check for tasks, if tasks are not ready, not allowed to pause { // check for tasks, if tasks are not ready, not allowed to pause
bool found = false; bool found = false;
bool readyToPause = true; bool readyToPause = true;
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i); STaskId *p = taosArrayGet(execInfo.pTaskList, i);
@ -1984,7 +2016,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
found = true; found = true;
} }
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
if (!found) { if (!found) {
mError("stream:%s task not report status yet, not ready for pause", pauseReq.name); mError("stream:%s task not report status yet, not ready for pause", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -1998,42 +2030,49 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
} }
} }
STrans *pTrans = STrans *pTrans = NULL;
doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream"); code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_PAUSE_NAME, "pause the stream", &pTrans);
if (pTrans == NULL) { if (pTrans == NULL || code) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return code;
} }
code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
if (code) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
// if nodeUpdate happened, not send pause trans // if nodeUpdate happened, not send pause trans
if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) { code = mndStreamSetPauseAction(pMnode, pTrans, pStream);
if (code) {
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
// pause stream // pause stream
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
pStream->status = STREAM_STATUS__PAUSE; pStream->status = STREAM_STATUS__PAUSE;
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) { code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code) {
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -2087,22 +2126,28 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
STrans *pTrans = STrans *pTrans = NULL;
doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream"); code =
if (pTrans == NULL) { doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESUME_NAME, "resume the stream", &pTrans);
if (pTrans == NULL || code) {
mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return code;
} }
code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
if (code) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
// set the resume action // set the resume action
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr()); mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
// resume stream // resume stream
@ -2113,7 +2158,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
@ -2121,7 +2166,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -2195,6 +2240,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
void *pIter = NULL; void *pIter = NULL;
STrans *pTrans = NULL; STrans *pTrans = NULL;
int32_t code = 0;
// conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool // conflict check for nodeUpdate trans, here we randomly chose one stream to add into the trans pool
while (1) { while (1) {
@ -2221,12 +2267,11 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// here create only one trans // here create only one trans
if (pTrans == NULL) { if (pTrans == NULL) {
pTrans = code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets", &pTrans);
doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_UPDATE_NAME, "update task epsets"); if (pTrans == NULL || code) {
if (pTrans == NULL) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return terrno; return terrno = code;
} }
mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
@ -2243,7 +2288,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id); pStream->name, pTrans->id);
int32_t code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans); code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again // todo: not continue, drop all and retry again
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -2258,7 +2303,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return -1; return code;
} }
} }
@ -2267,16 +2312,17 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
return 0; return 0;
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) { static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) {
@ -2293,9 +2339,21 @@ static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) {
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
SStreamTaskIter *pTaskIter = createStreamTaskIter(pStream); SStreamTaskIter *pTaskIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pTaskIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
mError("failed to create task iter for stream:%s", pStream->name);
continue;
}
while (streamTaskIterNextTask(pTaskIter)) { while (streamTaskIterNextTask(pTaskIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pTaskIter); SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pTaskIter, &pTask);
if (code) {
break;
}
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
epsetAssign(&entry.epset, &pTask->info.epSet); epsetAssign(&entry.epset, &pTask->info.epSet);
@ -2342,9 +2400,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
int32_t numOfNodes = extractStreamNodeList(pMnode); int32_t numOfNodes = extractStreamNodeList(pMnode);
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
if (numOfNodes == 0) { if (numOfNodes == 0) {
mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing"); mDebug("end to do stream task(s) node change checking, no stream tasks exist, do nothing");
@ -2368,7 +2426,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0; return 0;
} }
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot); removeExpiredNodeEntryAndTaskInBuf(pNodeSnapshot);
@ -2392,7 +2450,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
} }
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(changeInfo.pUpdateNodeList); taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap); taosHashCleanup(changeInfo.pDBMap);
@ -2418,9 +2476,19 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
} }
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create task iter for stream:%s", pStream->name);
return;
}
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
break;
}
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
@ -2490,10 +2558,11 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId); mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); SStreamObj *pStream = NULL;
if (pStream == NULL) { int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream);
if (pStream == NULL || code != 0) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf",
req.streamId); req.streamId);
@ -2504,7 +2573,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
if (p == NULL) { if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId); mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
return -1; return -1;
} else { } else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
@ -2549,7 +2618,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
} }
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
{ {
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)}; SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
@ -2609,10 +2678,11 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId); req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); SStreamObj *pStream = NULL;
if (pStream == NULL) { int32_t code = mndGetStreamObj(pMnode, req.streamId, &pStream);
if (pStream == NULL || code != 0) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId); mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer // not in meta-store yet, try to acquire the task in exec buffer
@ -2622,7 +2692,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
if (p == NULL) { if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId); mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
return -1; return -1;
} else { } else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet", mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
@ -2654,7 +2724,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
} }
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS); doSendQuickRsp(&pReq->info, sizeof(SMStreamUpdateChkptRsp), req.nodeId, TSDB_CODE_SUCCESS);
return 0; return 0;
@ -2719,7 +2789,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
// req.nodeId, req.streamId, req.taskId, req.checkpointId); // req.nodeId, req.streamId, req.taskId, req.checkpointId);
// //
// // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans. // // register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
// taosThreadMutexLock(&execInfo.lock); // streamMutexLock(&execInfo.lock);
// //
// // mnode handle the create stream transaction too slow may cause this problem // // mnode handle the create stream transaction too slow may cause this problem
// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
@ -2733,7 +2803,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
// if (p == NULL) { // if (p == NULL) {
// mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId); // mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId);
// terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; // terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
// taosThreadMutexUnlock(&execInfo.lock); // streamMutexUnlock(&execInfo.lock);
// //
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return -1; // return -1;
@ -2749,7 +2819,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
// //
// int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); // int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
// if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly // if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly
// taosThreadMutexUnlock(&execInfo.lock); // streamMutexUnlock(&execInfo.lock);
// mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs); // mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, 0, req.startTs);
// //
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
@ -2766,7 +2836,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
// SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks); // SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
// mndAddConsensusTasks(pInfo, &req); // mndAddConsensusTasks(pInfo, &req);
// //
// taosThreadMutexUnlock(&execInfo.lock); // streamMutexUnlock(&execInfo.lock);
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return 0; // return 0;
// } // }
@ -2776,7 +2846,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
// req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId); // req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId);
// mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs); // mndCreateSetConsensusChkptIdTrans(pMnode, pStream, req.taskId, chkId, req.startTs);
// //
// taosThreadMutexUnlock(&execInfo.lock); // streamMutexUnlock(&execInfo.lock);
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return 0; // return 0;
// } // }
@ -2789,7 +2859,7 @@ static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId,
// mndReleaseStream(pMnode, pStream); // mndReleaseStream(pMnode, pStream);
// } // }
// //
// taosThreadMutexUnlock(&execInfo.lock); // streamMutexUnlock(&execInfo.lock);
// doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno); // doSendQuickRsp(&pMsg->info, sizeof(SMStreamReqConsensChkptRsp), req.nodeId, terrno);
// return 0; // return 0;
//} //}
@ -2816,7 +2886,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
return 0; return 0;
} }
taosThreadMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
void *pIter = NULL; void *pIter = NULL;
while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) { while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
@ -2826,8 +2896,9 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
int32_t num = taosArrayGetSize(pInfo->pTaskList); int32_t num = taosArrayGetSize(pInfo->pTaskList);
SArray *pList = taosArrayInit(4, sizeof(int32_t)); SArray *pList = taosArrayInit(4, sizeof(int32_t));
SStreamObj *pStream = mndGetStreamObj(pMnode, pInfo->streamId); SStreamObj *pStream = NULL;
if (pStream == NULL) { // stream has been dropped already code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
if (pStream == NULL || code != 0) { // stream has been dropped already
mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId); mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
taosArrayDestroy(pList); taosArrayDestroy(pList);
continue; continue;
@ -2886,14 +2957,14 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pStreamList); ++i) {
int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i); int64_t *pStreamId = (int64_t *)taosArrayGet(pStreamList, i);
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId); code = mndClearConsensusCheckpointId(execInfo.pStreamConsensus, *pStreamId);
} }
taosThreadMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pStreamList); taosArrayDestroy(pStreamList);
mDebug("end to process consensus-checkpointId in tmr"); mDebug("end to process consensus-checkpointId in tmr");
return TSDB_CODE_SUCCESS; return code;
} }
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
@ -2944,32 +3015,41 @@ void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
} }
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) { int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME, STrans *pTrans = NULL;
"update checkpoint-info"); int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_UPDATE_NAME,
if (pTrans == NULL) { "update checkpoint-info", &pTrans);
return terrno; if (pTrans == NULL || code) {
sdbRelease(pMnode->pSdb, pStream);
return code;
} }
/*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid); code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_UPDATE_NAME, pStream->uid);
int32_t code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream); if (code){
if (code != 0) { sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
code = mndStreamSetUpdateChkptAction(pMnode, pTrans, pStream);
if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code != TSDB_CODE_SUCCESS) { if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code) {
mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);

View File

@ -61,15 +61,23 @@ void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
} }
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME, STrans *pTrans = NULL;
" reset from failed checkpoint"); int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
if (pTrans == NULL) { " reset from failed checkpoint", &pTrans);
if (pTrans == NULL || code) {
sdbRelease(pMnode->pSdb, pStream);
return terrno; return terrno;
} }
/*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
int32_t code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream); if (code) {
if (code != 0) { sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
@ -79,14 +87,15 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code != 0) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -99,8 +108,9 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t t
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
mndKillTransImpl(pMnode, transId, ""); mndKillTransImpl(pMnode, transId, "");
SStreamObj *pStream = mndGetStreamObj(pMnode, streamId); SStreamObj *pStream = NULL;
if (pStream == NULL) { code = mndGetStreamObj(pMnode, streamId, &pStream);
if (pStream == NULL || code != 0) {
code = TSDB_CODE_STREAM_TASK_NOT_EXIST; code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
} else { } else {
@ -159,34 +169,39 @@ int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) {
} }
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
STrans *pTrans = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); STrans *pTrans = NULL;
if (pTrans == NULL) { int32_t code =
doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
if (pTrans == NULL || code != 0) {
mError("failed to create trans to drop orphan tasks since %s", terrstr()); mError("failed to create trans to drop orphan tasks since %s", terrstr());
return -1; return code;
} }
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
if (code) {
return code;
}
// drop all tasks // drop all tasks
if (mndStreamSetDropActionFromList(pMnode, pTrans, pList) < 0) { if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, pList)) < 0) {
mError("failed to create trans to drop orphan tasks since %s", terrstr()); mError("failed to create trans to drop orphan tasks since %s", terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
// drop stream // drop stream
if (mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED) < 0) { if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) { int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
@ -230,9 +245,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray *pOrphanTasks = NULL; SArray *pOrphanTasks = NULL;
int32_t code = 0; int32_t code = 0;
if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
if (suspendAllStreams(pMnode, &pReq->info) < 0) { if (suspendAllStreams(pMnode, &pReq->info) < 0) {
return -1; return code;
} }
} }
@ -242,8 +257,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (tDecodeStreamHbMsg(&decoder, &req) < 0) { if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
tCleanupStreamHbMsg(&req); tCleanupStreamHbMsg(&req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG; code = terrno = TSDB_CODE_INVALID_MSG;
return -1; return code;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -258,12 +273,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
mError("vgId:%d not exists in nodeList buf, discarded", req.vgId); mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);
terrno = TSDB_CODE_INVALID_MSG; code = terrno = TSDB_CODE_INVALID_MSG;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return -1; return code;
} }
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
@ -294,9 +309,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
.startTs = pChkInfo->consensusTs, .startTs = pChkInfo->consensusTs,
}; };
SStreamObj *pStream = mndGetStreamObj(pMnode, p->id.streamId); SStreamObj *pStream = NULL;
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
if (code) {
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
continue;
}
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
SCheckpointConsensusInfo *pInfo = NULL; SCheckpointConsensusInfo *pInfo = NULL;
code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo); code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo);
@ -350,7 +370,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pMnode != NULL) { if (pMnode != NULL) {
SArray *p = NULL; SArray *p = NULL;
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &p); code = mndTakeVgroupSnapshot(pMnode, &allReady, &p);
taosArrayDestroy(p); taosArrayDestroy(p);
if (code) { if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue"); mError("failed to get the vgroup snapshot, ignore it and continue");
@ -388,7 +408,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return TSDB_CODE_SUCCESS; return terrno;
} }
void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg

View File

@ -153,27 +153,30 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) {
return 0; return 0;
} }
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name,
const char *pMsg) { const char *pMsg, STrans ** pTrans1) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name); *pTrans1 = NULL;
if (pTrans == NULL) { terrno = 0;
STrans *p = mndTransCreate(pMnode, TRN_POLICY_RETRY, conflict, pReq, name);
if (p == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return terrno;
} }
mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id); mInfo("stream:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, p->id);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); mndTransSetDbName(p, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, p) != 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT; terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno)); mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
mndTransDrop(pTrans); mndTransDrop(p);
return NULL; return terrno;
} }
terrno = 0; *pTrans1 = p;
return pTrans; return 0;
} }
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
@ -272,8 +275,9 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
continue; continue;
} }
SStreamObj *pStream = mndGetStreamObj(pMnode, pTransInfo->streamId); SStreamObj *pStream = NULL;
if (pStream != NULL) { int32_t code = mndGetStreamObj(pMnode, pTransInfo->streamId, &pStream);
if (pStream != NULL || code != 0) {
if (identicalName(pStream->sourceDb, pDBName, len)) { if (identicalName(pStream->sourceDb, pDBName, len)) {
mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb); mndKillTransImpl(pMnode, pTransInfo->transId, pStream->sourceDb);
} else if (identicalName(pStream->targetDb, pDBName, len)) { } else if (identicalName(pStream->targetDb, pDBName, len)) {

View File

@ -28,20 +28,20 @@ struct SStreamTaskIter {
int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId); int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId);
SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) { int32_t createStreamTaskIter(SStreamObj* pStream, SStreamTaskIter** pIter) {
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); *pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
if (pIter == NULL) { if (*pIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return terrno;
} }
pIter->level = -1; (*pIter)->level = -1;
pIter->ordinalIndex = 0; (*pIter)->ordinalIndex = 0;
pIter->pStream = pStream; (*pIter)->pStream = pStream;
pIter->totalLevel = taosArrayGetSize(pStream->tasks); (*pIter)->totalLevel = taosArrayGetSize(pStream->tasks);
pIter->pTask = NULL; (*pIter)->pTask = NULL;
return pIter; return 0;
} }
bool streamTaskIterNextTask(SStreamTaskIter* pIter) { bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
@ -72,8 +72,15 @@ bool streamTaskIterNextTask(SStreamTaskIter* pIter) {
return false; return false;
} }
SStreamTask* streamTaskIterGetCurrent(SStreamTaskIter* pIter) { int32_t streamTaskIterGetCurrent(SStreamTaskIter* pIter, SStreamTask** pTask) {
return pIter->pTask; if (pTask) {
*pTask = pIter->pTask;
if (*pTask != NULL) {
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_INVALID_PARA;
} }
void destroyStreamTaskIter(SStreamTaskIter* pIter) { void destroyStreamTaskIter(SStreamTaskIter* pIter) {
@ -132,10 +139,15 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
} }
char buf[256] = {0}; char buf[256] = {0};
epsetToStr(&entry.epset, buf, tListLen(buf)); (void) epsetToStr(&entry.epset, buf, tListLen(buf));
void* p = taosArrayPush(pVgroupList, &entry);
if (p == NULL) {
mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
} else {
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
}
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupList, &entry);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
@ -146,15 +158,23 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
break; break;
} }
SNodeEntry entry = {0}; SNodeEntry entry = {.nodeId = SNODE_HANDLE};
addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
entry.nodeId = SNODE_HANDLE; if (code) {
sdbRelease(pSdb, pObj);
continue;
}
char buf[256] = {0}; char buf[256] = {0};
epsetToStr(&entry.epset, buf, tListLen(buf)); (void) epsetToStr(&entry.epset, buf, tListLen(buf));
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
void* p = taosArrayPush(pVgroupList, &entry);
if (p == NULL) {
mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
} else {
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
}
taosArrayPush(pVgroupList, &entry);
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);
} }
@ -162,28 +182,33 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
return code; return code;
} }
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
void *pIter = NULL; void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL; *pStream = NULL;
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) { SStreamObj *p = NULL;
if (pStream->uid == streamId) { while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&p)) != NULL) {
if (p->uid == streamId) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return pStream; *pStream = p;
return TSDB_CODE_SUCCESS;
} }
sdbRelease(pSdb, pStream); sdbRelease(pSdb, p);
} }
return NULL; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} }
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) { void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) { if (pTrans != NULL) {
mInfo("kill active transId:%d in Db:%s", transId, pDbName); mInfo("kill active transId:%d in Db:%s", transId, pDbName);
mndKillTrans(pMnode, pTrans); int32_t code = mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
if (code) {
mError("failed to kill trans:%d", pTrans->id);
}
} else { } else {
mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId); mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
} }
@ -199,11 +224,16 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter != NULL) { if (pIter != NULL) {
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); int32_t code = addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj); sdbRelease(pMnode->pSdb, pObj);
sdbCancelFetch(pMnode->pSdb, pIter); sdbCancelFetch(pMnode->pSdb, pIter);
*hasEpset = true; if (code) {
return TSDB_CODE_SUCCESS; *hasEpset = false;
mError("failed to set epset");
} else {
*hasEpset = true;
}
return code;
} else { } else {
mError("failed to acquire snode epset"); mError("failed to acquire snode epset");
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
@ -225,12 +255,14 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t
} }
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
terrno = 0;
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return terrno;
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
@ -244,31 +276,45 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
terrno = code; terrno = code;
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return terrno;
} }
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0); code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return terrno;
} }
mDebug("set the resume action for trans:%d", pTrans->id); mDebug("set the resume action for trans:%d", pTrans->id);
return 0; return 0;
} }
SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream); *pTask = NULL;
SStreamTask *p = NULL;
SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); code = streamTaskIterGetCurrent(pIter, &p);
if (pTask->id.taskId == pId->taskId) { if (code) {
continue;
}
if (p->id.taskId == pId->taskId) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
return pTask; *pTask = p;
return 0;
} }
} }
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
return NULL; return TSDB_CODE_FAILED;
} }
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
@ -282,13 +328,25 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
} }
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) { code = streamTaskIterGetCurrent(pIter, &pTask);
if (code || pTask == NULL) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
return -1; return code;
}
code = doSetResumeAction(pTrans, pMnode, pTask, igUntreated);
if (code) {
destroyStreamTaskIter(pIter);
return code;
} }
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
@ -305,7 +363,7 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return terrno;
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
@ -322,25 +380,38 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
} }
char buf[256] = {0}; char buf[256] = {0};
epsetToStr(&epset, buf, tListLen(buf)); (void) epsetToStr(&epset, buf, tListLen(buf));
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf); mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0); code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return code;
} }
return 0; return 0;
} }
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) { code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
return -1; return code;
}
code = doSetPauseAction(pMnode, pTrans, pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
} }
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
@ -350,14 +421,14 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
} }
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
return 0; return code;
} }
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return terrno;
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
@ -368,28 +439,40 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas
bool hasEpset = false; bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
terrno = code; return code;
return -1;
} }
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return code;
} }
return 0; return 0;
} }
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createStreamTaskIter(pStream); SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while(streamTaskIterNextTask(pIter)) { while(streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
if (doSetDropAction(pMnode, pTrans, pTask) < 0) { code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
return -1; return code;
}
code = doSetDropAction(pMnode, pTrans, pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
} }
} }
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
@ -400,7 +483,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return terrno;
} }
pReq->head.vgId = htonl(pTask->nodeId); pReq->head.vgId = htonl(pTask->nodeId);
@ -411,16 +494,15 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
bool hasEpset = false; bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId); int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction
terrno = code;
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return code;
} }
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0); code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, 0);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return code;
} }
return 0; return 0;
@ -429,19 +511,35 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) { int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SOrphanTask* pTask = taosArrayGet(pList, i); SOrphanTask* pTask = taosArrayGet(pList, i);
mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId); int32_t code = doSetDropActionFromId(pMnode, pTrans, pTask);
doSetDropActionFromId(pMnode, pTrans, pTask); if (code != 0) {
return code;
} else {
mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId);
}
} }
return 0; return 0;
} }
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) { int32_t transId) {
int32_t code = 0;
pMsg->streamId = pId->streamId; pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId; pMsg->taskId = pId->taskId;
pMsg->transId = transId; pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); if (pMsg->pNodeList == NULL) {
mError("failed to prepare node list, code:out of memory");
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == 0) {
void *p = taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
if (p == NULL) {
mError("failed to add update node list into nodeList");
}
}
} }
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
@ -456,7 +554,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
if (code < 0) { if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList); taosArrayDestroy(req.pNodeList);
return -1; return terrno;
} }
int32_t tlen = sizeof(SMsgHead) + blen; int32_t tlen = sizeof(SMsgHead) + blen;
@ -465,13 +563,18 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList); taosArrayDestroy(req.pNodeList);
return -1; return terrno;
} }
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req); code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
if (code == -1) {
tEncoderClear(&encoder);
taosArrayDestroy(req.pNodeList);
return code;
}
SMsgHead *pMsgHead = (SMsgHead *)buf; SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen); pMsgHead->contLen = htonl(tlen);
@ -489,15 +592,20 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
void *pBuf = NULL; void *pBuf = NULL;
int32_t len = 0; int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); int32_t code = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
if (code) {
return code;
}
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
if (code) {
return code;
}
SEpSet epset = {0}; SEpSet epset = {0};
bool hasEpset = false; bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) { if (code != TSDB_CODE_SUCCESS || !hasEpset) {
terrno = code;
return code; return code;
} }
@ -512,16 +620,30 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask
// build trans to update the epset // build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock); SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
int32_t code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo); code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
return -1; return code;
} }
} }
@ -560,16 +682,30 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
} }
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
taosWLockLatch(&pStream->lock); SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
int32_t code = doSetResetAction(pMnode, pTrans, pTask); code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
code = doSetResetAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
return -1; return code;
} }
} }
@ -583,8 +719,12 @@ static void freeTaskList(void* param) {
taosArrayDestroy(*pList); taosArrayDestroy(*pList);
} }
void mndInitExecInfo() { int32_t mndInitExecInfo() {
taosThreadMutexInit(&execInfo.lock, NULL); int32_t code = taosThreadMutexInit(&execInfo.lock, NULL);
if (code) {
return code;
}
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
@ -598,6 +738,7 @@ void mndInitExecInfo() {
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList); taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
return 0;
} }
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
@ -610,7 +751,10 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
for (int32_t j = 0; j < size; ++j) { for (int32_t j = 0; j < size; ++j) {
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j); SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
if (pEntry->nodeId == p->nodeId) { if (pEntry->nodeId == p->nodeId) {
taosArrayPush(pValidList, p); void* px = taosArrayPush(pValidList, p);
if (px == NULL) {
mError("failed to put node into list, nodeId:%d", p->nodeId);
}
break; break;
} }
} }
@ -628,7 +772,10 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); int32_t code = taosHashRemove(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
if (code) {
return code;
}
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
@ -647,28 +794,45 @@ int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) { void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo* pExecInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTaskIds); ++i) {
STaskId *pId = taosArrayGet(pTaskIds, i); STaskId *pId = taosArrayGet(pTaskIds, i);
doRemoveTasks(pExecInfo, pId); int32_t code = doRemoveTasks(pExecInfo, pId);
if (code) {
mError("failed to remove task in buffer list, 0x%"PRIx64, pId->taskId);
}
} }
} }
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
taosThreadMutexLock(&pExecNode->lock); SStreamTaskIter *pIter = NULL;
streamMutexLock(&pExecNode->lock);
// 1. remove task entries // 1. remove task entries
SStreamTaskIter *pIter = createStreamTaskIter(pStream); int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
streamMutexUnlock(&pExecNode->lock);
mError("failed to create stream task iter:%s", pStream->name);
return;
}
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
continue;
}
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
doRemoveTasks(pExecNode, &id); code = doRemoveTasks(pExecNode, &id);
if (code) {
mError("failed to remove task in buffer list, 0x%"PRIx64, id.taskId);
}
} }
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
// 2. remove stream entry in consensus hash table // 2. remove stream entry in consensus hash table
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
taosThreadMutexUnlock(&pExecNode->lock); streamMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
} }
@ -699,7 +863,10 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) { if (!existed) {
taosArrayPush(pRemovedTasks, pId); void* p = taosArrayPush(pRemovedTasks, pId);
if (p == NULL) {
mError("failed to put task entry into remove list, taskId:0x%" PRIx64, pId->taskId);
}
} }
} }
@ -761,45 +928,64 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
} }
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
SStreamTaskIter *pIter = createStreamTaskIter(pStream);
while (streamTaskIterNextTask(pIter)) { while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = streamTaskIterGetCurrent(pIter); SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
int32_t code = doSetUpdateChkptAction(pMnode, pTrans, pTask); code = doSetUpdateChkptAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
return -1; return code;
} }
} }
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
return 0; return code;
} }
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
void *pIter = NULL; void *pIter = NULL;
SArray *pDropped = taosArrayInit(4, sizeof(int64_t)); SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
int32_t code = 0;
mDebug("start to scan checkpoint report info"); mDebug("start to scan checkpoint report info");
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
SArray *pList = *(SArray **)pIter; SArray *pList = *(SArray **)pIter;
STaskChkptInfo* pInfo = taosArrayGet(pList, 0); STaskChkptInfo *pInfo = taosArrayGet(pList, 0);
SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId); SStreamObj *pStream = NULL;
if (pStream == NULL) { code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
if (pStream == NULL || code != 0) {
mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId); mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId);
taosArrayPush(pDropped, &pInfo->streamId); void* p = taosArrayPush(pDropped, &pInfo->streamId);
if (p == NULL) {
mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId);
}
continue; continue;
} }
int32_t total = mndGetNumOfStreamTasks(pStream); int32_t total = mndGetNumOfStreamTasks(pStream);
int32_t existed = (int32_t) taosArrayGetSize(pList); int32_t existed = (int32_t)taosArrayGetSize(pList);
if (total == existed) { if (total == existed) {
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
@ -807,17 +993,21 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
if (!conflict) { if (!conflict) {
int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
taosArrayPush(pDropped, &pInfo->streamId); void* p = taosArrayPush(pDropped, &pInfo->streamId);
mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId); if (p == NULL) {
mError("failed to remove stream:0x%" PRIx64, pInfo->streamId);
} else {
mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId);
}
} else { } else {
mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
pInfo->streamId); pInfo->streamId);
} }
break; break;
} else { } else {
mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId); mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
} }
} else { } else {
mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name, mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name,
@ -831,7 +1021,10 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
if (size > 0) { if (size > 0) {
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i); int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i);
taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId)); code = taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId));
if (code) {
mError("failed to remove stream in buf:0x%"PRIx64, streamId);
}
} }
int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams); int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
@ -856,29 +1049,30 @@ static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStream
int32_t blen; int32_t blen;
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code); tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code);
if (code < 0) { if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
int32_t tlen = sizeof(SMsgHead) + blen; int32_t tlen = sizeof(SMsgHead) + blen;
void *pBuf = taosMemoryMalloc(tlen); void *pBuf = taosMemoryMalloc(tlen);
if (pBuf == NULL) { if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
tEncodeRestoreCheckpointInfo(&encoder, &req); code = tEncodeRestoreCheckpointInfo(&encoder, &req);
tEncoderClear(&encoder);
if (code == -1) {
taosMemoryFree(pBuf);
return code;
}
SMsgHead *pMsgHead = (SMsgHead *)pBuf; SMsgHead *pMsgHead = (SMsgHead *)pBuf;
pMsgHead->contLen = htonl(tlen); pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pTask->info.nodeId); pMsgHead->vgId = htonl(pTask->info.nodeId);
tEncoderClear(&encoder);
SEpSet epset = {0}; SEpSet epset = {0};
bool hasEpset = false; bool hasEpset = false;
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
@ -900,17 +1094,28 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i
char msg[128] = {0}; char msg[128] = {0};
snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId); snprintf(msg, tListLen(msg), "set consen-chkpt-id for task:0x%x", taskId);
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg); STrans *pTrans = NULL;
if (pTrans == NULL) { int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_CHKPT_CONSEN_NAME, msg, &pTrans);
if (pTrans == NULL || code != 0) {
return terrno; return terrno;
} }
STaskId id = {.streamId = pStream->uid, .taskId = taskId}; STaskId id = {.streamId = pStream->uid, .taskId = taskId};
SStreamTask *pTask = mndGetStreamTask(&id, pStream); SStreamTask *pTask = NULL;
ASSERT(pTask); code = mndGetStreamTask(&id, pStream, &pTask);
if (code) {
mError("failed to get task:0x%x in stream:%s, failed to create consensus-checkpointId", taskId, pStream->name);
sdbRelease(pMnode->pSdb, pStream);
return code;
}
/*int32_t code = */ mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid); code = mndStreamRegisterTrans(pTrans, MND_STREAM_CHKPT_CONSEN_NAME, pStream->uid);
int32_t code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts); if (code) {
sdbRelease(pMnode->pSdb, pStream);
return code;
}
code = mndStreamSetChkptIdAction(pMnode, pTrans, pTask, checkpointId, ts);
if (code != 0) { if (code != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -918,17 +1123,18 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i
} }
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code != TSDB_CODE_SUCCESS) { if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code) {
mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return code;
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
@ -938,8 +1144,11 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i
} }
int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) { int32_t mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks, SCheckpointConsensusInfo **pInfo) {
*pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); *pInfo = NULL;
if (pInfo != NULL) {
void* px = taosHashGet(pHash, &streamId, sizeof(streamId));
if (px != NULL) {
*pInfo = px;
return 0; return 0;
} }
@ -977,11 +1186,15 @@ void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpo
} }
} }
taosArrayPush(pInfo->pTaskList, &info); void *p = taosArrayPush(pInfo->pTaskList, &info);
int32_t num = taosArrayGetSize(pInfo->pTaskList); if (p == NULL) {
mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64 mError("s-task:0x%x failed to put task into consensus-checkpointId list, code: out of memory", info.req.taskId);
" waiting tasks:%d", } else {
pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num); int32_t num = taosArrayGetSize(pInfo->pTaskList);
mDebug("s-task:0x%x checkpointId:%" PRId64 " added into consensus-checkpointId list, stream:0x%" PRIx64
" waiting tasks:%d",
pRestoreInfo->taskId, pRestoreInfo->checkpointId, pRestoreInfo->streamId, num);
}
} }
void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) { void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
@ -990,22 +1203,14 @@ void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
} }
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) { int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
taosHashRemove(pHash, &streamId, sizeof(streamId)); int32_t code = taosHashRemove(pHash, &streamId, sizeof(streamId));
int32_t numOfStreams = taosHashGetSize(pHash); if (code == 0) {
mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d", streamId, int32_t numOfStreams = taosHashGetSize(pHash);
numOfStreams); mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d",
return TSDB_CODE_SUCCESS; streamId, numOfStreams);
} } else {
mError("failed to remove stream:0x%"PRIx64" in consensus-checkpointId list", streamId);
}
//int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) { return code;
// void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId)); }
// ASSERT(pInfo == NULL);
//
// SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL};
// taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
//
// SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
// ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0);
// mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId);
// return TSDB_CODE_SUCCESS;
//}

View File

@ -639,7 +639,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
code = tEncodeStreamTask(&encoder, pTask); code = tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder); tEncoderClear(&encoder);
if (code) { if (code == -1) {
stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code)); stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }

View File

@ -252,7 +252,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci
} }
int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) { const char type, int8_t lvl) {
size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); size_t len = 0;//FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl);
if (len > inputSize) { if (len > inputSize) {
output[0] = 0; output[0] = 0;
memcpy(output + 1, input, inputSize); memcpy(output + 1, input, inputSize);
@ -264,7 +264,7 @@ int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char
int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output, int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) { int32_t outputSize, const char type) {
if (input[0] == 1) { if (input[0] == 1) {
return FL2_decompress(output, outputSize, input + 1, compressedSize - 1); return 0;//FL2_decompress(output, outputSize, input + 1, compressedSize - 1);
} else if (input[0] == 0) { } else if (input[0] == 0) {
memcpy(output, input + 1, compressedSize - 1); memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1; return compressedSize - 1;