refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-25 16:55:05 +08:00
parent d4bab8c09b
commit 43c035678f
2 changed files with 578 additions and 0 deletions

View File

@ -0,0 +1,297 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndStream.h"
#include "mndTrans.h"
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
saveStreamTasksInfo(pStream, &execInfo);
sdbRelease(pSdb, pStream);
}
}
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pTaskEntry->stage = stage;
break;
}
}
}
static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pInfo) {
int32_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SFailedCheckpointInfo* p = taosArrayGet(pList, i);
if (p->transId == pInfo->transId) {
return;
}
}
taosArrayPush(pList, pInfo);
}
static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
// todo extract method, with pause stream task
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosWUnLockLatch(&pStream->lock);
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
continue;
}
if (!hasEpset) {
taosMemoryFree(pReq);
continue;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return terrno;
}
}
}
taosWUnLockLatch(&pStream->lock);
int32_t code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS;
mndKillTransImpl(pMnode, transId, "");
SStreamObj *pStream = mndGetStreamObj(pMnode, streamId);
if (pStream == NULL) {
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
} else {
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
if (conflict) {
mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetSTbName);
} else {
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, transId);
code = createStreamResetStatusTrans(pMnode, pStream);
}
}
mndReleaseStream(pMnode, pStream);
return code;
}
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
mInfo("set node expired for %d nodes", num);
for (int k = 0; k < num; ++k) {
int32_t *pVgId = taosArrayGet(pNodeList, k);
mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
pNodeEntry->stageUpdated = true;
break;
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
streamMetaClearHbMsg(&req);
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
tDecoderClear(&decoder);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
if (numOfExisted == 0) {
doExtractTasksFromStream(pMnode);
}
initStreamNodeList(pMnode);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) {
mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId);
setNodeEpsetExpiredFlag(req.pUpdateNodes);
}
bool snodeChanged = false;
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
continue;
}
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
if (pTaskEntry->nodeId == SNODE_HANDLE) {
snodeChanged = true;
}
} else {
// task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging) {
pTaskEntry->inputQUnchangeCounter++;
} else {
pTaskEntry->inputQChanging = false;
}
} else {
pTaskEntry->inputQChanging = true;
pTaskEntry->inputQUnchangeCounter = 0;
}
streamTaskStatusCopy(pTaskEntry, p);
if (p->checkpointId != 0) {
if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
SFailedCheckpointInfo info = {
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
addIntoCheckpointList(pList, &info);
}
}
}
if (p->status == pTaskEntry->status) {
pTaskEntry->statusLastDuration++;
} else {
pTaskEntry->status = p->status;
pTaskEntry->statusLastDuration = 0;
}
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
}
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pList) > 0) {
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
}
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
}
}
taosThreadMutexUnlock(&execInfo.lock);
streamMetaClearHbMsg(&req);
taosArrayDestroy(pList);
return TSDB_CODE_SUCCESS;
}

View File

@ -0,0 +1,281 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndStream.h"
#include "mndTrans.h"
#include "tmisce.h"
#include "mndVgroup.h"
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
*allReady = true;
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) {
break;
}
SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
// if not all ready till now, no need to check the remaining vgroups.
if (*allReady) {
for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (!pVgroup->vnodeGid[i].syncRestore) {
mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
*allReady = false;
break;
}
ESyncState state = pVgroup->vnodeGid[i].syncState;
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId);
*allReady = false;
break;
}
}
}
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pVgroup);
}
SSnodeObj *pObj = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) {
break;
}
SNodeEntry entry = {0};
addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
entry.nodeId = SNODE_HANDLE;
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pObj);
}
return pVgroupListSnapshot;
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
if (pStream->uid == streamId) {
sdbCancelFetch(pSdb, pIter);
return pStream;
}
sdbRelease(pSdb, pStream);
}
return NULL;
}
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill active transId:%d in Db:%s", transId, pDbName);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
}
}
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
*hasEpset = false;
pEpSet->numOfEps = 0;
if (nodeId == SNODE_HANDLE) {
SSnodeObj *pObj = NULL;
void *pIter = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter != NULL) {
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj);
sdbCancelFetch(pMnode->pSdb, pIter);
*hasEpset = true;
return TSDB_CODE_SUCCESS;
} else {
mError("failed to acquire snode epset");
return TSDB_CODE_INVALID_PARA;
}
} else {
SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
if (pVgObj != NULL) {
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
epsetAssign(pEpSet, &epset);
*hasEpset = true;
return TSDB_CODE_SUCCESS;
} else {
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
return TSDB_CODE_SUCCESS;
}
}
}
static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (pTask->id.taskId == pId->taskId) {
return pTask;
}
}
}
return NULL;
}
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
int32_t num = 0;
for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
SArray* pLevel = taosArrayGetP(pStream->tasks, i);
num += taosArrayGetSize(pLevel);
}
return num;
}
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
}
}
return 0;
}
static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
// no valid epset, return directly without redoAction
if (!hasEpset) {
taosMemoryFree(pReq);
return TSDB_CODE_SUCCESS;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SArray *tasks = pStream->tasks;
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
}
return 0;
}