fix:add logic when snode restart

This commit is contained in:
wangmm0220 2023-11-13 18:39:42 +08:00
parent 6a44bd54ac
commit 1cbe568be7
4 changed files with 33 additions and 8 deletions

View File

@ -656,7 +656,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
typedef struct STaskStatusEntry {
STaskId id;
int32_t status;
int32_t stage;
int64_t stage;
int32_t nodeId;
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task

View File

@ -165,7 +165,7 @@ static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
{.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
// {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},

View File

@ -2053,6 +2053,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
taosArrayPush(info.pUpdateNodeList, &updateInfo);
}
if(pCurrent->nodeId != SNODE_HANDLE){
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
mndReleaseVgroup(pMnode, pVgroup);
@ -2107,6 +2110,24 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) {
sdbRelease(pSdb, pVgroup);
}
SSnodeObj *pObj = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) {
break;
}
SNodeEntry entry = {0};
addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
entry.nodeId = SNODE_HANDLE;
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pObj);
}
return pVgroupListSnapshot;
}
@ -2284,6 +2305,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
STaskId* pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
if(pEntry->nodeId == SNODE_HANDLE) continue;
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
taosArrayPush(pRemovedTasks, pId);
@ -2629,13 +2652,13 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
return TSDB_CODE_SUCCESS;
}
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
mInfo("vgId:%d stage updated from %"PRId64 " to %"PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
pNodeEntry->stageUpdated = true;
@ -2672,6 +2695,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
setNodeEpsetExpiredFlag(req.pUpdateNodes);
bool snodeChanged = false;
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
@ -2682,6 +2706,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
if(pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true;
} else {
streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) {
@ -2710,7 +2735,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady) {
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
execInfo.activeCheckpoint);

View File

@ -770,7 +770,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
if (tEncodeI32(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI64(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
@ -808,7 +808,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.status) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1;
if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1;
@ -893,7 +893,7 @@ void metaHbToMnode(void* param, void* tmrId) {
SStreamHbMsg hbMsg = {0};
SEpSet epset = {0};
bool hasMnodeEpset = false;
int32_t stage = 0;
int64_t stage = 0;
streamMetaRLock(pMeta);