fix:snode upport stream task

This commit is contained in:
wangmm0220 2023-11-16 21:19:58 +08:00
parent a16189d2a9
commit a5de1b032b
9 changed files with 108 additions and 75 deletions

View File

@ -45,6 +45,7 @@ typedef struct {
*/ */
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption); SSnode *sndOpen(const char *path, const SSnodeOpt *pOption);
int32_t sndInit(SSnode * pSnode);
/** /**
* @brief Stop Snode in Dnode. * @brief Stop Snode in Dnode.
* *

View File

@ -592,7 +592,7 @@ typedef struct {
int32_t downstreamNodeId; int32_t downstreamNodeId;
int32_t downstreamTaskId; int32_t downstreamTaskId;
int32_t childId; int32_t childId;
int32_t oldStage; int64_t oldStage;
int8_t status; int8_t status;
} SStreamTaskCheckRsp; } SStreamTaskCheckRsp;
@ -760,7 +760,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen)
// recover and fill history // recover and fill history
void streamTaskCheckDownstream(SStreamTask* pTask); void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
bool streamTaskAllUpstreamClosed(SStreamTask* pTask); bool streamTaskAllUpstreamClosed(SStreamTask* pTask);

View File

@ -76,9 +76,14 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return 0; return 0;
} }
static int32_t smStartSnodes(SSnodeMgmt *pMgmt) {
return sndInit(pMgmt->pSnode);
}
SMgmtFunc smGetMgmtFunc() { SMgmtFunc smGetMgmtFunc() {
SMgmtFunc mgmtFunc = {0}; SMgmtFunc mgmtFunc = {0};
mgmtFunc.openFp = smOpen; mgmtFunc.openFp = smOpen;
mgmtFunc.startFp = (NodeStartFp)smStartSnodes;
mgmtFunc.closeFp = (NodeCloseFp)smClose; mgmtFunc.closeFp = (NodeCloseFp)smClose;
mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq; mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq;
mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq; mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq;

View File

@ -143,50 +143,6 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
return 0; return 0;
} }
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
if (pSnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pSnode->path = taosStrdup(path);
if (pSnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs());
if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) {
goto FAIL;
}
stopRsync();
startRsync();
return pSnode;
FAIL:
taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode);
return NULL;
}
void sndClose(SSnode *pSnode) {
streamMetaNotifyClose(pSnode->pMeta);
streamMetaCommit(pSnode->pMeta);
streamMetaClose(pSnode->pMeta);
taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode);
}
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
int32_t sndStartStreamTasks(SSnode* pSnode) { int32_t sndStartStreamTasks(SSnode* pSnode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = SNODE_HANDLE; int32_t vgId = SNODE_HANDLE;
@ -301,11 +257,12 @@ int32_t sndRestartStreamTasks(SSnode* pSnode) {
return code; return code;
} }
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
sndInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); sndInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
code = streamMetaLoadAllTasks(pSnode->pMeta); code = streamMetaLoadAllTasks(pMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -314,13 +271,64 @@ int32_t sndRestartStreamTasks(SSnode* pSnode) {
} }
sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
sndResetStreamTaskStatus(pSnode); sndResetStreamTaskStatus(pSnode);
sndStartStreamTasks(pSnode);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
sndStartStreamTasks(pSnode);
code = terrno; code = terrno;
return code; return code;
} }
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
if (pSnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pSnode->path = taosStrdup(path);
if (pSnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs());
if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) {
goto FAIL;
}
stopRsync();
startRsync();
return pSnode;
FAIL:
taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode);
return NULL;
}
int32_t sndInit(SSnode * pSnode) {
sndResetStreamTaskStatus(pSnode);
sndStartStreamTasks(pSnode);
return 0;
}
void sndClose(SSnode *pSnode) {
streamMetaNotifyClose(pSnode->pMeta);
streamMetaCommit(pSnode->pMeta);
streamMetaClose(pSnode->pMeta);
taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode);
}
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
int32_t sndStartStreamTaskAsync(SSnode* pSnode, bool restart) { int32_t sndStartStreamTaskAsync(SSnode* pSnode, bool restart) {
SStreamMeta* pMeta = pSnode->pMeta; SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
@ -622,7 +630,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
char* p = NULL; char* p = NULL;
streamTaskGetStatus(pTask, &p); streamTaskGetStatus(pTask, &p);

View File

@ -923,12 +923,12 @@ int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
} else { } else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
char* p = NULL; char* p = NULL;
streamTaskGetStatus(pTask, &p); streamTaskGetStatus(pTask, &p);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = TASK_DOWNSTREAM_NOT_READY; rsp.status = TASK_DOWNSTREAM_NOT_READY;

View File

@ -290,10 +290,11 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
} }
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage) { int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId); SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
ASSERT(pInfo != NULL); ASSERT(pInfo != NULL);
*oldStage = pInfo->stage;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
if (stage == -1) { if (stage == -1) {
stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id, stDebug("s-task:%s receive check msg from upstream task:0x%x(vgId:%d), invalid stageId:%" PRId64 ", not ready", id,
@ -459,9 +460,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) { if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
stError( stError(
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, " "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", "
"not check wait for downstream task nodeUpdate, and all tasks restart", "not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, (int32_t)pTask->pMeta->stage); id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
} else { } else {
stError( stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
@ -476,7 +477,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer); pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
} }
@ -926,7 +927,7 @@ int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp*
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->oldStage) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->oldStage) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
@ -941,7 +942,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->oldStage) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->oldStage) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;

View File

@ -105,7 +105,6 @@ class TDTestCase:
topicNameList = ['topic1'] topicNameList = ['topic1']
# expectRowsList = [] # expectRowsList = []
tmqCom.initConsumerTable("cdb", self.replicaVar)
tdLog.info("create topics from stb with filter") tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])

View File

@ -15,40 +15,59 @@ from util.common import *
from util.cluster import * from util.cluster import *
class TDTestCase: class TDTestCase:
updatecfgDict = {'checkpointInterval': 1100}
print("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False) tdSql.init(conn.cursor(), False)
def case1(self): def case1(self):
tdLog.debug("case1 start") tdLog.debug("========case1 start========")
os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &") os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000 -i 1000 -v 2 > /dev/null 2>&1 &")
time.sleep(1) time.sleep(4)
tdSql.query("use test") tdSql.query("use test")
tdSql.query("create snode on dnode 4") tdSql.query("create snode on dnode 4")
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart, sum(voltage) from meters partition by groupid interval(4s)") tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
tdLog.debug("create stream use snode and insert data") tdLog.debug("========create stream useing snode and insert data ok========")
time.sleep(10) time.sleep(4)
tdDnodes = cluster.dnodes tdDnodes = cluster.dnodes
tdDnodes[3].stoptaosd() tdDnodes[3].stoptaosd()
time.sleep(2) time.sleep(2)
tdDnodes[3].starttaosd() tdDnodes[3].starttaosd()
tdLog.debug("snode restart ok") tdLog.debug("========snode restart ok========")
time.sleep(500) time.sleep(30)
os.system("kill -9 `pgrep taosBenchmark`") os.system("kill -9 `pgrep taosBenchmark`")
tdLog.debug("========stop insert ok========")
time.sleep(2)
tdSql.query("select sum(voltage) from meters partition by groupid interval(4s)") tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
# tdSql.checkRows(1) rowCnt = tdSql.getRows()
# results = []
for i in range(rowCnt):
results.append(tdSql.getData(i,1))
tdSql.query("select * from st1 order by groupid,_wstart")
tdSql.checkRows(rowCnt)
for i in range(rowCnt):
data1 = tdSql.getData(i,1)
data2 = results[i]
if data1 != data2:
tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2))
tdLog.exit("check data error!")
# tdSql.checkData(0, 0, '2016-01-01 08:00:07.000') # tdSql.checkData(0, 0, '2016-01-01 08:00:07.000')
# tdSql.checkData(0, 1, 2000) # tdSql.checkData(0, 1, 2000)
tdSql.query("drop snode on dnode 4") # tdLog.debug("========sleep 500s========")
tdSql.query("drop stream if exists s1") # time.sleep(500)
tdSql.query("drop database test") #
# tdSql.query("drop snode on dnode 4")
# tdSql.query("drop stream if exists s1")
# tdSql.query("drop database test")
tdLog.debug("case1 end") tdLog.debug("case1 end")

View File

@ -582,7 +582,7 @@ if __name__ == "__main__":
tdDnodes.setAsan(asan) tdDnodes.setAsan(asan)
tdDnodes.stopAll() tdDnodes.stopAll()
for dnode in tdDnodes.dnodes: for dnode in tdDnodes.dnodes:
tdDnodes.deploy(dnode.index,{}) tdDnodes.deploy(dnode.index,updateCfgDict)
for dnode in tdDnodes.dnodes: for dnode in tdDnodes.dnodes:
tdDnodes.starttaosd(dnode.index) tdDnodes.starttaosd(dnode.index)
tdCases.logSql(logSql) tdCases.logSql(logSql)