diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6ca6b176d0..5de0c3180b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -656,7 +656,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea typedef struct STaskStatusEntry { STaskId id; int32_t status; - int32_t stage; + int64_t stage; int32_t nodeId; int64_t verStart; // start version in WAL, only valid for source task int64_t verEnd; // end version in WAL, only valid for source task diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5f44d3e7fc..481c671d0f 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -165,7 +165,7 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 15 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fd0c349dd2..221518f4c3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2053,6 +2053,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP epsetAssign(&updateInfo.newEp, &pCurrent->epset); taosArrayPush(info.pUpdateNodeList, &updateInfo); + + } + if(pCurrent->nodeId != SNODE_HANDLE){ SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId); taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); mndReleaseVgroup(pMnode, pVgroup); @@ -2107,6 +2110,24 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) { sdbRelease(pSdb, pVgroup); } + SSnodeObj *pObj = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + SNodeEntry entry = {0}; + addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); + entry.nodeId = SNODE_HANDLE; + + char buf[256] = {0}; + EPSET_TO_STR(&entry.epset, buf); + mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); + taosArrayPush(pVgroupListSnapshot, &entry); + sdbRelease(pSdb, pObj); + } + return pVgroupListSnapshot; } @@ -2284,6 +2305,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { STaskId* pId = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if(pEntry->nodeId == SNODE_HANDLE) continue; + bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); if (!existed) { taosArrayPush(pRemovedTasks, pId); @@ -2629,13 +2652,13 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { return TSDB_CODE_SUCCESS; } -static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { +static void updateStageInfo(STaskStatusEntry* pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); for(int32_t j = 0; j < numOfNodes; ++j) { SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); if (pNodeEntry->nodeId == pTaskEntry->nodeId) { - mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, + mInfo("vgId:%d stage updated from %"PRId64 " to %"PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId); pNodeEntry->stageUpdated = true; @@ -2672,6 +2695,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { setNodeEpsetExpiredFlag(req.pUpdateNodes); + bool snodeChanged = false; for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); @@ -2682,6 +2706,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { updateStageInfo(pTaskEntry, p->stage); + if(pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true; } else { streamTaskStatusCopy(pTaskEntry, p); if (p->activeCheckpointId != 0) { @@ -2710,7 +2735,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady); taosArrayDestroy(p); - if (allReady) { + if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", execInfo.activeCheckpoint); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4539afb1b3..0a3c44441c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -770,7 +770,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1; - if (tEncodeI32(pEncoder, ps->stage) < 0) return -1; + if (tEncodeI64(pEncoder, ps->stage) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; @@ -808,7 +808,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &taskId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; @@ -893,7 +893,7 @@ void metaHbToMnode(void* param, void* tmrId) { SStreamHbMsg hbMsg = {0}; SEpSet epset = {0}; bool hasMnodeEpset = false; - int32_t stage = 0; + int64_t stage = 0; streamMetaRLock(pMeta);