fix(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-09-26 09:32:57 +08:00
parent 6cc3618ed9
commit d214a2f6bd
7 changed files with 17 additions and 18 deletions

View File

@ -739,6 +739,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta);
void streamMetaStartHb(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta);
void streamMetaInitForSnode(SStreamMeta* pMeta);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -403,7 +403,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, 0) != CFG_SCOPE_SERVER) return -1; if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, 0) != CFG_SCOPE_SERVER) return -1;
if (cfgAddInt32(pCfg, "stDebugFlag", stDebugFlag, 0, 255, CFG_SCOPE_CLIENT) != 0) return -1; if (cfgAddInt32(pCfg, "stDebugFlag", stDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
return 0; return 0;
} }

View File

@ -112,18 +112,16 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
pSnode->msgCb = pOption->msgCb;
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, -1); pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, -1);
if (pSnode->pMeta == NULL) { if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
// todo fix it: send msg to mnode to rollback to an existed checkpoint, and broadcast the rollback msg to all other // todo fix it: send msg to mnode to rollback to an existed checkpoint
// computing nodes. streamMetaInitForSnode(pSnode->pMeta);
pSnode->pMeta->stage = 0;
return pSnode; return pSnode;
FAIL: FAIL:
@ -258,10 +256,11 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = htonl(pRsp->upstreamTaskId); pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
int64_t streamId = htobe64(pRsp->streamId); pRsp->streamId = htobe64(pRsp->streamId);
pRsp->msgId = htonl(pRsp->msgId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, streamId, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, pRsp->upstreamTaskId);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);

View File

@ -1083,12 +1083,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
// SStreamDataBlock* pMsgBlock = pTask->msgInfo.pData;
// if (pMsgBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id);
// streamProcessCheckpointReadyMsg(pTask);
// }
} else { } else {
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));

View File

@ -998,3 +998,8 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
*pRid = pMeta->rid; *pRid = pMeta->rid;
metaHbToMnode(pRid, NULL); metaHbToMnode(pRid, NULL);
} }
void streamMetaInitForSnode(SStreamMeta* pMeta) {
pMeta->stage = 0;
pMeta->leader = true;
}

View File

@ -22,7 +22,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor()) tdSql.init(conn.cursor(), True)
self.setsql = TDSetSql() self.setsql = TDSetSql()
self.dbname = 'db' self.dbname = 'db'
self.stbname = 'stb' self.stbname = 'stb'
@ -217,7 +217,7 @@ class TDTestCase:
tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.checkEqual(20470,len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(193, len(tdSql.queryResult)) tdSql.checkEqual(194, len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult)) tdSql.checkEqual(54, len(tdSql.queryResult))

View File

@ -55,7 +55,7 @@ class TDTestCase:
tdSql.checkData(0, 2, 0) tdSql.checkData(0, 2, 0)
tdSql.query("show dnode 1 variables like '%debugFlag'") tdSql.query("show dnode 1 variables like '%debugFlag'")
tdSql.checkRows(21) tdSql.checkRows(22)
tdSql.query("show dnode 1 variables like '____debugFlag'") tdSql.query("show dnode 1 variables like '____debugFlag'")
tdSql.checkRows(2) tdSql.checkRows(2)