Merge pull request #27978 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2024-09-20 16:03:46 +08:00 committed by GitHub
commit 689608b147
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 988 additions and 844 deletions

View File

@ -35,6 +35,7 @@ extern "C" {
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
#define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update"
#define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen"
#define MND_STREAM_RESTART_NAME "stream-restart"
typedef struct SStreamTransInfo {
int64_t startTime;
@ -120,6 +121,7 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList);
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode, int32_t acceptCode);
@ -127,31 +129,39 @@ int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnC
const char *pMsg, STrans **pTrans1);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream);
bool mndStreamNodeIsUpdated(SMnode *pMnode);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t extractStreamNodeList(SMnode *pMnode);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts);
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream);
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
int64_t ts);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
SVgroupChangeInfo *pInfo);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter);
void destroyStreamTaskIter(SStreamTaskIter *pIter);
bool streamTaskIterNextTask(SStreamTaskIter *pIter);
int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask);
int32_t mndInitExecInfo();
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
void mndStreamResetInitTaskListLoadFlag();

View File

@ -63,8 +63,7 @@ static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo* pInfo);
static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo);
static void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
static void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
@ -920,6 +919,85 @@ _OVER:
return code;
}
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
int32_t code = 0;
SMPauseStreamReq pauseReq = {0};
if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) {
return TSDB_CODE_INVALID_MSG;
}
code = mndAcquireStream(pMnode, pauseReq.name, &pStream);
if (pStream == NULL || code != 0) {
if (pauseReq.igNotExists) {
mInfo("stream:%s, not exist, not restart stream", pauseReq.name);
return 0;
} else {
mError("stream:%s not exist, failed to restart stream", pauseReq.name);
TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST);
}
}
mInfo("stream:%s,%" PRId64 " start to restart stream", pauseReq.name, pStream->uid);
if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return code;
}
// check if it is conflict with other trans in both sourceDb and targetDb.
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true);
if (code) {
sdbRelease(pMnode->pSdb, pStream);
return code;
}
bool updated = mndStreamNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for restart, node update detected");
sdbRelease(pMnode->pSdb, pStream);
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
}
STrans *pTrans = NULL;
code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream", &pTrans);
if (pTrans == NULL || code) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream);
return code;
}
code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid);
if (code) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
// if nodeUpdate happened, not send pause trans
code = mndStreamSetRestartAction(pMnode, pTrans, pStream);
if (code) {
mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare restart stream trans since %s", pTrans->id, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
SStreamObj *pStream = NULL;
void *pIter = NULL;
@ -973,82 +1051,6 @@ int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) {
return maxChkptId + 1;
}
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = checkpointId;
req.nodeId = nodeId;
req.expireTime = -1;
req.streamId = streamId; // pTask->id.streamId;
req.taskId = taskId; // pTask->id.taskId;
req.transId = transId;
req.mndTrigger = mndTrigger;
int32_t code;
int32_t blen;
tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
if (code < 0) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
return terrno;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
if (pos == -1) {
tEncoderClear(&encoder);
return TSDB_CODE_INVALID_MSG;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
return 0;
}
static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger) {
void *buf;
int32_t tlen;
int32_t code = 0;
SEpSet epset = {0};
bool hasEpset = false;
if ((code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger)) < 0) {
taosMemoryFree(buf);
return code;
}
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(buf);
return code;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(buf);
}
return code;
}
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
int8_t mndTrigger, bool lock) {
int32_t code = TSDB_CODE_SUCCESS;
@ -1096,7 +1098,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
code = doSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
code = mndStreamSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
if (code != TSDB_CODE_SUCCESS) {
taosWUnLockLatch(&pStream->lock);
@ -1143,70 +1145,9 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
return taosArrayGetSize(execInfo.pNodeList);
}
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
bool allReady = false;
bool nodeUpdated = false;
SVgroupChangeInfo changeInfo = {0};
int32_t numOfNodes = extractStreamNodeList(pMnode);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec();
return false;
}
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry == NULL) {
continue;
}
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return true;
}
}
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}
if (!allReady) {
mWarn("not all vnodes ready, quit from vnodes status check");
return true;
}
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
if (code) {
nodeUpdated = false;
} else {
nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
if (nodeUpdated) {
mDebug("stream tasks not ready due to node update");
}
}
mndDestroyVgroupChangeInfo(&changeInfo);
return nodeUpdated;
}
// check if the node update happens or not
static bool taskNodeIsUpdated(SMnode *pMnode) {
SArray *pNodeSnapshot = NULL;
streamMutexLock(&execInfo.lock);
bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pNodeSnapshot);
return updated;
}
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true;
if (taskNodeIsUpdated(pMnode)) {
if (mndStreamNodeIsUpdated(pMnode)) {
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
}
@ -1605,32 +1546,6 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
return 0;
}
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
}
int32_t numOfStreams = 0;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->sourceDbUid == pDb->uid) {
numOfStreams++;
}
sdbRelease(pSdb, pStream);
}
*pNumOfStreams = numOfStreams;
mndReleaseDb(pMnode, pDb);
return 0;
}
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -1770,7 +1685,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
TAOS_RETURN(code);
}
bool updated = taskNodeIsUpdated(pMnode);
bool updated = mndStreamNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
sdbRelease(pMnode->pSdb, pStream);
@ -1965,102 +1880,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
const SEp *p = GET_ACTIVE_EP(pCurrent);
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
}
return true;
}
// 1. increase the replica does not affect the stream process.
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
// tasks on the will be removed replica.
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
// will handle it as mentioned in 1 & 2 items.
static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
SVgroupChangeInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
if (pInfo == NULL) {
return TSDB_CODE_INVALID_PARA;
}
pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
mndDestroyVgroupChangeInfo(pInfo);
TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
}
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
if (pPrevEntry == NULL) {
continue;
}
int32_t num = taosArrayGetSize(pNodeList);
for (int32_t j = 0; j < num; ++j) {
SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
if(pCurrent == NULL) {
continue;
}
if (pCurrent->nodeId == pPrevEntry->nodeId) {
if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
char buf[256] = {0};
code = epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error
if (code) {
mError("failed to convert epset string, code:%s", tstrerror(code));
TSDB_CHECK_CODE(code, lino, _err);
}
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
TSDB_CHECK_NULL(p, code, lino, _err, terrno);
}
// todo handle the snode info
if (pCurrent->nodeId != SNODE_HANDLE) {
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
mndReleaseVgroup(pMnode, pVgroup);
TSDB_CHECK_CODE(code, lino, _err);
}
break;
}
}
}
return code;
_err:
mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
mndDestroyVgroupChangeInfo(pInfo);
return code;
}
static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) {
if (pInfo != NULL) {
taosArrayDestroy(pInfo->pUpdateNodeList);
taosHashCleanup(pInfo->pDBMap);
}
}
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;

View File

@ -21,6 +21,10 @@ typedef struct SKeyInfo {
int32_t keyLen;
} SKeyInfo;
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
}
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId) {
SStreamTransInfo info = {
.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId};
@ -117,7 +121,8 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char
}
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) &&
(strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
return TSDB_CODE_MND_TRANS_CONFLICT;
@ -126,7 +131,8 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char
}
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
(strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name);
return TSDB_CODE_MND_TRANS_CONFLICT;
@ -282,10 +288,6 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg
return mndTransAppendRedoAction(pTrans, &action);
}
static bool identicalName(const char *pDb, const char *pParam, int32_t len) {
return (strlen(pDb) == len) && (strncmp(pDb, pParam, len) == 0);
}
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
void *pIter = NULL;

View File

@ -0,0 +1,669 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndDb.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndTrans.h"
#include "mndVgroup.h"
#include "taoserror.h"
#include "tmisce.h"
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
terrno = code;
taosMemoryFree(pReq);
return code;
}
char buf[256] = {0};
code = epsetToStr(&epset, buf, tListLen(buf));
if (code != 0) { // print error and continue
mError("failed to convert epset to str, code:%s", tstrerror(code));
}
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
}
return 0;
}
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
return code;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
}
return 0;
}
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
terrno = 0;
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
terrno = code;
taosMemoryFree(pReq);
return terrno;
}
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return terrno;
}
mDebug("set the resume action for trans:%d", pTrans->id);
return 0;
}
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
pReq->streamId = pTask->streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction
taosMemoryFree(pReq);
return code;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
}
return 0;
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) {
int32_t code = 0;
pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
if (pMsg->pNodeList == NULL) {
mError("failed to prepare node list, code:%s", tstrerror(terrno));
code = terrno;
}
if (code == 0) {
void *p = taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
if (p == NULL) {
mError("failed to add update node list into nodeList");
}
}
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return terrno;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
taosArrayDestroy(req.pNodeList);
return terrno;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
if (code == -1) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
taosArrayDestroy(req.pNodeList);
return code;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
void *pBuf = NULL;
int32_t len = 0;
SEpSet epset = {0};
bool hasEpset = false;
bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
if (code) {
mError("failed to build stream task epset update msg, code:%s", tstrerror(code));
return code;
}
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
mError("failed to extract epset during create update epset, code:%s", tstrerror(code));
return code;
}
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
mError("failed to create update task epset trans, code:%s", tstrerror(code));
taosMemoryFree(pBuf);
}
return code;
}
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
if (pReq == NULL) {
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
tstrerror(terrno));
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SChkptReportInfo *pStreamItem = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
if (pStreamItem == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t size = taosArrayGetSize(pStreamItem->pTaskList);
for(int32_t i = 0; i < size; ++i) {
STaskChkptInfo* pInfo = taosArrayGet(pStreamItem->pTaskList, i);
if (pInfo == NULL) {
continue;
}
if (pInfo->taskId == pTask->id.taskId) {
pReq->checkpointId = pInfo->checkpointId;
pReq->checkpointVer = pInfo->version;
pReq->checkpointTs = pInfo->ts;
pReq->dropRelHTask = pInfo->dropHTask;
pReq->transId = pInfo->transId;
pReq->hStreamId = pTask->hTaskInfo.id.streamId;
pReq->hTaskId = pTask->hTaskInfo.id.taskId;
}
}
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}
return code;
}
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(terrno));
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}
return code;
}
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = checkpointId;
req.nodeId = nodeId;
req.expireTime = -1;
req.streamId = streamId; // pTask->id.streamId;
req.taskId = taskId; // pTask->id.taskId;
req.transId = transId;
req.mndTrigger = mndTrigger;
int32_t code;
int32_t blen;
tEncodeSize(tEncodeStreamCheckpointSourceReq, &req, blen, code);
if (code < 0) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
return terrno;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
if (pos == -1) {
tEncoderClear(&encoder);
return TSDB_CODE_INVALID_MSG;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
return 0;
}
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
code = doSetPauseAction(pMnode, pTrans, pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
destroyStreamTaskIter(pIter);
return code;
}
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while(streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
code = doSetDropAction(pMnode, pTrans, pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
}
destroyStreamTaskIter(pIter);
return 0;
}
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code || pTask == NULL) {
destroyStreamTaskIter(pIter);
return code;
}
code = doSetResumeAction(pTrans, pMnode, pTask, igUntreated);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
}
destroyStreamTaskIter(pIter);
return 0;
}
// build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
code = doSetUpdateChkptAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SOrphanTask* pTask = taosArrayGet(pList, i);
if (pTask == NULL) {
return terrno;
}
int32_t code = doSetDropActionFromId(pMnode, pTrans, pTask);
if (code != 0) {
return code;
} else {
mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId);
}
}
return 0;
}
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
code = doSetResetAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
SRestoreCheckpointInfo req = {
.taskId = pTask->id.taskId,
.streamId = pTask->id.streamId,
.checkpointId = checkpointId,
.startTs = ts,
.nodeId = pTask->info.nodeId,
.transId = pTrans->id,
};
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code);
if (code < 0) {
return terrno;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *pBuf = taosMemoryMalloc(tlen);
if (pBuf == NULL) {
return terrno;
}
void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
code = tEncodeRestoreCheckpointInfo(&encoder, &req);
tEncoderClear(&encoder);
if (code == -1) {
taosMemoryFree(pBuf);
return code;
}
SMsgHead *pMsgHead = (SMsgHead *)pBuf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pTask->info.nodeId);
SEpSet epset = {0};
bool hasEpset = false;
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pBuf);
return code;
}
code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
}
return code;
}
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger) {
void *buf;
int32_t tlen;
int32_t code = 0;
SEpSet epset = {0};
bool hasEpset = false;
if ((code = mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger)) < 0) {
taosMemoryFree(buf);
return code;
}
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(buf);
return code;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(buf);
}
return code;
}
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) {
return 0;
}

View File

@ -304,41 +304,6 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t
}
}
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
terrno = 0;
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
terrno = code;
taosMemoryFree(pReq);
return terrno;
}
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return terrno;
}
mDebug("set the resume action for trans:%d", pTrans->id);
return 0;
}
int32_t mndGetStreamTask(STaskId *pId, SStreamObj *pStream, SStreamTask **pTask) {
*pTask = NULL;
@ -377,396 +342,29 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
return num;
}
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code || pTask == NULL) {
destroyStreamTaskIter(pIter);
return code;
int32_t numOfStreams = 0;
void *pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->sourceDbUid == pDb->uid) {
numOfStreams++;
}
code = doSetResumeAction(pTrans, pMnode, pTask, igUntreated);
if (code) {
destroyStreamTaskIter(pIter);
return code;
sdbRelease(pSdb, pStream);
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
}
destroyStreamTaskIter(pIter);
return 0;
}
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
terrno = code;
taosMemoryFree(pReq);
return code;
}
char buf[256] = {0};
code = epsetToStr(&epset, buf, tListLen(buf));
if (code != 0) { // print error and continue
mError("failed to convert epset to str, code:%s", tstrerror(code));
}
mDebug("pause stream task in node:%d, epset:%s", pTask->info.nodeId, buf);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
}
return 0;
}
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
code = doSetPauseAction(pMnode, pTrans, pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
destroyStreamTaskIter(pIter);
return code;
}
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
return code;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
}
return 0;
}
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while(streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
code = doSetDropAction(pMnode, pTrans, pTask);
if (code) {
destroyStreamTaskIter(pIter);
return code;
}
}
destroyStreamTaskIter(pIter);
return 0;
}
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
pReq->streamId = pTask->streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction
taosMemoryFree(pReq);
return code;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(pReq);
return code;
}
return 0;
}
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SOrphanTask* pTask = taosArrayGet(pList, i);
if (pTask == NULL) {
return terrno;
}
int32_t code = doSetDropActionFromId(pMnode, pTrans, pTask);
if (code != 0) {
return code;
} else {
mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId);
}
}
return 0;
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) {
int32_t code = 0;
pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
if (pMsg->pNodeList == NULL) {
mError("failed to prepare node list, code:%s", tstrerror(terrno));
code = terrno;
}
if (code == 0) {
void *p = taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
if (p == NULL) {
mError("failed to add update node list into nodeList");
}
}
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return terrno;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
taosArrayDestroy(req.pNodeList);
return terrno;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
if (code == -1) {
tEncoderClear(&encoder);
taosMemoryFree(buf);
taosArrayDestroy(req.pNodeList);
return code;
}
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
void *pBuf = NULL;
int32_t len = 0;
SEpSet epset = {0};
bool hasEpset = false;
bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
if (code) {
mError("failed to build stream task epset update msg, code:%s", tstrerror(code));
return code;
}
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
mError("failed to extract epset during create update epset, code:%s", tstrerror(code));
return code;
}
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
mError("failed to create update task epset trans, code:%s", tstrerror(code));
taosMemoryFree(pBuf);
}
return code;
}
// build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
code = doSetUpdateTaskAction(pMnode, pTrans, pTask, pInfo);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(terrno));
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}
return code;
}
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
code = doSetResetAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
*pNumOfStreams = numOfStreams;
mndReleaseDb(pMnode, pDb);
return 0;
}
@ -1000,90 +598,6 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
return 0;
}
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
if (pReq == NULL) {
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
tstrerror(terrno));
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SChkptReportInfo *pStreamItem = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
if (pStreamItem == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t size = taosArrayGetSize(pStreamItem->pTaskList);
for(int32_t i = 0; i < size; ++i) {
STaskChkptInfo* pInfo = taosArrayGet(pStreamItem->pTaskList, i);
if (pInfo == NULL) {
continue;
}
if (pInfo->taskId == pTask->id.taskId) {
pReq->checkpointId = pInfo->checkpointId;
pReq->checkpointVer = pInfo->version;
pReq->checkpointTs = pInfo->ts;
pReq->dropRelHTask = pInfo->dropHTask;
pReq->transId = pInfo->transId;
pReq->hStreamId = pTask->hTaskInfo.id.streamId;
pReq->hTaskId = pTask->hTaskInfo.id.taskId;
}
}
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVUpdateCheckpointInfoReq), TDMT_STREAM_TASK_UPDATE_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}
return code;
}
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
int32_t code = createStreamTaskIter(pStream, &pIter);
if (code) {
taosWUnLockLatch(&pStream->lock);
mError("failed to create stream task iter:%s", pStream->name);
return code;
}
while (streamTaskIterNextTask(pIter)) {
SStreamTask *pTask = NULL;
code = streamTaskIterGetCurrent(pIter, &pTask);
if (code) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
code = doSetUpdateChkptAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
}
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return code;
}
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
void *pIter = NULL;
@ -1172,60 +686,6 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
return TSDB_CODE_SUCCESS;
}
static int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts) {
SRestoreCheckpointInfo req = {
.taskId = pTask->id.taskId,
.streamId = pTask->id.streamId,
.checkpointId = checkpointId,
.startTs = ts,
.nodeId = pTask->info.nodeId,
.transId = pTrans->id,
};
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, blen, code);
if (code < 0) {
return terrno;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *pBuf = taosMemoryMalloc(tlen);
if (pBuf == NULL) {
return terrno;
}
void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
code = tEncodeRestoreCheckpointInfo(&encoder, &req);
tEncoderClear(&encoder);
if (code == -1) {
taosMemoryFree(pBuf);
return code;
}
SMsgHead *pMsgHead = (SMsgHead *)pBuf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pTask->info.nodeId);
SEpSet epset = {0};
bool hasEpset = false;
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pBuf);
return code;
}
code = setTransAction(pTrans, pBuf, tlen, TDMT_STREAM_CONSEN_CHKPT, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
}
return code;
}
int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId,
int64_t ts) {
char msg[128] = {0};
@ -1880,6 +1340,163 @@ int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlo
return code;
}
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
const SEp *p = GET_ACTIVE_EP(pCurrent);
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
}
return true;
}
void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) {
if (pInfo != NULL) {
taosArrayDestroy(pInfo->pUpdateNodeList);
taosHashCleanup(pInfo->pDBMap);
}
}
// 1. increase the replica does not affect the stream process.
// 2. decreasing the replica may affect the stream task execution in the way that there is one or more running stream
// tasks on the will be removed replica.
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
// will handle it as mentioned in 1 & 2 items.
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
SVgroupChangeInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
if (pInfo == NULL) {
return TSDB_CODE_INVALID_PARA;
}
pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
mndDestroyVgroupChangeInfo(pInfo);
TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
}
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pPrevEntry = taosArrayGet(pPrevNodeList, i);
if (pPrevEntry == NULL) {
continue;
}
int32_t num = taosArrayGetSize(pNodeList);
for (int32_t j = 0; j < num; ++j) {
SNodeEntry *pCurrent = taosArrayGet(pNodeList, j);
if(pCurrent == NULL) {
continue;
}
if (pCurrent->nodeId == pPrevEntry->nodeId) {
if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) {
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
char buf[256] = {0};
code = epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error
if (code) {
mError("failed to convert epset string, code:%s", tstrerror(code));
TSDB_CHECK_CODE(code, lino, _err);
}
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId};
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
TSDB_CHECK_NULL(p, code, lino, _err, terrno);
}
// todo handle the snode info
if (pCurrent->nodeId != SNODE_HANDLE) {
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
mndReleaseVgroup(pMnode, pVgroup);
TSDB_CHECK_CODE(code, lino, _err);
}
break;
}
}
}
return code;
_err:
mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
mndDestroyVgroupChangeInfo(pInfo);
return code;
}
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
bool allReady = false;
bool nodeUpdated = false;
SVgroupChangeInfo changeInfo = {0};
int32_t numOfNodes = extractStreamNodeList(pMnode);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec();
return false;
}
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry == NULL) {
continue;
}
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return true;
}
}
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}
if (!allReady) {
mWarn("not all vnodes ready, quit from vnodes status check");
return true;
}
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
if (code) {
nodeUpdated = false;
} else {
nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
if (nodeUpdated) {
mDebug("stream tasks not ready due to node update");
}
}
mndDestroyVgroupChangeInfo(&changeInfo);
return nodeUpdated;
}
// check if the node update happens or not
bool mndStreamNodeIsUpdated(SMnode *pMnode) {
SArray *pNodeSnapshot = NULL;
streamMutexLock(&execInfo.lock);
bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pNodeSnapshot);
return updated;
}
uint32_t seed = 0;
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};

View File

@ -3439,7 +3439,6 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
}
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
stDebug("streamStateGetGroupKVByCur_rocksdb");
if (!pCur) {
return -1;
}
@ -3885,7 +3884,6 @@ SStreamStateCur* streamStateSessionSeekKeyPrev_rocksdb(SStreamState* pState, con
}
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
stDebug("streamStateSessionGetKVByCur_rocksdb");
if (!pCur) {
return -1;
}
@ -3985,7 +3983,6 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
return NULL;
}
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
stDebug("streamStateFillGetKVByCur_rocksdb");
if (!pCur) {
return -1;
}

View File

@ -900,9 +900,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
ref);
// streamMutexUnlock(&pTask->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
@ -911,9 +908,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
// streamMutexUnlock(&pActiveInfo->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
@ -922,9 +916,6 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
id, vgId, ref);
// streamMutexUnlock(&pActiveInfo->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
@ -1020,7 +1011,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
int32_t code = doChkptStatusCheck(pTask);
if (code) {
streamMutexUnlock(&pTask->lock);
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}

View File

@ -0,0 +1,39 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#define UP_TASKS_NOT_SEND_CHKPT_TRIGGER 1
#define DOWN_TASKS_NOT_READY 2
#define DOWN_TASKS_BACKPRESSURE 3
#define DOWN_TASKS_INPUTQ_CLOSED 4
#define TASK_OUTPUTQ_FULL 5
#define TASK_SINK_QUOTA_REACHED 6
typedef struct SStreamTaskExtraInfo {
int32_t infoId;
char* pMsg;
} SStreamTaskExtraInfo;
SStreamTaskExtraInfo extraInfoList[8] = {
{0},
{.infoId = UP_TASKS_NOT_SEND_CHKPT_TRIGGER, .pMsg = "%d(us) not send checkpoint-trigger"},
{.infoId = DOWN_TASKS_NOT_READY, .pMsg = "%d(ds) tasks not ready"},
{.infoId = DOWN_TASKS_BACKPRESSURE, .pMsg = "0x%x(ds) backpressure"},
{.infoId = DOWN_TASKS_INPUTQ_CLOSED, .pMsg = "0x%x(ds) inputQ closed"},
{.infoId = TASK_OUTPUTQ_FULL, .pMsg = "outputQ is full"},
{.infoId = TASK_SINK_QUOTA_REACHED, .pMsg = "sink quota reached"},
};