refactor: do some internal refactor.
This commit is contained in:
parent
49bc3924fb
commit
d4bab8c09b
|
@ -462,7 +462,6 @@ struct SStreamTask {
|
|||
struct SStreamMeta* pMeta;
|
||||
SSHashObj* pNameMap;
|
||||
void* pBackend;
|
||||
int64_t backendRefId;
|
||||
char reserve[256];
|
||||
};
|
||||
|
||||
|
@ -535,7 +534,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
|
|||
SArray* pTaskList, bool hasFillhistory);
|
||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
|
||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
|
||||
void tFreeStreamTask(SStreamTask* pTask, bool metaLock);
|
||||
void tFreeStreamTask(SStreamTask* pTask);
|
||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
|
||||
|
||||
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
|
||||
|
|
|
@ -17,11 +17,15 @@
|
|||
#define _TD_MND_STREAM_H_
|
||||
|
||||
#include "mndInt.h"
|
||||
#include "mndTrans.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define MND_STREAM_RESERVE_SIZE 64
|
||||
#define MND_STREAM_VER_NUMBER 4
|
||||
|
||||
typedef struct SStreamTransInfo {
|
||||
int64_t startTime;
|
||||
int64_t streamUid;
|
||||
|
@ -53,6 +57,19 @@ typedef struct SStreamExecInfo {
|
|||
SHashObj *pTransferStateStreams;
|
||||
} SStreamExecInfo;
|
||||
|
||||
typedef struct SNodeEntry {
|
||||
int32_t nodeId;
|
||||
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
|
||||
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
|
||||
int64_t hbTimestamp; // second
|
||||
} SNodeEntry;
|
||||
|
||||
typedef struct SFailedCheckpointInfo {
|
||||
int64_t streamUid;
|
||||
int64_t checkpointId;
|
||||
int32_t transId;
|
||||
} SFailedCheckpointInfo;
|
||||
|
||||
#define MND_STREAM_CREATE_NAME "stream-create"
|
||||
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
|
||||
#define MND_STREAM_PAUSE_NAME "stream-pause"
|
||||
|
@ -68,7 +85,7 @@ void mndCleanupStream(SMnode *pMnode);
|
|||
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
|
||||
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
|
||||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
|
||||
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
|
||||
|
@ -81,6 +98,21 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
|||
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
|
||||
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
|
||||
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
|
||||
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
|
||||
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||
int32_t retryCode);
|
||||
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
|
||||
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
|
||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
||||
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
||||
int32_t mndProcessStreamHb(SRpcMsg *pReq);
|
||||
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
int32_t initStreamNodeList(SMnode *pMnode);
|
||||
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
|
||||
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -182,7 +182,7 @@ void *freeStreamTasks(SArray *pTaskLevel) {
|
|||
int32_t taskSz = taosArrayGetSize(pLevel);
|
||||
for (int32_t j = 0; j < taskSz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
tFreeStreamTask(pTask, true);
|
||||
tFreeStreamTask(pTask);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pLevel);
|
||||
|
|
|
@ -639,7 +639,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
|
||||
if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER;
|
||||
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
|
||||
if (mndPersistStream(pTrans, &streamObj) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
|
||||
|
@ -872,7 +872,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
}
|
||||
|
||||
// drop stream
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) {
|
||||
mError("stream:%s, failed to drop log since %s", pStream->name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
goto _OVER;
|
||||
|
@ -923,7 +923,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) {
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
goto _OVER;
|
||||
}
|
||||
|
|
|
@ -27,17 +27,8 @@
|
|||
#include "tmisce.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MND_STREAM_VER_NUMBER 4
|
||||
#define MND_STREAM_RESERVE_SIZE 64
|
||||
#define MND_STREAM_MAX_NUM 60
|
||||
|
||||
typedef struct SNodeEntry {
|
||||
int32_t nodeId;
|
||||
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
|
||||
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
|
||||
int64_t hbTimestamp; // second
|
||||
} SNodeEntry;
|
||||
|
||||
typedef struct SVgroupChangeInfo {
|
||||
SHashObj *pDBMap;
|
||||
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
|
||||
|
@ -54,7 +45,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
|
|||
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
|
||||
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
|
||||
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq);
|
||||
static int32_t mndProcessStreamHb(SRpcMsg *pReq);
|
||||
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
|
@ -66,28 +56,18 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
|
|||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||
static SArray *extractNodeListFromStream(SMnode *pMnode);
|
||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
|
||||
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
||||
|
||||
static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
||||
|
||||
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
|
||||
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
|
||||
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||
int32_t retryCode);
|
||||
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
||||
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
|
||||
static void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
|
||||
|
||||
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
|
||||
static void freeCheckpointCandEntry(void *);
|
||||
static void freeTaskList(void *param);
|
||||
|
||||
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
|
||||
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
|
||||
|
@ -176,53 +156,6 @@ void mndCleanupStream(SMnode *pMnode) {
|
|||
mDebug("mnd stream exec info cleanup");
|
||||
}
|
||||
|
||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void *buf = NULL;
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, NULL, 0);
|
||||
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
|
||||
tEncoderClear(&encoder);
|
||||
goto STREAM_ENCODE_OVER;
|
||||
}
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto STREAM_ENCODE_OVER;
|
||||
|
||||
buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) goto STREAM_ENCODE_OVER;
|
||||
|
||||
tEncoderInit(&encoder, buf, tlen);
|
||||
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
|
||||
tEncoderClear(&encoder);
|
||||
goto STREAM_ENCODE_OVER;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
STREAM_ENCODE_OVER:
|
||||
taosMemoryFreeClear(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
|
||||
pStream->checkpointId);
|
||||
return pRaw;
|
||||
}
|
||||
|
||||
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
|
@ -548,7 +481,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t level = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < level; i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
|
@ -582,30 +515,12 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
if (mndPersistStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream) {
|
||||
if (mndPersistStreamTasks(pTrans, pStream) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
return 0;
|
||||
return mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
|
||||
}
|
||||
|
||||
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
|
||||
|
@ -699,40 +614,7 @@ _OVER:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
|
||||
*hasEpset = false;
|
||||
|
||||
pEpSet->numOfEps = 0;
|
||||
if (nodeId == SNODE_HANDLE) {
|
||||
SSnodeObj *pObj = NULL;
|
||||
void *pIter = NULL;
|
||||
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||
if (pIter != NULL) {
|
||||
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
|
||||
sdbRelease(pMnode->pSdb, pObj);
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
*hasEpset = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mError("failed to acquire snode epset");
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
} else {
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
|
||||
if (pVgObj != NULL) {
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
epsetAssign(pEpSet, &epset);
|
||||
*hasEpset = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||
|
@ -900,7 +782,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// add stream to trans
|
||||
if (mndPersistStream(pMnode, pTrans, &streamObj) < 0) {
|
||||
if (mndPersistStream(pTrans, &streamObj) < 0) {
|
||||
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
goto _OVER;
|
||||
|
@ -1126,7 +1008,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
|||
pStream->version = pStream->version + 1;
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
||||
if ((code = mndPersistTransLog(pStream, pTrans)) != TSDB_CODE_SUCCESS) {
|
||||
if ((code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY)) != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1141,7 +1023,7 @@ _ERR:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t initStreamNodeList(SMnode *pMnode) {
|
||||
int32_t initStreamNodeList(SMnode *pMnode) {
|
||||
if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
|
||||
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
|
||||
execInfo.pNodeList = extractNodeListFromStream(pMnode);
|
||||
|
@ -1367,7 +1249,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// drop stream
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
tFreeMDropStreamReq(&dropReq);
|
||||
|
@ -1386,7 +1268,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
|
||||
if (transId != 0) {
|
||||
mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
|
||||
killTransImpl(pMnode, transId, pStream->sourceDb);
|
||||
mndKillTransImpl(pMnode, transId, pStream->sourceDb);
|
||||
}
|
||||
|
||||
removeStreamTasksInBuf(pStream, &execInfo);
|
||||
|
@ -1434,13 +1316,13 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
|||
int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid);
|
||||
if (transId != 0) {
|
||||
mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name);
|
||||
killTransImpl(pMnode, transId, pStream->sourceDb);
|
||||
mndKillTransImpl(pMnode, transId, pStream->sourceDb);
|
||||
}
|
||||
|
||||
// drop the stream obj in execInfo
|
||||
removeStreamTasksInBuf(pStream, &execInfo);
|
||||
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) {
|
||||
sdbRelease(pSdb, pStream);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return -1;
|
||||
|
@ -1741,69 +1623,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SEpSet epset = {0};
|
||||
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
|
||||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// no valid epset, return directly without redoAction
|
||||
if (!hasEpset) {
|
||||
taosMemoryFree(pReq);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPauseAllStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
SArray *tasks = pStream->tasks;
|
||||
|
||||
int32_t size = taosArrayGetSize(tasks);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SArray *pTasks = taosArrayGetP(tasks, i);
|
||||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
|
||||
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
|
||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) {
|
||||
// SStreamObj streamObj = {0};
|
||||
// memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
|
||||
taosWLockLatch(&pStream->lock);
|
||||
pStream->status = status;
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
|
@ -1882,7 +1702,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
|
||||
|
||||
// if nodeUpdate happened, not send pause trans
|
||||
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
if (mndPauseStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -1909,57 +1729,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
|
||||
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
pReq->igUntreated = igUntreated;
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
|
||||
int32_t size = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
|
||||
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
@ -2019,7 +1788,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
|
||||
|
||||
// resume all tasks
|
||||
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
||||
if (mndResumeStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -2097,40 +1866,6 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) {
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
if (pCommitRaw == NULL) {
|
||||
mError("failed to encode stream since %s", terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
|
||||
mError("stream trans:%d failed to set raw status since %s", pTrans->id, terrstr());
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||
int32_t retryCode) {
|
||||
pAction->epSet = *pEpset;
|
||||
pAction->contLen = contLen;
|
||||
pAction->pCont = pCont;
|
||||
pAction->msgType = msgType;
|
||||
pAction->retryCode = retryCode;
|
||||
}
|
||||
|
||||
// todo extract method: traverse stream tasks
|
||||
// build trans to update the epset
|
||||
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
|
||||
|
@ -2224,69 +1959,6 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
|||
return info;
|
||||
}
|
||||
|
||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SVgObj *pVgroup = NULL;
|
||||
|
||||
*allReady = true;
|
||||
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
|
||||
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
||||
// if not all ready till now, no need to check the remaining vgroups.
|
||||
if (*allReady) {
|
||||
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||
if (!pVgroup->vnodeGid[i].syncRestore) {
|
||||
mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
|
||||
*allReady = false;
|
||||
break;
|
||||
}
|
||||
|
||||
ESyncState state = pVgroup->vnodeGid[i].syncState;
|
||||
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
|
||||
mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId);
|
||||
*allReady = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char buf[256] = {0};
|
||||
EPSET_TO_STR(&entry.epset, buf);
|
||||
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||
taosArrayPush(pVgroupListSnapshot, &entry);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
SSnodeObj *pObj = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SNodeEntry entry = {0};
|
||||
addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
||||
entry.nodeId = SNODE_HANDLE;
|
||||
|
||||
char buf[256] = {0};
|
||||
EPSET_TO_STR(&entry.epset, buf);
|
||||
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
|
||||
taosArrayPush(pVgroupListSnapshot, &entry);
|
||||
sdbRelease(pSdb, pObj);
|
||||
}
|
||||
|
||||
return pVgroupListSnapshot;
|
||||
}
|
||||
|
||||
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
@ -2349,7 +2021,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
|||
continue;
|
||||
}
|
||||
|
||||
code = mndPersistTransLog(pStream, pTrans);
|
||||
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
|
||||
sdbRelease(pSdb, pStream);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2419,22 +2091,6 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
|
|||
return plist;
|
||||
}
|
||||
|
||||
static void doExtractTasksFromStream(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SStreamObj *pStream = NULL;
|
||||
void *pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
saveStreamTasksInfo(pStream, &execInfo);
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
|
||||
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
|
||||
if (p == NULL) {
|
||||
|
@ -2679,114 +2335,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
|||
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
|
||||
}
|
||||
|
||||
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name);
|
||||
if (pTrans == NULL) {
|
||||
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg);
|
||||
|
||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
|
||||
mndTransDrop(pTrans);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
terrno = 0;
|
||||
return pTrans;
|
||||
}
|
||||
|
||||
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
|
||||
if (pTrans == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
taosWLockLatch(&pStream->lock);
|
||||
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
|
||||
|
||||
for (int32_t j = 0; j < numOfLevels; ++j) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pLevel);
|
||||
for (int32_t k = 0; k < numOfTasks; ++k) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, k);
|
||||
|
||||
// todo extract method, with pause stream task
|
||||
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(pReq);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!hasEpset) {
|
||||
taosMemoryFree(pReq);
|
||||
continue;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
mndTransDrop(pTrans);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
||||
int32_t code = mndPersistTransLog(pStream, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
|
||||
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
||||
if (pTrans != NULL) {
|
||||
mInfo("kill active transId:%d in Db:%s", transId, pDbName);
|
||||
mndKillTrans(pMnode, pTrans);
|
||||
mndReleaseTrans(pMnode, pTrans);
|
||||
} else {
|
||||
mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
||||
// data in the hash table will be removed automatically, no need to remove it here.
|
||||
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
|
||||
|
@ -2801,238 +2349,12 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
|
|||
}
|
||||
|
||||
char *pDupDBName = strndup(pDBName, len);
|
||||
killTransImpl(pMnode, pTransInfo->transId, pDupDBName);
|
||||
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
|
||||
taosMemoryFree(pDupDBName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
killTransImpl(pMnode, transId, "");
|
||||
|
||||
SStreamObj *pStream = mndGetStreamObj(pMnode, streamId);
|
||||
if (pStream == NULL) {
|
||||
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
|
||||
} else {
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
|
||||
if (conflict) {
|
||||
mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
|
||||
pStream->sourceDb, pStream->targetSTbName);
|
||||
} else {
|
||||
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
|
||||
pStream->uid, transId);
|
||||
code = createStreamResetStatusTrans(pMnode, pStream);
|
||||
}
|
||||
}
|
||||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
return code;
|
||||
}
|
||||
|
||||
static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
|
||||
int32_t numOfLevels = taosArrayGetSize(pLevel);
|
||||
for (int32_t j = 0; j < numOfLevels; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
if (pTask->id.taskId == pId->taskId) {
|
||||
return pTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
|
||||
int32_t num = 0;
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
|
||||
SArray* pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
num += taosArrayGetSize(pLevel);
|
||||
}
|
||||
|
||||
return num;
|
||||
}
|
||||
|
||||
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
||||
int32_t num = taosArrayGetSize(pNodeList);
|
||||
mInfo("set node expired for %d nodes", num);
|
||||
|
||||
for (int k = 0; k < num; ++k) {
|
||||
int32_t *pVgId = taosArrayGet(pNodeList, k);
|
||||
mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
|
||||
|
||||
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
|
||||
for (int i = 0; i < numOfNodes; ++i) {
|
||||
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
|
||||
|
||||
if (pNodeEntry->nodeId == *pVgId) {
|
||||
mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
|
||||
pNodeEntry->stageUpdated = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
|
||||
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
|
||||
for (int32_t j = 0; j < numOfNodes; ++j) {
|
||||
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
|
||||
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
|
||||
mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
|
||||
pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);
|
||||
|
||||
pNodeEntry->stageUpdated = true;
|
||||
pTaskEntry->stage = stage;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct SFailedCheckpointInfo {
|
||||
int64_t streamUid;
|
||||
int64_t checkpointId;
|
||||
int32_t transId;
|
||||
} SFailedCheckpointInfo;
|
||||
|
||||
static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pInfo) {
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
SFailedCheckpointInfo* p = taosArrayGet(pList, i);
|
||||
if (p->transId == pInfo->transId) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayPush(pList, pInfo);
|
||||
}
|
||||
|
||||
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SStreamHbMsg req = {0};
|
||||
|
||||
// bool checkpointFailed = false;
|
||||
// int64_t checkpointId = 0;
|
||||
// int64_t streamId = 0;
|
||||
// int32_t transId = 0;
|
||||
SArray* pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
|
||||
|
||||
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
|
||||
streamMetaClearHbMsg(&req);
|
||||
tDecoderClear(&decoder);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
||||
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
|
||||
// extract stream task list
|
||||
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
|
||||
if (numOfExisted == 0) {
|
||||
doExtractTasksFromStream(pMnode);
|
||||
}
|
||||
|
||||
initStreamNodeList(pMnode);
|
||||
|
||||
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
||||
if (numOfUpdated > 0) {
|
||||
mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
||||
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
||||
}
|
||||
|
||||
bool snodeChanged = false;
|
||||
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
||||
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
||||
|
||||
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
||||
if (pTaskEntry == NULL) {
|
||||
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
|
||||
updateStageInfo(pTaskEntry, p->stage);
|
||||
if (pTaskEntry->nodeId == SNODE_HANDLE) {
|
||||
snodeChanged = true;
|
||||
}
|
||||
} else {
|
||||
// task is idle for more than 50 sec.
|
||||
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
|
||||
if (!pTaskEntry->inputQChanging) {
|
||||
pTaskEntry->inputQUnchangeCounter++;
|
||||
} else {
|
||||
pTaskEntry->inputQChanging = false;
|
||||
}
|
||||
} else {
|
||||
pTaskEntry->inputQChanging = true;
|
||||
pTaskEntry->inputQUnchangeCounter = 0;
|
||||
}
|
||||
|
||||
streamTaskStatusCopy(pTaskEntry, p);
|
||||
if (p->checkpointId != 0) {
|
||||
if (p->checkpointFailed) {
|
||||
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
|
||||
p->checkpointId, p->chkpointTransId);
|
||||
|
||||
SFailedCheckpointInfo info = {
|
||||
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
|
||||
addIntoCheckpointList(pList, &info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (p->status == pTaskEntry->status) {
|
||||
pTaskEntry->statusLastDuration++;
|
||||
} else {
|
||||
pTaskEntry->status = p->status;
|
||||
pTaskEntry->statusLastDuration = 0;
|
||||
}
|
||||
|
||||
if (p->status != TASK_STATUS__READY) {
|
||||
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
|
||||
}
|
||||
}
|
||||
|
||||
// current checkpoint is failed, rollback from the checkpoint trans
|
||||
// kill the checkpoint trans and then set all tasks status to be normal
|
||||
if (taosArrayGetSize(pList) > 0) {
|
||||
bool allReady = true;
|
||||
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
|
||||
taosArrayDestroy(p);
|
||||
|
||||
if (allReady || snodeChanged) {
|
||||
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||
SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i);
|
||||
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
||||
pInfo->checkpointId, pInfo->transId);
|
||||
|
||||
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
|
||||
}
|
||||
} else {
|
||||
mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
|
||||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
streamMetaClearHbMsg(&req);
|
||||
|
||||
taosArrayDestroy(pList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void freeCheckpointCandEntry(void *param) {
|
||||
SCheckpointCandEntry *pEntry = param;
|
||||
taosMemoryFreeClear(pEntry->pName);
|
||||
|
@ -3043,22 +2365,6 @@ void freeTaskList(void* param) {
|
|||
taosArrayDestroy(*pList);
|
||||
}
|
||||
|
||||
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SStreamObj *pStream = NULL;
|
||||
|
||||
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
||||
if (pStream->uid == streamId) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return pStream;
|
||||
}
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void doAddTaskId(SArray* pList, int32_t taskId, int64_t uid, int32_t numOfTotal) {
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
|
|
|
@ -160,3 +160,106 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId)
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name);
|
||||
if (pTrans == NULL) {
|
||||
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg);
|
||||
|
||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
|
||||
mndTransDrop(pTrans);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
terrno = 0;
|
||||
return pTrans;
|
||||
}
|
||||
|
||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
void *buf = NULL;
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, NULL, 0);
|
||||
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
|
||||
tEncoderClear(&encoder);
|
||||
goto STREAM_ENCODE_OVER;
|
||||
}
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto STREAM_ENCODE_OVER;
|
||||
|
||||
buf = taosMemoryMalloc(tlen);
|
||||
if (buf == NULL) goto STREAM_ENCODE_OVER;
|
||||
|
||||
tEncoderInit(&encoder, buf, tlen);
|
||||
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
|
||||
tEncoderClear(&encoder);
|
||||
goto STREAM_ENCODE_OVER;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
STREAM_ENCODE_OVER:
|
||||
taosMemoryFreeClear(buf);
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
|
||||
pStream->checkpointId);
|
||||
return pRaw;
|
||||
}
|
||||
|
||||
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
if (pCommitRaw == NULL) {
|
||||
mError("failed to encode stream since %s", terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (sdbSetRawStatus(pCommitRaw, status) != 0) {
|
||||
mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
|
||||
int32_t retryCode) {
|
||||
pAction->epSet = *pEpset;
|
||||
pAction->contLen = contLen;
|
||||
pAction->pCont = pCont;
|
||||
pAction->msgType = msgType;
|
||||
pAction->retryCode = retryCode;
|
||||
}
|
|
@ -97,7 +97,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
|
|||
}
|
||||
|
||||
if (pItem->pStreamTask) {
|
||||
tFreeStreamTask(pItem->pStreamTask, true);
|
||||
tFreeStreamTask(pItem->pStreamTask);
|
||||
}
|
||||
taosArrayDestroy(pItem->pResList);
|
||||
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
|
||||
|
|
|
@ -617,7 +617,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
|||
if (code < 0) {
|
||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks,
|
||||
tstrerror(code));
|
||||
tFreeStreamTask(pTask, true);
|
||||
tFreeStreamTask(pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -645,7 +645,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
|||
}
|
||||
} else {
|
||||
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
|
||||
tFreeStreamTask(pTask, true);
|
||||
tFreeStreamTask(pTask);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -257,8 +257,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
|
|||
|
||||
STaskDbWrapper* pBackend = *ppBackend;
|
||||
pBackend->pMeta = pMeta;
|
||||
|
||||
pTask->backendRefId = pBackend->refId;
|
||||
pTask->pBackend = pBackend;
|
||||
|
||||
taosThreadMutexUnlock(&pMeta->backendMutex);
|
||||
|
@ -283,7 +281,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
|
|||
}
|
||||
|
||||
int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
|
||||
pTask->backendRefId = tref;
|
||||
pTask->pBackend = pBackend;
|
||||
pBackend->refId = tref;
|
||||
pBackend->pTask = pTask;
|
||||
|
@ -599,19 +596,19 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
}
|
||||
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||
tFreeStreamTask(pTask, false);
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||
|
||||
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
||||
tFreeStreamTask(pTask, false);
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
tFreeStreamTask(pTask, false);
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -661,7 +658,7 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask)
|
|||
stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
|
||||
} else if (ref == 0) {
|
||||
stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
|
||||
tFreeStreamTask(pTask, true);
|
||||
tFreeStreamTask(pTask);
|
||||
} else if (ref < 0) {
|
||||
stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr);
|
||||
}
|
||||
|
@ -871,7 +868,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (tDecodeStreamTask(&decoder, pTask) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
doClear(pKey, pVal, pCur, pRecycleList);
|
||||
tFreeStreamTask(pTask, false);
|
||||
tFreeStreamTask(pTask);
|
||||
stError(
|
||||
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
|
||||
"stream manually",
|
||||
|
@ -882,7 +879,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
|
||||
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
|
||||
int32_t taskId = pTask->id.taskId;
|
||||
tFreeStreamTask(pTask, false);
|
||||
tFreeStreamTask(pTask);
|
||||
|
||||
STaskId id = streamTaskGetTaskId(pTask);
|
||||
taosArrayPush(pRecycleList, &id);
|
||||
|
@ -898,7 +895,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (p == NULL) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) {
|
||||
doClear(pKey, pVal, pCur, pRecycleList);
|
||||
tFreeStreamTask(pTask, false);
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -912,7 +909,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
|
||||
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) {
|
||||
doClear(pKey, pVal, pCur, pRecycleList);
|
||||
tFreeStreamTask(pTask, false);
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -340,16 +340,11 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeStreamTask(SStreamTask* pTask, bool metaLock) {
|
||||
void tFreeStreamTask(SStreamTask* pTask) {
|
||||
char* p = NULL;
|
||||
int32_t taskId = pTask->id.taskId;
|
||||
STaskExecStatisInfo* pStatis = &pTask->execInfo;
|
||||
|
||||
// check for mnode
|
||||
// if (pTask->pMeta != NULL) {
|
||||
// streamTaskClearHTaskAttr(pTask, metaLock);
|
||||
// }
|
||||
|
||||
ETaskStatus status1 = TASK_STATUS__UNINIT;
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
if (pTask->status.pSM != NULL) {
|
||||
|
|
Loading…
Reference in New Issue