fix: fix coverity issues
This commit is contained in:
parent
1992925d8d
commit
0fe14f0b16
|
@ -498,6 +498,7 @@ enum {
|
||||||
#define MAX_NUM_STR_SIZE 40
|
#define MAX_NUM_STR_SIZE 40
|
||||||
|
|
||||||
#define MAX_META_MSG_IN_BATCH 1048576
|
#define MAX_META_MSG_IN_BATCH 1048576
|
||||||
|
#define MAX_META_BATCH_RSP_SIZE (1 * 1048576 * 1024)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,7 +199,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
||||||
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
|
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
|
||||||
if (TSDB_CODE_SUCCESS !=
|
if (TSDB_CODE_SUCCESS !=
|
||||||
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
|
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
|
||||||
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
|
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self,
|
||||||
(*pRequest)->requestId, pTscObj->id, sql);
|
(*pRequest)->requestId, pTscObj->id, sql);
|
||||||
|
|
||||||
destroyRequest(*pRequest);
|
destroyRequest(*pRequest);
|
||||||
|
@ -955,7 +955,12 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
||||||
switch (pQuery->execMode) {
|
switch (pQuery->execMode) {
|
||||||
case QUERY_EXEC_MODE_LOCAL:
|
case QUERY_EXEC_MODE_LOCAL:
|
||||||
if (!pRequest->validateOnly) {
|
if (!pRequest->validateOnly) {
|
||||||
code = execLocalCmd(pRequest, pQuery);
|
if (NULL == pQuery->pRoot) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
code = terrno;
|
||||||
|
} else {
|
||||||
|
code = execLocalCmd(pRequest, pQuery);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case QUERY_EXEC_MODE_RPC:
|
case QUERY_EXEC_MODE_RPC:
|
||||||
|
@ -997,7 +1002,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
||||||
|
|
||||||
handleQueryExecRsp(pRequest);
|
handleQueryExecRsp(pRequest);
|
||||||
|
|
||||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2253,7 +2258,10 @@ void syncQueryFn(void* param, void* res, int32_t code) {
|
||||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
|
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
|
||||||
if (sql == NULL || NULL == fp) {
|
if (sql == NULL || NULL == fp) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
fp(param, NULL, terrno);
|
if (fp) {
|
||||||
|
fp(param, NULL, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -944,7 +944,6 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
if (pResultInfo->completed) {
|
if (pResultInfo->completed) {
|
||||||
// it is a local executed query, no need to do async fetch
|
// it is a local executed query, no need to do async fetch
|
||||||
if (QUERY_EXEC_MODE_LOCAL == pRequest->body.execMode) {
|
if (QUERY_EXEC_MODE_LOCAL == pRequest->body.execMode) {
|
||||||
ASSERT(pResultInfo->numOfRows >= 0);
|
|
||||||
if (pResultInfo->localResultFetched) {
|
if (pResultInfo->localResultFetched) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
pResultInfo->current = 0;
|
pResultInfo->current = 0;
|
||||||
|
|
|
@ -288,8 +288,10 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
|
tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
|
||||||
|
|
||||||
struct SCatalog* pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
|
@ -393,6 +395,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
||||||
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||||
*pRsp = taosMemoryCalloc(1, rspSize);
|
*pRsp = taosMemoryCalloc(1, rspSize);
|
||||||
if (NULL == *pRsp) {
|
if (NULL == *pRsp) {
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -152,7 +152,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
|
||||||
pStmt->bInfo.tbType = pTableMeta->tableType;
|
pStmt->bInfo.tbType = pTableMeta->tableType;
|
||||||
pStmt->bInfo.boundTags = tags;
|
pStmt->bInfo.boundTags = tags;
|
||||||
pStmt->bInfo.tagsCached = false;
|
pStmt->bInfo.tagsCached = false;
|
||||||
strcpy(pStmt->bInfo.stbFName, sTableName);
|
tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,14 +90,39 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < msgNum; ++i) {
|
for (int32_t i = 0; i < msgNum; ++i) {
|
||||||
|
if (offset >= pMsg->contLen) {
|
||||||
|
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
|
||||||
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
taosArrayDestroy(batchRsp);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
req.msgIdx = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
||||||
offset += sizeof(req.msgIdx);
|
offset += sizeof(req.msgIdx);
|
||||||
|
if (offset >= pMsg->contLen) {
|
||||||
|
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
|
||||||
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
taosArrayDestroy(batchRsp);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
req.msgType = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
||||||
offset += sizeof(req.msgType);
|
offset += sizeof(req.msgType);
|
||||||
|
if (offset >= pMsg->contLen) {
|
||||||
|
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
|
||||||
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
taosArrayDestroy(batchRsp);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
req.msgLen = ntohl(*(int32_t *)((char *)pMsg->pCont + offset));
|
||||||
offset += sizeof(req.msgLen);
|
offset += sizeof(req.msgLen);
|
||||||
|
if (offset >= pMsg->contLen) {
|
||||||
|
mError("offset %d is bigger than contLen %d", offset, pMsg->contLen);
|
||||||
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
taosArrayDestroy(batchRsp);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
req.msg = (char *)pMsg->pCont + offset;
|
req.msg = (char *)pMsg->pCont + offset;
|
||||||
offset += req.msgLen;
|
offset += req.msgLen;
|
||||||
|
|
|
@ -2553,12 +2553,17 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
||||||
|
|
||||||
char rollup[160 + VARSTR_HEADER_SIZE] = {0};
|
char rollup[160 + VARSTR_HEADER_SIZE] = {0};
|
||||||
int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
|
int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
|
||||||
|
char *sep = ", ";
|
||||||
|
int32_t sepLen = strlen(sep);
|
||||||
|
int32_t rollupLen = sizeof(rollup) - 2;
|
||||||
for (int32_t i = 0; i < rollupNum; ++i) {
|
for (int32_t i = 0; i < rollupNum; ++i) {
|
||||||
char *funcName = taosArrayGet(pStb->pFuncs, i);
|
char *funcName = taosArrayGet(pStb->pFuncs, i);
|
||||||
if (i) {
|
if (i) {
|
||||||
strcat(varDataVal(rollup), ", ");
|
strncat(varDataVal(rollup), sep, rollupLen);
|
||||||
|
rollupLen -= sepLen;
|
||||||
}
|
}
|
||||||
strcat(varDataVal(rollup), funcName);
|
strncat(varDataVal(rollup), funcName, rollupLen);
|
||||||
|
rollupLen -= strlen(funcName);
|
||||||
}
|
}
|
||||||
varDataSetLen(rollup, strlen(varDataVal(rollup)));
|
varDataSetLen(rollup, strlen(varDataVal(rollup)));
|
||||||
|
|
||||||
|
|
|
@ -330,6 +330,11 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
rspSize += sizeof(int32_t);
|
rspSize += sizeof(int32_t);
|
||||||
offset = 0;
|
offset = 0;
|
||||||
|
|
||||||
|
if (rspSize > MAX_META_BATCH_RSP_SIZE) {
|
||||||
|
code = TSDB_CODE_INVALID_MSG_LEN;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
pRsp = rpcMallocCont(rspSize);
|
pRsp = rpcMallocCont(rspSize);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -302,9 +302,11 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
taosMemoryFreeClear(output->tbMeta);
|
if (output) {
|
||||||
taosMemoryFreeClear(output);
|
taosMemoryFreeClear(output->tbMeta);
|
||||||
|
taosMemoryFreeClear(output);
|
||||||
|
}
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -252,7 +252,7 @@ int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
|
|
||||||
SCtgIndexCtx* ctx = task.taskCtx;
|
SCtgIndexCtx* ctx = task.taskCtx;
|
||||||
|
|
||||||
strcpy(ctx->indexFName, name);
|
tstrncpy(ctx->indexFName, name, sizeof(ctx->indexFName));
|
||||||
|
|
||||||
taosArrayPush(pJob->pTasks, &task);
|
taosArrayPush(pJob->pTasks, &task);
|
||||||
|
|
||||||
|
@ -277,7 +277,7 @@ int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
|
|
||||||
SCtgUdfCtx* ctx = task.taskCtx;
|
SCtgUdfCtx* ctx = task.taskCtx;
|
||||||
|
|
||||||
strcpy(ctx->udfName, name);
|
tstrncpy(ctx->udfName, name, sizeof(ctx->udfName));
|
||||||
|
|
||||||
taosArrayPush(pJob->pTasks, &task);
|
taosArrayPush(pJob->pTasks, &task);
|
||||||
|
|
||||||
|
|
|
@ -660,7 +660,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->pCtg = pCtg;
|
msg->pCtg = pCtg;
|
||||||
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
msg->dbId = dbId;
|
msg->dbId = dbId;
|
||||||
|
|
||||||
op->data = msg;
|
op->data = msg;
|
||||||
|
@ -693,7 +693,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->pCtg = pCtg;
|
msg->pCtg = pCtg;
|
||||||
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
|
|
||||||
op->data = msg;
|
op->data = msg;
|
||||||
|
|
||||||
|
@ -721,8 +721,8 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->pCtg = pCtg;
|
msg->pCtg = pCtg;
|
||||||
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
strncpy(msg->stbName, stbName, sizeof(msg->stbName));
|
tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
|
||||||
msg->dbId = dbId;
|
msg->dbId = dbId;
|
||||||
msg->suid = suid;
|
msg->suid = suid;
|
||||||
|
|
||||||
|
@ -751,8 +751,8 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->pCtg = pCtg;
|
msg->pCtg = pCtg;
|
||||||
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
strncpy(msg->tbName, tbName, sizeof(msg->tbName));
|
tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
|
||||||
msg->dbId = dbId;
|
msg->dbId = dbId;
|
||||||
|
|
||||||
op->data = msg;
|
op->data = msg;
|
||||||
|
@ -785,7 +785,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
||||||
dbFName = p + 1;
|
dbFName = p + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
msg->pCtg = pCtg;
|
msg->pCtg = pCtg;
|
||||||
msg->dbId = dbId;
|
msg->dbId = dbId;
|
||||||
msg->dbInfo = dbInfo;
|
msg->dbInfo = dbInfo;
|
||||||
|
@ -817,7 +817,8 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy
|
||||||
|
|
||||||
char *p = strchr(output->dbFName, '.');
|
char *p = strchr(output->dbFName, '.');
|
||||||
if (p && IS_SYS_DBNAME(p + 1)) {
|
if (p && IS_SYS_DBNAME(p + 1)) {
|
||||||
memmove(output->dbFName, p + 1, strlen(p + 1));
|
int32_t len = strlen(p + 1);
|
||||||
|
memmove(output->dbFName, p + 1, len >= TSDB_DB_FNAME_LEN ? TSDB_DB_FNAME_LEN - 1 : len);
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->pCtg = pCtg;
|
msg->pCtg = pCtg;
|
||||||
|
@ -852,7 +853,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
|
||||||
}
|
}
|
||||||
|
|
||||||
msg->pCtg = pCtg;
|
msg->pCtg = pCtg;
|
||||||
strcpy(msg->dbFName, dbFName);
|
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
msg->vgId = vgId;
|
msg->vgId = vgId;
|
||||||
msg->epSet = *pEpSet;
|
msg->epSet = *pEpSet;
|
||||||
|
|
||||||
|
@ -1215,7 +1216,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
|
||||||
CTG_CACHE_STAT_INC(numOfDb, 1);
|
CTG_CACHE_STAT_INC(numOfDb, 1);
|
||||||
|
|
||||||
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
|
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
|
||||||
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
||||||
|
|
||||||
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
|
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
|
||||||
|
|
||||||
|
@ -1331,8 +1332,8 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uin
|
||||||
metaRent.smaVer = pCache->pIndex->version;
|
metaRent.smaVer = pCache->pIndex->version;
|
||||||
}
|
}
|
||||||
|
|
||||||
strcpy(metaRent.dbFName, dbFName);
|
tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName));
|
||||||
strcpy(metaRent.stbName, tbName);
|
tstrncpy(metaRent.stbName, tbName, sizeof(metaRent.stbName));
|
||||||
|
|
||||||
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion),
|
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion),
|
||||||
ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
|
ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
|
||||||
|
@ -1416,8 +1417,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
|
||||||
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
|
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
|
||||||
meta->tableType);
|
meta->tableType);
|
||||||
|
|
||||||
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
|
if (pCache) {
|
||||||
|
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1584,7 +1587,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
||||||
|
|
||||||
dbCache = NULL;
|
dbCache = NULL;
|
||||||
|
|
||||||
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
|
||||||
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
|
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
|
||||||
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
|
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
|
||||||
|
|
||||||
|
@ -1674,9 +1677,9 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
|
||||||
|
|
||||||
if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
|
if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
|
||||||
int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta);
|
int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta);
|
||||||
CTG_ERR_JRET(
|
code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize);
|
||||||
ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize));
|
|
||||||
pMeta->tbMeta = NULL;
|
pMeta->tbMeta = NULL;
|
||||||
|
CTG_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
|
if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
|
||||||
|
@ -1691,10 +1694,8 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
if (pMeta) {
|
taosMemoryFreeClear(pMeta->tbMeta);
|
||||||
taosMemoryFreeClear(pMeta->tbMeta);
|
taosMemoryFreeClear(pMeta);
|
||||||
taosMemoryFreeClear(pMeta);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFreeClear(msg);
|
taosMemoryFreeClear(msg);
|
||||||
|
|
||||||
|
|
|
@ -317,7 +317,12 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
|
||||||
SArray* pTagVals = NULL;
|
SArray* pTagVals = NULL;
|
||||||
STag* pTag = (STag*)pCfg->pTags;
|
STag* pTag = (STag*)pCfg->pTags;
|
||||||
|
|
||||||
if (pCfg->pTags && tTagIsJson(pTag)) {
|
if (NULL == pCfg->pTags || pCfg->numOfTags <= 0) {
|
||||||
|
qError("tag missed in table cfg, pointer:%p, numOfTags:%d", pCfg->pTags, pCfg->numOfTags);
|
||||||
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tTagIsJson(pTag)) {
|
||||||
char* pJson = parseTagDatatoJson(pTag);
|
char* pJson = parseTagDatatoJson(pTag);
|
||||||
if (pJson) {
|
if (pJson) {
|
||||||
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
|
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
|
||||||
|
|
|
@ -143,9 +143,15 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
|
||||||
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
|
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
|
||||||
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
|
if (NULL == pBuf) {
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!allocBuf(pDispatcher, pInput, pBuf)) {
|
||||||
|
taosFreeQitem(pBuf);
|
||||||
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
toDataCacheEntry(pDispatcher, pInput, pBuf);
|
toDataCacheEntry(pDispatcher, pInput, pBuf);
|
||||||
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
|
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
|
||||||
*pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);
|
*pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false);
|
||||||
|
|
|
@ -323,7 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
||||||
int32_t code =
|
int32_t code =
|
||||||
tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
|
tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
|
||||||
if (code) {
|
if (code) {
|
||||||
destroyDataSinker((SDataSinkHandle*)pInserterNode);
|
destroyDataSinker((SDataSinkHandle*)inserter);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -357,8 +357,7 @@ char* parseTagDatatoJson(void* p) {
|
||||||
for (int j = 0; j < nCols; ++j) {
|
for (int j = 0; j < nCols; ++j) {
|
||||||
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
|
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
|
||||||
// json key encode by binary
|
// json key encode by binary
|
||||||
memset(tagJsonKey, 0, sizeof(tagJsonKey));
|
tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
|
||||||
memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
|
|
||||||
// json value
|
// json value
|
||||||
char type = pTagVal->type;
|
char type = pTagVal->type;
|
||||||
if (type == TSDB_DATA_TYPE_NULL) {
|
if (type == TSDB_DATA_TYPE_NULL) {
|
||||||
|
|
|
@ -173,7 +173,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
SDbCfgReq dbCfgReq = {0};
|
SDbCfgReq dbCfgReq = {0};
|
||||||
strcpy(dbCfgReq.db, input);
|
strncpy(dbCfgReq.db, input, sizeof(dbCfgReq.db) - 1);
|
||||||
|
|
||||||
int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
|
int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
|
||||||
void *pBuf = (*mallcFp)(bufLen);
|
void *pBuf = (*mallcFp)(bufLen);
|
||||||
|
@ -191,7 +191,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
SUserIndexReq indexReq = {0};
|
SUserIndexReq indexReq = {0};
|
||||||
strcpy(indexReq.indexFName, input);
|
strncpy(indexReq.indexFName, input, sizeof(indexReq.indexFName) - 1);
|
||||||
|
|
||||||
int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq);
|
int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq);
|
||||||
void *pBuf = (*mallcFp)(bufLen);
|
void *pBuf = (*mallcFp)(bufLen);
|
||||||
|
@ -233,7 +233,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
|
||||||
}
|
}
|
||||||
|
|
||||||
SGetUserAuthReq req = {0};
|
SGetUserAuthReq req = {0};
|
||||||
strncpy(req.user, input, sizeof(req.user));
|
strncpy(req.user, input, sizeof(req.user) - 1);
|
||||||
|
|
||||||
int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req);
|
int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req);
|
||||||
void *pBuf = (*mallcFp)(bufLen);
|
void *pBuf = (*mallcFp)(bufLen);
|
||||||
|
@ -251,7 +251,7 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_
|
||||||
}
|
}
|
||||||
|
|
||||||
STableIndexReq indexReq = {0};
|
STableIndexReq indexReq = {0};
|
||||||
strcpy(indexReq.tbFName, input);
|
strncpy(indexReq.tbFName, input, sizeof(indexReq.tbFName) - 1);
|
||||||
|
|
||||||
int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq);
|
int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq);
|
||||||
void *pBuf = (*mallcFp)(bufLen);
|
void *pBuf = (*mallcFp)(bufLen);
|
||||||
|
@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
|
||||||
SBuildTableInput *pInput = input;
|
SBuildTableInput *pInput = input;
|
||||||
STableCfgReq cfgReq = {0};
|
STableCfgReq cfgReq = {0};
|
||||||
cfgReq.header.vgId = pInput->vgId;
|
cfgReq.header.vgId = pInput->vgId;
|
||||||
strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName));
|
strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName) - 1);
|
||||||
strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName));
|
strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName) - 1);
|
||||||
|
|
||||||
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
|
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
|
||||||
void *pBuf = (*mallcFp)(bufLen);
|
void *pBuf = (*mallcFp)(bufLen);
|
||||||
|
|
|
@ -412,7 +412,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
||||||
while (true) {
|
while (true) {
|
||||||
paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
|
paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
|
||||||
if (paramIdx == tListLen(gQwMgmt.param)) {
|
if (paramIdx == tListLen(gQwMgmt.param)) {
|
||||||
newParamIdx = 0;
|
newParamIdx = 1;
|
||||||
} else {
|
} else {
|
||||||
newParamIdx = paramIdx + 1;
|
newParamIdx = paramIdx + 1;
|
||||||
}
|
}
|
||||||
|
@ -422,6 +422,10 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (paramIdx == tListLen(gQwMgmt.param)) {
|
||||||
|
paramIdx = 0;
|
||||||
|
}
|
||||||
|
|
||||||
gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
|
gQwMgmt.param[paramIdx].qwrId = gQwMgmt.qwRef;
|
||||||
gQwMgmt.param[paramIdx].refId = refId;
|
gQwMgmt.param[paramIdx].refId = refId;
|
||||||
|
|
||||||
|
|
|
@ -398,7 +398,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
||||||
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||||
QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
|
QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase));
|
||||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
|
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||||
|
|
|
@ -430,7 +430,8 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||||
if (NULL == pData->pEpSet) {
|
if (NULL == pData->pEpSet) {
|
||||||
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
|
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
|
||||||
SCH_ERR_JRET(rspCode);
|
code = rspCode;
|
||||||
|
goto _return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue