enh:[TD-31063] Handling return value in qnode.c

This commit is contained in:
sima 2024-07-22 10:58:55 +08:00
parent 7e11384fad
commit 32faa6fccc
6 changed files with 151 additions and 61 deletions

View File

@ -34,9 +34,10 @@ typedef struct {
* @brief Start one Qnode in Dnode. * @brief Start one Qnode in Dnode.
* *
* @param pOption Option of the qnode. * @param pOption Option of the qnode.
* @return SQnode* The qnode object. * @param pQnode The qnode object.
* @return int32_t The error code.
*/ */
SQnode *qndOpen(const SQnodeOpt *pOption); int32_t qndOpen(const SQnodeOpt *pOption, SQnode **pQnode);
/** /**
* @brief Stop Qnode in Dnode. * @brief Stop Qnode in Dnode.

View File

@ -34,11 +34,8 @@ static void qmClose(SQnodeMgmt *pMgmt) {
} }
static int32_t qndOpenWrapper(SQnodeOpt *pOption, SQnode **pQnode) { static int32_t qndOpenWrapper(SQnodeOpt *pOption, SQnode **pQnode) {
*pQnode = qndOpen(pOption); int32_t code = qndOpen(pOption, pQnode);
if (*pQnode == NULL) { return code;
return terrno;
}
return 0;
} }
static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
int32_t code = 0; int32_t code = 0;
@ -62,7 +59,7 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
if (code != 0) { if (code != 0) {
dError("failed to open qnode since %s", tstrerror(code)); dError("failed to open qnode since %s", tstrerror(code));
qmClose(pMgmt); qmClose(pMgmt);
return -1; return code;
} }
tmsgReportStartup("qnode-impl", "initialized"); tmsgReportStartup("qnode-impl", "initialized");

View File

@ -48,8 +48,8 @@ static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
} }
int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
qndPreprocessQueryMsg(pMgmt->pQnode, pMsg); int32_t code = qndPreprocessQueryMsg(pMgmt->pQnode, pMsg);
if (code) return code;
return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
} }

View File

@ -19,21 +19,22 @@
#include "query.h" #include "query.h"
#include "qworker.h" #include "qworker.h"
SQnode *qndOpen(const SQnodeOpt *pOption) { int32_t qndOpen(const SQnodeOpt *pOption, SQnode **pQnode) {
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode)); *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
if (NULL == pQnode) { if (NULL == *pQnode) {
qError("calloc SQnode failed"); qError("calloc SQnode failed");
return NULL; return TSDB_CODE_OUT_OF_MEMORY;
} }
pQnode->qndId = QNODE_HANDLE; (*pQnode)->qndId = QNODE_HANDLE;
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) { int32_t code = qWorkerInit(NODE_TYPE_QNODE, (*pQnode)->qndId, (void **)&(*pQnode)->pQuery, &pOption->msgCb);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFreeClear(pQnode); taosMemoryFreeClear(pQnode);
return NULL; return code;
} }
pQnode->msgCb = pOption->msgCb; (*pQnode)->msgCb = pOption->msgCb;
return pQnode; return TSDB_CODE_SUCCESS;
} }
void qndClose(SQnode *pQnode) { void qndClose(SQnode *pQnode) {

View File

@ -1542,9 +1542,14 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t
// slow path: search TDB // slow path: search TDB
int64_t ctbNum = 0; int64_t ctbNum = 0;
int32_t colNum = 0; int32_t colNum = 0;
vnodeGetCtbNum(pVnode, uid, &ctbNum); code = vnodeGetCtbNum(pVnode, uid, &ctbNum);
vnodeGetStbColumnNum(pVnode, uid, &colNum); if (TSDB_CODE_SUCCESS == code) {
code = vnodeGetStbColumnNum(pVnode, uid, &colNum);
}
metaULock(pVnodeObj->pMeta); metaULock(pVnodeObj->pMeta);
if (TSDB_CODE_SUCCESS != code) {
goto _exit;
}
if (numOfTables) *numOfTables = ctbNum; if (numOfTables) *numOfTables = ctbNum;
if (numOfCols) *numOfCols = colNum; if (numOfCols) *numOfCols = colNum;

View File

@ -47,7 +47,7 @@ int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol
return 0; return 0;
} }
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
STableInfoReq infoReq = {0}; STableInfoReq infoReq = {0};
STableMetaRsp metaRsp = {0}; STableMetaRsp metaRsp = {0};
SMetaReader mer1 = {0}; SMetaReader mer1 = {0};
@ -67,10 +67,10 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
} }
metaRsp.dbId = pVnode->config.dbId; metaRsp.dbId = pVnode->config.dbId;
strcpy(metaRsp.tbName, infoReq.tbName); (void)strcpy(metaRsp.tbName, infoReq.tbName);
memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName)); (void)memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName); (void)sprintf(tableFName, "%s.%s", infoReq.dbFName, infoReq.tbName);
code = vnodeValidateTableHash(pVnode, tableFName); code = vnodeValidateTableHash(pVnode, tableFName);
if (code) { if (code) {
goto _exit4; goto _exit4;
@ -89,7 +89,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
metaRsp.tuid = mer1.me.uid; metaRsp.tuid = mer1.me.uid;
if (mer1.me.type == TSDB_SUPER_TABLE) { if (mer1.me.type == TSDB_SUPER_TABLE) {
strcpy(metaRsp.stbName, mer1.me.name); (void)strcpy(metaRsp.stbName, mer1.me.name);
schema = mer1.me.stbEntry.schemaRow; schema = mer1.me.stbEntry.schemaRow;
schemaTag = mer1.me.stbEntry.schemaTag; schemaTag = mer1.me.stbEntry.schemaTag;
metaRsp.suid = mer1.me.uid; metaRsp.suid = mer1.me.uid;
@ -97,7 +97,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2; if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2;
strcpy(metaRsp.stbName, mer2.me.name); (void)strcpy(metaRsp.stbName, mer2.me.name);
metaRsp.suid = mer2.me.uid; metaRsp.suid = mer2.me.uid;
schema = mer2.me.stbEntry.schemaRow; schema = mer2.me.stbEntry.schemaRow;
schemaTag = mer2.me.stbEntry.schemaTag; schemaTag = mer2.me.stbEntry.schemaTag;
@ -114,10 +114,13 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
metaRsp.tversion = schemaTag.version; metaRsp.tversion = schemaTag.version;
metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags)); metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));
metaRsp.pSchemaExt = (SSchemaExt *)taosMemoryCalloc(metaRsp.numOfColumns, sizeof(SSchemaExt)); metaRsp.pSchemaExt = (SSchemaExt *)taosMemoryCalloc(metaRsp.numOfColumns, sizeof(SSchemaExt));
if (NULL == metaRsp.pSchemas || NULL == metaRsp.pSchemaExt) {
memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols); code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
(void)memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
if (schemaTag.nCols) { if (schemaTag.nCols) {
memcpy(metaRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols); (void)memcpy(metaRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
} }
if (metaRsp.pSchemaExt) { if (metaRsp.pSchemaExt) {
SMetaReader *pReader = mer1.me.type == TSDB_CHILD_TABLE ? &mer2 : &mer1; SMetaReader *pReader = mer1.me.type == TSDB_CHILD_TABLE ? &mer2 : &mer1;
@ -148,7 +151,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
rspLen = tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
if (rspLen < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
_exit: _exit:
taosMemoryFree(metaRsp.pSchemas); taosMemoryFree(metaRsp.pSchemas);
@ -174,10 +182,10 @@ _exit4:
*pMsg = rpcMsg; *pMsg = rpcMsg;
} }
return TSDB_CODE_SUCCESS; return code;
} }
int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
STableCfgReq cfgReq = {0}; STableCfgReq cfgReq = {0};
STableCfgRsp cfgRsp = {0}; STableCfgRsp cfgRsp = {0};
SMetaReader mer1 = {0}; SMetaReader mer1 = {0};
@ -196,10 +204,10 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
goto _exit; goto _exit;
} }
strcpy(cfgRsp.tbName, cfgReq.tbName); (void)strcpy(cfgRsp.tbName, cfgReq.tbName);
memcpy(cfgRsp.dbFName, cfgReq.dbFName, sizeof(cfgRsp.dbFName)); (void)memcpy(cfgRsp.dbFName, cfgReq.dbFName, sizeof(cfgRsp.dbFName));
sprintf(tableFName, "%s.%s", cfgReq.dbFName, cfgReq.tbName); (void)sprintf(tableFName, "%s.%s", cfgReq.dbFName, cfgReq.tbName);
code = vnodeValidateTableHash(pVnode, tableFName); code = vnodeValidateTableHash(pVnode, tableFName);
if (code) { if (code) {
goto _exit; goto _exit;
@ -222,24 +230,36 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_LOCK); metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_LOCK);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
strcpy(cfgRsp.stbName, mer2.me.name); (void)strcpy(cfgRsp.stbName, mer2.me.name);
schema = mer2.me.stbEntry.schemaRow; schema = mer2.me.stbEntry.schemaRow;
schemaTag = mer2.me.stbEntry.schemaTag; schemaTag = mer2.me.stbEntry.schemaTag;
cfgRsp.ttl = mer1.me.ctbEntry.ttlDays; cfgRsp.ttl = mer1.me.ctbEntry.ttlDays;
cfgRsp.commentLen = mer1.me.ctbEntry.commentLen; cfgRsp.commentLen = mer1.me.ctbEntry.commentLen;
if (mer1.me.ctbEntry.commentLen > 0) { if (mer1.me.ctbEntry.commentLen > 0) {
cfgRsp.pComment = taosStrdup(mer1.me.ctbEntry.comment); cfgRsp.pComment = taosStrdup(mer1.me.ctbEntry.comment);
if (NULL == cfgRsp.pComment) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
} }
STag *pTag = (STag *)mer1.me.ctbEntry.pTags; STag *pTag = (STag *)mer1.me.ctbEntry.pTags;
cfgRsp.tagsLen = pTag->len; cfgRsp.tagsLen = pTag->len;
cfgRsp.pTags = taosMemoryMalloc(cfgRsp.tagsLen); cfgRsp.pTags = taosMemoryMalloc(cfgRsp.tagsLen);
memcpy(cfgRsp.pTags, pTag, cfgRsp.tagsLen); if (NULL == cfgRsp.pTags) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
(void)memcpy(cfgRsp.pTags, pTag, cfgRsp.tagsLen);
} else if (mer1.me.type == TSDB_NORMAL_TABLE) { } else if (mer1.me.type == TSDB_NORMAL_TABLE) {
schema = mer1.me.ntbEntry.schemaRow; schema = mer1.me.ntbEntry.schemaRow;
cfgRsp.ttl = mer1.me.ntbEntry.ttlDays; cfgRsp.ttl = mer1.me.ntbEntry.ttlDays;
cfgRsp.commentLen = mer1.me.ntbEntry.commentLen; cfgRsp.commentLen = mer1.me.ntbEntry.commentLen;
if (mer1.me.ntbEntry.commentLen > 0) { if (mer1.me.ntbEntry.commentLen > 0) {
cfgRsp.pComment = taosStrdup(mer1.me.ntbEntry.comment); cfgRsp.pComment = taosStrdup(mer1.me.ntbEntry.comment);
if (NULL == cfgRsp.pComment) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
} }
} else { } else {
ASSERT(0); ASSERT(0);
@ -250,9 +270,13 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags)); cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags));
cfgRsp.pSchemaExt = (SSchemaExt *)taosMemoryMalloc(cfgRsp.numOfColumns * sizeof(SSchemaExt)); cfgRsp.pSchemaExt = (SSchemaExt *)taosMemoryMalloc(cfgRsp.numOfColumns * sizeof(SSchemaExt));
memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols); if (NULL == cfgRsp.pSchemas || NULL == cfgRsp.pSchemaExt) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
(void)memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
if (schemaTag.nCols) { if (schemaTag.nCols) {
memcpy(cfgRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols); (void)memcpy(cfgRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
} }
// if (useCompress(cfgRsp.tableType)) { // if (useCompress(cfgRsp.tableType)) {
@ -285,7 +309,12 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp);
rspLen = tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp);
if (rspLen < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
_exit: _exit:
rpcMsg.info = pMsg->info; rpcMsg.info = pMsg->info;
@ -307,7 +336,7 @@ _exit:
tFreeSTableCfgRsp(&cfgRsp); tFreeSTableCfgRsp(&cfgRsp);
metaReaderClear(&mer2); metaReaderClear(&mer2);
metaReaderClear(&mer1); metaReaderClear(&mer1);
return TSDB_CODE_SUCCESS; return code;
} }
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void *p) { static FORCE_INLINE void vnodeFreeSBatchRspMsg(void *p) {
@ -352,6 +381,10 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
for (int32_t i = 0; i < msgNum; ++i) { for (int32_t i = 0; i < msgNum; ++i) {
req = taosArrayGet(batchReq.pMsgs, i); req = taosArrayGet(batchReq.pMsgs, i);
if (req == NULL) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
reqMsg.msgType = req->msgType; reqMsg.msgType = req->msgType;
reqMsg.pCont = req->msg; reqMsg.pCont = req->msg;
@ -359,13 +392,16 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
switch (req->msgType) { switch (req->msgType) {
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
vnodeGetTableMeta(pVnode, &reqMsg, false); // error code has been set into reqMsg, no need to handle it here.
(void)vnodeGetTableMeta(pVnode, &reqMsg, false);
break; break;
case TDMT_VND_TABLE_CFG: case TDMT_VND_TABLE_CFG:
vnodeGetTableCfg(pVnode, &reqMsg, false); // error code has been set into reqMsg, no need to handle it here.
(void)vnodeGetTableCfg(pVnode, &reqMsg, false);
break; break;
case TDMT_VND_GET_STREAM_PROGRESS: case TDMT_VND_GET_STREAM_PROGRESS:
vnodeGetStreamProgress(pVnode, &reqMsg, false); // error code has been set into reqMsg, no need to handle it here.
(void)vnodeGetStreamProgress(pVnode, &reqMsg, false);
break; break;
default: default:
qError("invalid req msgType %d", req->msgType); qError("invalid req msgType %d", req->msgType);
@ -381,7 +417,11 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
rsp.rspCode = reqMsg.code; rsp.rspCode = reqMsg.code;
rsp.msg = reqMsg.pCont; rsp.msg = reqMsg.pCont;
taosArrayPush(batchRsp.pRsps, &rsp); if (NULL == taosArrayPush(batchRsp.pRsps, &rsp)) {
qError("taosArrayPush failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
} }
rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp); rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);
@ -501,7 +541,12 @@ int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList) {
} }
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) { int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
int32_t code = TSDB_CODE_SUCCESS;
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, uid, 1); SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, uid, 1);
if (NULL == pCur) {
qError("vnode get all table list failed");
return TSDB_CODE_FAILED;
}
while (1) { while (1) {
tb_uid_t id = metaCtbCursorNext(pCur); tb_uid_t id = metaCtbCursorNext(pCur);
@ -510,11 +555,15 @@ int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
} }
STableKeyInfo info = {uid = id}; STableKeyInfo info = {uid = id};
taosArrayPush(list, &info); if (NULL == taosArrayPush(list, &info)) {
qError("taosArrayPush failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
} }
}
_exit:
metaCloseCtbCursor(pCur); metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg) { int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg) {
@ -522,8 +571,13 @@ int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo
} }
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) { int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
int32_t code = TSDB_CODE_SUCCESS;
SVnode *pVnodeObj = pVnode; SVnode *pVnodeObj = pVnode;
SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj, suid, 1); SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj, suid, 1);
if (NULL == pCur) {
qError("vnode get all table list failed");
return TSDB_CODE_FAILED;
}
while (1) { while (1) {
tb_uid_t id = metaCtbCursorNext(pCur); tb_uid_t id = metaCtbCursorNext(pCur);
@ -531,14 +585,20 @@ int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
break; break;
} }
taosArrayPush(list, &id); if (NULL == taosArrayPush(list, &id)) {
qError("taosArrayPush failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
} }
_exit:
metaCloseCtbCursor(pCur); metaCloseCtbCursor(pCur);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) { int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
int32_t code = TSDB_CODE_SUCCESS;
SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid); SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid);
if (!pCur) { if (!pCur) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -550,15 +610,21 @@ int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
break; break;
} }
taosArrayPush(list, &id); if (NULL == taosArrayPush(list, &id)) {
qError("taosArrayPush failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
} }
_exit:
metaCloseStbCursor(pCur); metaCloseStbCursor(pCur);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg, void *arg1), int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg, void *arg1),
void *arg) { void *arg) {
int32_t code = TSDB_CODE_SUCCESS;
SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid); SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid);
if (!pCur) { if (!pCur) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -574,11 +640,16 @@ int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo
continue; continue;
} }
taosArrayPush(list, &id); if (NULL == taosArrayPush(list, &id)) {
qError("taosArrayPush failed");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
} }
_exit:
metaCloseStbCursor(pCur); metaCloseStbCursor(pCur);
return TSDB_CODE_SUCCESS; return code;
} }
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
@ -700,16 +771,20 @@ int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) {
*num = 0; *num = 0;
int64_t arrSize = taosArrayGetSize(suidList); int64_t arrSize = taosArrayGetSize(suidList);
int32_t code = TSDB_CODE_SUCCESS;
for (int64_t i = 0; i < arrSize; ++i) { for (int64_t i = 0; i < arrSize; ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
int64_t ctbNum = 0; int64_t ctbNum = 0;
int32_t numOfCols = 0; int32_t numOfCols = 0;
metaGetStbStats(pVnode, suid, &ctbNum, &numOfCols); code = metaGetStbStats(pVnode, suid, &ctbNum, &numOfCols);
if (TSDB_CODE_SUCCESS != code) {
goto _exit;
}
*num += ctbNum * (numOfCols - 1); *num += ctbNum * (numOfCols - 1);
} }
_exit:
taosArrayDestroy(suidList); taosArrayDestroy(suidList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -728,7 +803,11 @@ int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num) {
} }
int64_t ctbNum = 0; int64_t ctbNum = 0;
vnodeGetCtbNum(pVnode, id, &ctbNum); int32_t code = vnodeGetCtbNum(pVnode, id, &ctbNum);
if (TSDB_CODE_SUCCESS != code) {
metaCloseStbCursor(pCur);
return code;
}
*num += ctbNum; *num += ctbNum;
} }
@ -771,14 +850,17 @@ int32_t vnodeGetStreamProgress(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
rsp.vgId = req.vgId; rsp.vgId = req.vgId;
rsp.streamId = req.streamId; rsp.streamId = req.streamId;
rspLen = tSerializeStreamProgressRsp(0, 0, &rsp); rspLen = tSerializeStreamProgressRsp(0, 0, &rsp);
if (rspLen < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
if (direct) { if (direct) {
buf = rpcMallocCont(rspLen); buf = rpcMallocCont(rspLen);
} else { } else {
buf = taosMemoryCalloc(1, rspLen); buf = taosMemoryCalloc(1, rspLen);
} }
if (!buf) { if (!buf) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER; goto _OVER;
} }
} }
@ -787,7 +869,11 @@ int32_t vnodeGetStreamProgress(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
code = tqGetStreamExecInfo(pVnode, req.streamId, &rsp.progressDelay, &rsp.fillHisFinished); code = tqGetStreamExecInfo(pVnode, req.streamId, &rsp.progressDelay, &rsp.fillHisFinished);
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
tSerializeStreamProgressRsp(buf, rspLen, &rsp); rspLen = tSerializeStreamProgressRsp(buf, rspLen, &rsp);
if (rspLen < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
rpcMsg.pCont = buf; rpcMsg.pCont = buf;
buf = NULL; buf = NULL;
rpcMsg.contLen = rspLen; rpcMsg.contLen = rspLen;