refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-29 13:46:33 +08:00
parent 9087a0d9d0
commit b714c70434
5 changed files with 257 additions and 243 deletions

View File

@ -120,6 +120,14 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj*
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
typedef struct SStreamTaskIter SStreamTaskIter;
SStreamTaskIter *createTaskIter(SStreamObj *pStream);
bool taskIterNextTask(SStreamTaskIter *pIter);
SStreamTask *taskIterGetCurrent(SStreamTaskIter *pIter);
void destroyTaskIter(SStreamTaskIter *pIter);
#ifdef __cplusplus
}

View File

@ -476,22 +476,20 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
}
int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
return -1;
}
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
destroyTaskIter(pIter);
return -1;
}
}
destroyTaskIter(pIter);
// persistent stream task for already stored ts data
if (pStream->conf.fillHistory) {
level = taosArrayGetSize(pStream->pHTasksList);
int32_t level = taosArrayGetSize(pStream->pHTasksList);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);
@ -856,6 +854,32 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
return 0;
}
static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger) {
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
taosMemoryFree(buf);
return -1;
}
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(buf);
return -1;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (code != 0) {
taosMemoryFree(buf);
}
return code;
}
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
int8_t mndTrigger, bool lock) {
int32_t code = -1;
@ -865,6 +889,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);
@ -887,8 +912,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
pStream->currentTick = 1;
// 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
int32_t totalLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totalLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *p = taosArrayGetP(pLevel, 0);
@ -896,28 +921,9 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
code = doSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger);
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
SEpSet epset = {0};
bool hasEpset = false;
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (code != 0) {
taosMemoryFree(buf);
if (code != TSDB_CODE_SUCCESS) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
@ -1500,21 +1506,19 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
}
// add row for each task
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
if (code == TSDB_CODE_SUCCESS) {
numOfRows++;
}
int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows);
if (code == TSDB_CODE_SUCCESS) {
numOfRows++;
}
}
// unlock
destroyTaskIter(pIter);
taosRUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
}
@ -1854,22 +1858,19 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
SStreamTaskIter *pTaskIter = createTaskIter(pStream);
while (taskIterNextTask(pTaskIter)) {
SStreamTask *pTask = taskIterGetCurrent(pTaskIter);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
epsetAssign(&entry.epset, &pTask->info.epSet);
taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
}
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
epsetAssign(&entry.epset, &pTask->info.epSet);
taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
}
destroyTaskIter(pTaskIter);
taosWUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
}
@ -2055,58 +2056,50 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
}
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
}
taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry));
taosArrayPush(pExecNode->pTaskList, &id);
mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId,
(int32_t)taosArrayGetSize(pExecNode->pTaskList));
}
}
destroyTaskIter(pIter);
}
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
int32_t num = taosArrayGetSize(pExecNode->pTaskList);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num);
break;
}
}
}
}
destroyTaskIter(pIter);
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}

View File

@ -65,61 +65,24 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI
taosArrayPush(pList, pInfo);
}
static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
/*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
// todo extract method, with pause stream task
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosWUnLockLatch(&pStream->lock);
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
continue;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return terrno;
}
}
int32_t code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
if (code != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return code;
}
taosWUnLockLatch(&pStream->lock);
int32_t code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}

View File

@ -301,86 +301,4 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("complete clear checkpoints in Dbs");
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) {
pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
// todo extract method: traverse stream tasks
// build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
void *pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
}
taosWUnLockLatch(&pStream->lock);
return 0;
}

View File

@ -18,13 +18,13 @@
#include "tmisce.h"
#include "mndVgroup.h"
typedef struct SStreamTaskIter {
struct SStreamTaskIter {
SStreamObj *pStream;
int32_t level;
int32_t ordinalIndex;
int32_t totalLevel;
SStreamTask *pTask;
} SStreamTaskIter;
};
SStreamTaskIter* createTaskIter(SStreamObj* pStream) {
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
@ -235,18 +235,16 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
}
SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (pTask->id.taskId == pId->taskId) {
return pTask;
}
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
if (pTask->id.taskId == pId->taskId) {
destroyTaskIter(pIter);
return pTask;
}
}
destroyTaskIter(pIter);
return NULL;
}
@ -261,21 +259,20 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
}
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
return -1;
}
SStreamTaskIter *pIter = createTaskIter(pStream);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
destroyTaskIter(pIter);
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
}
destroyTaskIter(pIter);
return 0;
}
@ -410,4 +407,139 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* p
doSetDropActionFromId(pMnode, pTrans, pTask);
}
return 0;
}
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) {
pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
void *pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
}
return code;
}
// build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo);
if (code != TSDB_CODE_SUCCESS) {
destroyTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
destroyTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
return code;
}
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
}
return code;
}
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
taosWLockLatch(&pStream->lock);
SStreamTaskIter *pIter = createTaskIter(pStream);
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
int32_t code = doSetResetAction(pMnode, pTrans, pTask);
if (code != TSDB_CODE_SUCCESS) {
destroyTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
destroyTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);
return 0;
}