fix: fix coverity check issues
This commit is contained in:
parent
77cafebd63
commit
cf9abe7258
|
@ -239,6 +239,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MAX_TAGS 128
|
||||
#define TSDB_MAX_TAG_CONDITIONS 1024
|
||||
|
||||
#define TSDB_MAX_COL_TAG_NUM (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)
|
||||
#define TSDB_MAX_JSON_TAG_LEN 16384
|
||||
#define TSDB_MAX_JSON_KEY_LEN 256
|
||||
|
||||
|
@ -496,6 +497,8 @@ enum {
|
|||
|
||||
#define MAX_NUM_STR_SIZE 40
|
||||
|
||||
#define MAX_META_MSG_IN_BATCH 1048576
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -173,7 +173,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
|
||||
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
|
||||
pTscObj->connId = pRsp->query->connId;
|
||||
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
|
||||
tscTrace("conn %d hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
|
||||
pTscObj->pAppInfo->totalDnodes);
|
||||
|
||||
if (pRsp->query->killRid) {
|
||||
|
@ -440,6 +440,7 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
|
|||
}
|
||||
|
||||
if (userNum <= 0) {
|
||||
taosMemoryFree(users);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -476,6 +477,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
|||
}
|
||||
|
||||
if (dbNum <= 0) {
|
||||
taosMemoryFree(dbs);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -514,6 +516,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
|
|||
}
|
||||
|
||||
if (stbNum <= 0) {
|
||||
taosMemoryFree(stbs);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define MAX_META_MSG_IN_BATCH 1048576
|
||||
|
||||
int32_t mndInitQuery(SMnode *pMnode);
|
||||
void mndCleanupQuery(SMnode *pMnode);
|
||||
|
||||
|
|
|
@ -369,7 +369,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
|
|||
taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
|
||||
}
|
||||
|
||||
void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
|
||||
SAppObj *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
|
||||
SAppObj *pApp = NULL;
|
||||
bool hasNext = taosCacheIterNext(pIter);
|
||||
if (hasNext) {
|
||||
|
|
|
@ -77,6 +77,12 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
|||
void *pRsp = NULL;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
|
||||
if (msgNum >= MAX_META_MSG_IN_BATCH) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
mError("too many msgs %d in mnode batch meta req", msgNum);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||
if (NULL == batchRsp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -106,6 +112,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
|||
if (fp == NULL) {
|
||||
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
taosArrayDestroy(batchRsp);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -272,6 +272,12 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
SRpcMsg rspMsg = {0};
|
||||
void *pRsp = NULL;
|
||||
|
||||
if (msgNum >= MAX_META_MSG_IN_BATCH) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
qError("too many msgs %d in vnode batch meta req", msgNum);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||
if (NULL == batchRsp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define CTG_MAX_REQ_IN_BATCH 1048576
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -173,15 +173,19 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
|
|||
CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncReq));
|
||||
code = ctgUpdateTbMetaEnqueue(pCtg, output, syncReq);
|
||||
output = NULL;
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(output->tbMeta);
|
||||
taosMemoryFreeClear(output);
|
||||
|
||||
if (output) {
|
||||
taosMemoryFreeClear(output->tbMeta);
|
||||
taosMemoryFreeClear(output);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -290,7 +294,9 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
|
|||
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta));
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncOp));
|
||||
code = ctgUpdateTbMetaEnqueue(pCtg, output, syncOp);
|
||||
output = NULL;
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
|||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
|
||||
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
|
||||
ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
|||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
|
||||
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
|
||||
ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1056,7 +1056,6 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
@ -1223,7 +1222,6 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
@ -1309,7 +1307,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -1346,7 +1343,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -1382,7 +1378,6 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||
|
@ -1672,7 +1667,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
|
|||
int32_t baseResIdx = 0;
|
||||
for (int32_t i = 0; i < dbNum; ++i) {
|
||||
STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
|
||||
ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, taosArrayGetSize(pReq->pTables));
|
||||
ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, (int32_t)taosArrayGetSize(pReq->pTables));
|
||||
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
|
||||
baseResIdx += taosArrayGetSize(pReq->pTables);
|
||||
}
|
||||
|
@ -2010,10 +2005,6 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
|
|||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
|
|
@ -597,6 +597,8 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
|
|||
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
|
||||
if (NULL == node) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
|
||||
taosMemoryFree(operation->data);
|
||||
taosMemoryFree(operation);
|
||||
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -648,6 +650,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
|
|||
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -668,7 +671,6 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(op->data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -681,6 +683,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
|
|||
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -700,7 +703,6 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(op->data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -714,6 +716,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
|||
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -731,7 +734,6 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(op->data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -744,6 +746,7 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
|||
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -760,7 +763,6 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(op->data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -773,6 +775,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
|||
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
|
||||
taosMemoryFree(op);
|
||||
ctgFreeVgInfo(dbInfo);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -796,8 +799,6 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
|||
_return:
|
||||
|
||||
ctgFreeVgInfo(dbInfo);
|
||||
taosMemoryFreeClear(op->data);
|
||||
taosMemoryFreeClear(op);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -810,6 +811,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy
|
|||
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -834,8 +836,6 @@ _return:
|
|||
taosMemoryFree(output);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -847,6 +847,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
|
|||
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -863,8 +864,6 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -877,6 +876,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp
|
|||
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -892,7 +892,6 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp
|
|||
_return:
|
||||
|
||||
tFreeSGetUserAuthRsp(pAuth);
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
@ -906,6 +905,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncO
|
|||
SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -923,7 +923,6 @@ _return:
|
|||
|
||||
taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
|
||||
taosMemoryFreeClear(*pIndex);
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
@ -937,6 +936,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
|
|||
SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -952,8 +952,6 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -968,6 +966,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
|
|||
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -981,8 +980,6 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -1483,7 +1480,9 @@ int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool sync
|
|||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput));
|
||||
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq));
|
||||
code = ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq);
|
||||
pOutput = NULL;
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "tname.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
#include "ctgRemote.h"
|
||||
|
||||
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
@ -579,6 +580,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
|
|||
int32_t num = taosArrayGetSize(pBatch->pMsgs);
|
||||
SBatchReq* pBatchReq = (SBatchReq*)(*msg);
|
||||
|
||||
if (num >= CTG_MAX_REQ_IN_BATCH) {
|
||||
qError("too many msgs %d in one batch request", num);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pBatchReq->header.vgId = htonl(vgId);
|
||||
pBatchReq->msgNum = htonl(num);
|
||||
offset += sizeof(SBatchReq);
|
||||
|
|
|
@ -89,8 +89,8 @@ extern "C" {
|
|||
#define EXPLAIN_STRING_TYPE_FORMAT "%s"
|
||||
#define EXPLAIN_INPUT_ORDER_FORMAT "input_order=%s"
|
||||
#define EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT "output_order=%s"
|
||||
#define EXPLAIN_OFFSET_FORMAT "offset=%d"
|
||||
#define EXPLAIN_SOFFSET_FORMAT "soffset=%d"
|
||||
#define EXPLAIN_OFFSET_FORMAT "offset=%" PRId64
|
||||
#define EXPLAIN_SOFFSET_FORMAT "soffset=%" PRId64
|
||||
#define EXPLAIN_PARTITIONS_FORMAT "partitions=%d"
|
||||
|
||||
#define COMMAND_RESET_LOG "resetLog"
|
||||
|
|
|
@ -485,6 +485,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs
|
|||
SSDataBlock* pBlock = buildCreateTbResultDataBlock();
|
||||
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->pDbCfg, pStmt->tableName, pStmt->pTableCfg);
|
||||
if (code) {
|
||||
blockDataDestroy(pBlock);
|
||||
return code;
|
||||
}
|
||||
return buildRetrieveTableRsp(pBlock, SHOW_CREATE_TB_RESULT_COLS, pRsp);
|
||||
|
|
|
@ -1598,6 +1598,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == rsp) {
|
||||
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
|
||||
blockDataDestroy(pBlock);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
|
|
@ -267,6 +267,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
|
|||
taosThreadMutexInit(&deleter->mutex, NULL);
|
||||
if (NULL == deleter->pDataBlocks) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
destroyDataSinker((SDataSinkHandle*)deleter);
|
||||
taosMemoryFree(deleter);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
*pHandle = deleter;
|
||||
|
|
|
@ -323,6 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
int32_t code =
|
||||
tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
|
||||
if (code) {
|
||||
destroyDataSinker((SDataSinkHandle*)pInserterNode);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -424,6 +424,12 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if ((pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) > TSDB_MAX_COL_TAG_NUM) {
|
||||
*pDst = NULL;
|
||||
qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
|
||||
*pDst = taosMemoryMalloc(metaSize);
|
||||
if (NULL == *pDst) {
|
||||
|
|
|
@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
SBuildTableInput *pInput = input;
|
||||
STableCfgReq cfgReq = {0};
|
||||
cfgReq.header.vgId = pInput->vgId;
|
||||
strcpy(cfgReq.dbFName, pInput->dbFName);
|
||||
strcpy(cfgReq.tbName, pInput->tbName);
|
||||
strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName));
|
||||
strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName));
|
||||
|
||||
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
|
@ -615,6 +615,8 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
|||
STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp));
|
||||
if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) {
|
||||
qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize);
|
||||
tFreeSTableCfgRsp(out);
|
||||
taosMemoryFree(out);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
|
|
|
@ -258,7 +258,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
|||
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
|
||||
|
||||
if (len < 0) {
|
||||
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
|
||||
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
@ -292,7 +292,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
|||
}
|
||||
|
||||
// Got data from sink
|
||||
QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
|
||||
QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64, len);
|
||||
|
||||
*dataLen += len;
|
||||
|
||||
|
@ -408,7 +408,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
break;
|
||||
}
|
||||
|
||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
|
||||
|
|
|
@ -122,7 +122,6 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
|
|||
break;
|
||||
case JOB_TASK_STATUS_DROP:
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
|
||||
break;
|
||||
|
||||
default:
|
||||
SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
|
||||
|
|
|
@ -169,7 +169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
|
||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||
if (NULL == pJob->execRes.res) {
|
||||
pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
|
||||
pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
|
||||
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
|
||||
}
|
||||
|
||||
|
@ -386,7 +386,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
// NEVER REACH HERE
|
||||
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_LINK_BROKEN:
|
||||
SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
|
||||
|
@ -981,7 +980,7 @@ _return:
|
|||
}
|
||||
|
||||
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
|
||||
uint32_t msgSize = 0;
|
||||
int32_t msgSize = 0;
|
||||
void *msg = NULL;
|
||||
int32_t code = 0;
|
||||
bool isCandidateAddr = false;
|
||||
|
@ -1133,12 +1132,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
default:
|
||||
SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
break;
|
||||
}
|
||||
|
||||
#if 1
|
||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
msg = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
|
|
|
@ -904,7 +904,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
} else if (tsQueryPlannerTrace) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
qSubPlanToString(plan, &msg, &msgLen);
|
||||
SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen));
|
||||
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
|
||||
taosMemoryFree(msg);
|
||||
}
|
||||
|
@ -926,6 +926,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
SArray *explainRes = NULL;
|
||||
int32_t code = 0;
|
||||
SQWMsg qwMsg = {0};
|
||||
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
|
||||
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
|
||||
|
@ -938,14 +939,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
||||
}
|
||||
|
||||
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
pTask->execId, &qwMsg, explainRes));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_ERR_RET(schHandleExplainRes(explainRes));
|
||||
explainRes = NULL;
|
||||
}
|
||||
|
||||
SCH_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
|
||||
_return:
|
||||
|
||||
taosArrayDestroy(explainRes);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t schLaunchTaskImpl(void *param) {
|
||||
|
@ -1097,22 +1105,28 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
|
|||
|
||||
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
|
||||
void *pRsp = NULL;
|
||||
int32_t code = 0;
|
||||
SArray *explainRes = NULL;
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
||||
}
|
||||
|
||||
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
pTask->execId, &pRsp, explainRes));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_ERR_RET(schHandleExplainRes(explainRes));
|
||||
explainRes = NULL;
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_return:
|
||||
|
||||
taosArrayDestroy(explainRes);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
// Note: no more error processing, handled in function internal
|
||||
|
|
|
@ -242,7 +242,7 @@ uint64_t schGenUUID(void) {
|
|||
|
||||
if (hashId == 0) {
|
||||
char uid[64] = {0};
|
||||
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
|
||||
int32_t code = taosGetSystemUUID(uid, tListLen(uid) - 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue