Merge pull request #26810 from taosdata/fix/TD-30989-scan1-4-2

fix/TD-30989-scan1-4-2
This commit is contained in:
Hongze Cheng 2024-07-29 13:08:32 +08:00 committed by GitHub
commit 82421f5833
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 42 additions and 24 deletions

View File

@ -224,7 +224,7 @@ int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeOb
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &createReq); (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &createReq);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
@ -252,7 +252,7 @@ static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_RETURN(code); TAOS_RETURN(code);
} }
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
@ -330,7 +330,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[33] = {0}; char obj[33] = {0};
sprintf(obj, "%d", createReq.dnodeId); (void)sprintf(obj, "%d", createReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen);
_OVER: _OVER:
@ -383,7 +383,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_RETURN(code); TAOS_RETURN(code);
} }
tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); (void)tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
@ -459,7 +459,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char obj[33] = {0}; char obj[33] = {0};
sprintf(obj, "%d", dropReq.dnodeId); (void)sprintf(obj, "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropQnode", "", obj, dropReq.sql, dropReq.sqlLen); auditRecord(pReq, pMnode->clusterId, "dropQnode", "", obj, dropReq.sql, dropReq.sqlLen);
@ -531,7 +531,7 @@ static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp); (void)tSerializeSQnodeListRsp(pRsp, rspLen, &qlistRsp);
pReq->info.rspLen = rspLen; pReq->info.rspLen = rspLen;
pReq->info.rsp = pRsp; pReq->info.rsp = pRsp;
@ -556,15 +556,15 @@ static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
cols = 0; cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false); (void)colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0}; char ep[TSDB_EP_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(ep, pObj->pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)ep, false); (void)colDataSetVal(pColInfo, numOfRows, (const char *)ep, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false); (void)colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);

View File

@ -27,7 +27,7 @@ int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
void mndPostProcessQueryMsg(SRpcMsg *pMsg) { void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return; if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); (void)qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg);
} }
int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) { int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) {
@ -134,7 +134,10 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
rsp.msgLen = reqMsg.info.rspLen; rsp.msgLen = reqMsg.info.rspLen;
rsp.msg = reqMsg.info.rsp; rsp.msg = reqMsg.info.rsp;
taosArrayPush(batchRsp.pRsps, &rsp); if (taosArrayPush(batchRsp.pRsps, &rsp) == NULL) {
mError("msg:%p, failed to put array since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
}
} }
rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp); rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);

View File

@ -115,7 +115,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
} else { } else {
pInfo->type = TASK_OUTPUT__TABLE; pInfo->type = TASK_OUTPUT__TABLE;
pInfo->tbSink.stbUid = pStream->targetStbUid; pInfo->tbSink.stbUid = pStream->targetStbUid;
memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); (void)memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
if (pInfo->tbSink.pSchemaWrapper == NULL) { if (pInfo->tbSink.pSchemaWrapper == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -145,7 +145,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr
int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList); int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
if (isShuffle) { if (isShuffle) {
memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(pVgs); int32_t numOfVgroups = taosArrayGetSize(pVgs);
@ -363,10 +363,14 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh
static void addNewTaskList(SStreamObj* pStream) { static void addNewTaskList(SStreamObj* pStream) {
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &pTaskList); if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
mError("failed to put array");
}
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
pTaskList = taosArrayInit(0, POINTER_BYTES); pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->pHTasksList, &pTaskList); if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
mError("failed to put array");
}
} }
} }
@ -584,10 +588,15 @@ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset)
} }
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) { static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task); int32_t code = 0;
if ((code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task)) != 0) {
mError("failed bind task to sink task since %s", tstrerror(code));
}
for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) { for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k); SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
streamTaskSetUpstreamInfo(pSinkTask, task); if ((code = streamTaskSetUpstreamInfo(pSinkTask, task)) != 0) {
mError("failed bind task to sink task since %s", tstrerror(code));
}
} }
mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr); mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
} }
@ -604,6 +613,7 @@ static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
} }
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) { static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
int32_t code = 0;
SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL); SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
@ -614,12 +624,15 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b
if (hasExtraSink) { if (hasExtraSink) {
bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask); bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
} else { } else {
mndSetSinkTaskInfo(pStream, pSourceTask); if ((code = mndSetSinkTaskInfo(pStream, pSourceTask)) != 0) {
mError("failed bind task to sink task since %s", tstrerror(code));
}
} }
} }
} }
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
int32_t code = 0;
size_t size = taosArrayGetSize(tasks); size_t size = taosArrayGetSize(tasks);
ASSERT(size >= 2); ASSERT(size >= 2);
SArray* pDownTaskList = taosArrayGetP(tasks, size - 1); SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
@ -631,7 +644,9 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
pUpTask->info.selfChildId = i - begin; pUpTask->info.selfChildId = i - begin;
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
streamTaskSetUpstreamInfo(*pDownTask, pUpTask); if ((code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask)) != 0) {
mError("failed bind task to sink task since %s", tstrerror(code));
}
} }
mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr); mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
} }

View File

@ -158,7 +158,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
showObj.id = showId; showObj.id = showId;
showObj.pMnode = pMnode; showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); (void)memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN); tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000; int32_t keepTime = tsShellActivityTimer * 6 * 1000;
@ -270,9 +270,9 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type); mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
if (retrieveReq.user[0] != 0) { if (retrieveReq.user[0] != 0) {
memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN); (void)memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
} else { } else {
memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1); (void)memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
} }
code = -1; code = -1;
if (retrieveReq.db[0] && if (retrieveReq.db[0] &&
@ -303,10 +303,10 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
idata.info.bytes = p->bytes; idata.info.bytes = p->bytes;
idata.info.type = p->type; idata.info.type = p->type;
idata.info.colId = p->colId; idata.info.colId = p->colId;
blockDataAppendColInfo(pBlock, &idata); TAOS_CHECK_RETURN(blockDataAppendColInfo(pBlock, &idata));
} }
blockDataEnsureCapacity(pBlock, rowsToRead); TAOS_CHECK_RETURN(blockDataEnsureCapacity(pBlock, rowsToRead));
if (mndCheckRetrieveFinished(pShow)) { if (mndCheckRetrieveFinished(pShow)) {
mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows); mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);