diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index e8c64b07c4..c3dfd3a611 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -45,6 +45,7 @@ typedef struct { */ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption); +int32_t sndInit(SSnode * pSnode); /** * @brief Stop Snode in Dnode. * diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3ce2de476e..cb13a50ff4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -592,7 +592,7 @@ typedef struct { int32_t downstreamNodeId; int32_t downstreamTaskId; int32_t childId; - int32_t oldStage; + int64_t oldStage; int8_t status; } SStreamTaskCheckRsp; @@ -760,7 +760,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) // recover and fill history void streamTaskCheckDownstream(SStreamTask* pTask); -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); bool streamTaskAllUpstreamClosed(SStreamTask* pTask); diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 47c2993014..56744e4654 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -76,9 +76,14 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { return 0; } +static int32_t smStartSnodes(SSnodeMgmt *pMgmt) { + return sndInit(pMgmt->pSnode); +} + SMgmtFunc smGetMgmtFunc() { SMgmtFunc mgmtFunc = {0}; mgmtFunc.openFp = smOpen; + mgmtFunc.startFp = (NodeStartFp)smStartSnodes; mgmtFunc.closeFp = (NodeCloseFp)smClose; mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq; mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 25400220b8..4863a1f7b7 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -143,50 +143,6 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer return 0; } -SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { - SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode)); - if (pSnode == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - pSnode->path = taosStrdup(path); - if (pSnode->path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } - - pSnode->msgCb = pOption->msgCb; - pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs()); - if (pSnode->pMeta == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } - - if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) { - goto FAIL; - } - - stopRsync(); - startRsync(); - - return pSnode; - -FAIL: - taosMemoryFree(pSnode->path); - taosMemoryFree(pSnode); - return NULL; -} - -void sndClose(SSnode *pSnode) { - streamMetaNotifyClose(pSnode->pMeta); - streamMetaCommit(pSnode->pMeta); - streamMetaClose(pSnode->pMeta); - taosMemoryFree(pSnode->path); - taosMemoryFree(pSnode); -} - -int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; } - int32_t sndStartStreamTasks(SSnode* pSnode) { int32_t code = TSDB_CODE_SUCCESS; int32_t vgId = SNODE_HANDLE; @@ -223,7 +179,7 @@ int32_t sndStartStreamTasks(SSnode* pSnode) { if (pTask->status.downstreamReady == 1) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { sndDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", - pTask->id.idStr); + pTask->id.idStr); streamLaunchFillHistoryTask(pTask); } @@ -284,7 +240,7 @@ int32_t sndRestartStreamTasks(SSnode* pSnode) { terrno = 0; sndInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, - pMeta->updateInfo.transId); + pMeta->updateInfo.transId); while (streamMetaTaskInTimer(pMeta)) { sndDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); @@ -301,11 +257,12 @@ int32_t sndRestartStreamTasks(SSnode* pSnode) { return code; } + streamMetaInitBackend(pMeta); int64_t el = taosGetTimestampMs() - st; - sndInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); + sndInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); - code = streamMetaLoadAllTasks(pSnode->pMeta); + code = streamMetaLoadAllTasks(pMeta); if (code != TSDB_CODE_SUCCESS) { sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); streamMetaWUnLock(pMeta); @@ -314,13 +271,64 @@ int32_t sndRestartStreamTasks(SSnode* pSnode) { } sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); sndResetStreamTaskStatus(pSnode); - sndStartStreamTasks(pSnode); streamMetaWUnLock(pMeta); + sndStartStreamTasks(pSnode); + code = terrno; return code; } +SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { + SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode)); + if (pSnode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pSnode->path = taosStrdup(path); + if (pSnode->path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + pSnode->msgCb = pOption->msgCb; + pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs()); + if (pSnode->pMeta == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) { + goto FAIL; + } + + stopRsync(); + startRsync(); + + return pSnode; + +FAIL: + taosMemoryFree(pSnode->path); + taosMemoryFree(pSnode); + return NULL; +} + +int32_t sndInit(SSnode * pSnode) { + sndResetStreamTaskStatus(pSnode); + sndStartStreamTasks(pSnode); + return 0; +} + +void sndClose(SSnode *pSnode) { + streamMetaNotifyClose(pSnode->pMeta); + streamMetaCommit(pSnode->pMeta); + streamMetaClose(pSnode->pMeta); + taosMemoryFree(pSnode->path); + taosMemoryFree(pSnode); +} + +int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; } + int32_t sndStartStreamTaskAsync(SSnode* pSnode, bool restart) { SStreamMeta* pMeta = pSnode->pMeta; int32_t vgId = pMeta->vgId; @@ -622,7 +630,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId); if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); + rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); streamMetaReleaseTask(pSnode->pMeta, pTask); char* p = NULL; streamTaskGetStatus(pTask, &p); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8ee0edcb31..6ba5529049 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -923,12 +923,12 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { } else { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); + rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); streamMetaReleaseTask(pMeta, pTask); char* p = NULL; streamTaskGetStatus(pTask, &p); - tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", + tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = TASK_DOWNSTREAM_NOT_READY; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 97eb7b79a2..a1c0fa2040 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -290,10 +290,11 @@ static void recheckDownstreamTasks(void* param, void* tmrId) { stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); } -int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage) { +int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) { SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); ASSERT(pInfo != NULL); + *oldStage = pInfo->stage; const char* id = pTask->id.idStr; if (stage == -1) { stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id, @@ -459,9 +460,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { stError( - "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " + "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", " "not check wait for downstream task nodeUpdate, and all tasks restart", - id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); + id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); } else { stError( "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " @@ -476,7 +477,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, + stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer); } @@ -926,7 +927,7 @@ int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->oldStage) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -941,7 +942,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->oldStage) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; tEndDecode(pDecoder); return 0; diff --git a/tests/system-test/7-tmq/tmqVnodeReplicate.py b/tests/system-test/7-tmq/tmqVnodeReplicate.py index fd8ece02e0..afb3319491 100644 --- a/tests/system-test/7-tmq/tmqVnodeReplicate.py +++ b/tests/system-test/7-tmq/tmqVnodeReplicate.py @@ -105,7 +105,6 @@ class TDTestCase: topicNameList = ['topic1'] # expectRowsList = [] - tmqCom.initConsumerTable("cdb", self.replicaVar) tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) diff --git a/tests/system-test/8-stream/snodeRestart.py b/tests/system-test/8-stream/snodeRestart.py index 6adf874ecd..d2b9062c6e 100644 --- a/tests/system-test/8-stream/snodeRestart.py +++ b/tests/system-test/8-stream/snodeRestart.py @@ -15,40 +15,59 @@ from util.common import * from util.cluster import * class TDTestCase: + updatecfgDict = {'checkpointInterval': 1100} + print("===================: ", updatecfgDict) + def init(self, conn, logSql, replicaVar=1): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False) def case1(self): - tdLog.debug("case1 start") + tdLog.debug("========case1 start========") os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &") - time.sleep(1) + time.sleep(4) tdSql.query("use test") tdSql.query("create snode on dnode 4") - tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart, sum(voltage) from meters partition by groupid interval(4s)") - tdLog.debug("create stream use snode and insert data") - time.sleep(10) + tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") + tdLog.debug("========create stream useing snode and insert data ok========") + time.sleep(4) tdDnodes = cluster.dnodes tdDnodes[3].stoptaosd() time.sleep(2) tdDnodes[3].starttaosd() - tdLog.debug("snode restart ok") + tdLog.debug("========snode restart ok========") - time.sleep(500) + time.sleep(30) os.system("kill -9 `pgrep taosBenchmark`") + tdLog.debug("========stop insert ok========") + time.sleep(2) - tdSql.query("select sum(voltage) from meters partition by groupid interval(4s)") - # tdSql.checkRows(1) - # + tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart") + rowCnt = tdSql.getRows() + results = [] + for i in range(rowCnt): + results.append(tdSql.getData(i,1)) + + tdSql.query("select * from st1 order by groupid,_wstart") + tdSql.checkRows(rowCnt) + for i in range(rowCnt): + data1 = tdSql.getData(i,1) + data2 = results[i] + if data1 != data2: + tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2)) + tdLog.exit("check data error!") # tdSql.checkData(0, 0, '2016-01-01 08:00:07.000') # tdSql.checkData(0, 1, 2000) - tdSql.query("drop snode on dnode 4") - tdSql.query("drop stream if exists s1") - tdSql.query("drop database test") + # tdLog.debug("========sleep 500s========") + # time.sleep(500) + # + # tdSql.query("drop snode on dnode 4") + # tdSql.query("drop stream if exists s1") + # tdSql.query("drop database test") tdLog.debug("case1 end") diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 81f98fea22..795132b14e 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -582,7 +582,7 @@ if __name__ == "__main__": tdDnodes.setAsan(asan) tdDnodes.stopAll() for dnode in tdDnodes.dnodes: - tdDnodes.deploy(dnode.index,{}) + tdDnodes.deploy(dnode.index,updateCfgDict) for dnode in tdDnodes.dnodes: tdDnodes.starttaosd(dnode.index) tdCases.logSql(logSql)