fix:converity scan defects

This commit is contained in:
wangmm0220 2023-03-17 14:54:27 +08:00
parent a5e6bac876
commit 0a54d00cd2
12 changed files with 80 additions and 33 deletions

View File

@ -192,7 +192,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql; (*pRequest)->validateOnly = validateSql;
SSyncQueryParam* newpParam; SSyncQueryParam* newpParam = NULL;
if (param == NULL) { if (param == NULL) {
newpParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); newpParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (newpParam == NULL) { if (newpParam == NULL) {

View File

@ -506,6 +506,9 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true); code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
} }
if(code != 0){
taosMemoryFree(pRes);
}
tFreeSShowVariablesRsp(&rsp); tFreeSShowVariablesRsp(&rsp);
} }

View File

@ -117,7 +117,7 @@ int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, u
if (unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)) { if (unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)) {
int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS]; int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
if (unit > INT64_MAX / tsInt64) { if (tsInt64 != 0 && unit > INT64_MAX / tsInt64) {
return -1; return -1;
} }
tsInt64 *= unit; tsInt64 *= unit;
@ -637,7 +637,10 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
for (int j = 0; j < taosArrayGetSize(cols); ++j) { for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
ESchemaAction action = SCHEMA_ACTION_NULL; ESchemaAction action = SCHEMA_ACTION_NULL;
smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info); int code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
if(code != 0){
return code;
}
if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) { if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
SField field = {0}; SField field = {0};
field.type = kv->type; field.type = kv->type;
@ -646,6 +649,10 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
taosArrayPush(results, &field); taosArrayPush(results, &field);
} else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) { } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen); uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
if(index == NULL){
uError("smlBuildFieldsList get error, key:%s", kv->key);
return TSDB_CODE_SML_INVALID_DATA;
}
uint16_t newIndex = *index; uint16_t newIndex = *index;
if (isTag) newIndex -= numOfCols; if (isTag) newIndex -= numOfCols;
SField *field = (SField *)taosArrayGet(results, newIndex); SField *field = (SField *)taosArrayGet(results, newIndex);
@ -774,9 +781,16 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) { if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true); code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false); if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBuildFieldsList tag1 failed. %s", info->id, pName.tname);
goto end;
}
code = smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBuildFieldsList col1 failed. %s", info->id, pName.tname);
goto end;
}
code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE); code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname); uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
@ -820,8 +834,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
taosArrayPush(pTags, &field); taosArrayPush(pTags, &field);
} }
} }
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags, code = smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
pTableMeta->tableInfo.numOfColumns, true); pTableMeta->tableInfo.numOfColumns, true);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBuildFieldsList tag2 failed. %s", info->id, pName.tname);
goto end;
}
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action); code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -868,8 +886,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
} }
} }
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns, code = smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
pTableMeta->tableInfo.numOfColumns, false); pTableMeta->tableInfo.numOfColumns, false);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBuildFieldsList col2 failed. %s", info->id, pName.tname);
goto end;
}
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action); code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1097,6 +1119,9 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
} }
if (taos != NULL) { if (taos != NULL) {
info->taos = acquireTscObj(*(int64_t *)taos); info->taos = acquireTscObj(*(int64_t *)taos);
if(info->taos == NULL){
goto cleanup;
}
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code); uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
@ -1151,13 +1176,16 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlLineInfo *elements = info->lines + i; SSmlLineInfo *elements = info->lines + i;
SSmlTableInfo *tinfo = NULL; SSmlTableInfo *tinfo = NULL;
if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen); SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
if(tmp) tinfo = *tmp;
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
elements->measureLen + elements->tagsLen); elements->measureLen + elements->tagsLen);
if(tmp) tinfo = *tmp;
} else { } else {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
elements->measureLen + elements->tagsLen); elements->measureLen + elements->tagsLen);
if(tmp) tinfo = *tmp;
} }
if (tinfo == NULL) { if (tinfo == NULL) {

View File

@ -1237,10 +1237,12 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
if (cnt >= payloadNum) { if (cnt >= payloadNum) {
payloadNum = payloadNum << 1; payloadNum = payloadNum << 1;
void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo)); void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
if (tmp != NULL) { if (tmp == NULL) {
info->lines = (SSmlLineInfo *)tmp; ret = TSDB_CODE_OUT_OF_MEMORY;
memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo)); return ret;
} }
info->lines = (SSmlLineInfo *)tmp;
memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
} }
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt); ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
if ((info->lines + cnt)->measure == NULL) break; if ((info->lines + cnt)->measure == NULL) break;

View File

@ -292,6 +292,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
info->currSTableMeta->uid = tinfo->uid; info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) { if (tinfo->tableDataCtx == NULL) {
smlDestroyTableInfo(info, tinfo);
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }

View File

@ -292,7 +292,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (info->dataFormat) { if (info->dataFormat && info->currSTableMeta != NULL) {
if (needConverTime) { if (needConverTime) {
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision); kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
} }

View File

@ -115,7 +115,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
if (pDb->cfg.numOfVgroups > 1) { if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
isShuffle = true; isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;

View File

@ -134,7 +134,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
showObj.pMnode = pMnode; showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
strncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN); tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000; int32_t keepTime = tsShellActivityTimer * 6 * 1000;
SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime); SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);

View File

@ -269,6 +269,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
} }
int32_t tqMetaRestoreHandle(STQ* pTq) { int32_t tqMetaRestoreHandle(STQ* pTq) {
int code = 0;
TBC* pCur = NULL; TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
return -1; return -1;
@ -290,7 +291,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.pRef = walOpenRef(pTq->pVnode->pWal); handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) { if (handle.pRef == NULL) {
return -1; code = -1;
goto end;
} }
walRefVer(handle.pRef, handle.snapshotVer); walRefVer(handle.pRef, handle.snapshotVer);
@ -307,16 +309,21 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL); qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
if (handle.execHandle.task == NULL) { if (handle.execHandle.task == NULL) {
tqError("cannot create exec task for %s", handle.subKey); tqError("cannot create exec task for %s", handle.subKey);
return -1; code = -1;
goto end;
} }
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.task, &scanner); qExtractStreamScanner(handle.execHandle.task, &scanner);
if (scanner == NULL) { if (scanner == NULL) {
tqError("cannot extract stream scanner for %s", handle.subKey); tqError("cannot extract stream scanner for %s", handle.subKey);
code = -1;
goto end;
} }
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
if (handle.execHandle.pExecReader == NULL) { if (handle.execHandle.pExecReader == NULL) {
tqError("cannot extract exec reader for %s", handle.subKey); tqError("cannot extract exec reader for %s", handle.subKey);
code = -1;
goto end;
} }
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
@ -347,8 +354,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
} }
end:
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur); tdbTbcClose(pCur);
return 0; return code;
} }

View File

@ -370,11 +370,6 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error:
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
return TSDB_CODE_OUT_OF_MEMORY;
} }
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
@ -441,9 +436,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
for (int32_t rowId = 0; rowId < rows; rowId++) { for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq; SVCreateTbReq* pCreateTbReq = &createTbReq;
if (!pCreateTbReq) {
goto _end;
}
// set const // set const
pCreateTbReq->flags = 0; pCreateTbReq->flags = 0;
@ -460,6 +452,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (size == 2) { if (size == 2) {
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end; goto _end;
} }
STagVal tagVal = { STagVal tagVal = {
@ -477,6 +470,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} else { } else {
tagArray = taosArrayInit(size - 1, sizeof(STagVal)); tagArray = taosArrayInit(size - 1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end; goto _end;
} }
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
@ -503,6 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray); tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) { if (pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end; goto _end;
} }
@ -556,6 +551,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SVCreateTbReq* pCreateTbReq = NULL; SVCreateTbReq* pCreateTbReq = NULL;
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
taosMemoryFree(ctbName);
goto _end; goto _end;
}; };
@ -572,6 +568,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
// set tag content // set tag content
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
taosMemoryFree(ctbName);
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end; goto _end;
} }
STagVal tagVal = { STagVal tagVal = {
@ -586,6 +584,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray); tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) { if (pTag == NULL) {
taosMemoryFree(ctbName);
tdDestroySVCreateTbReq(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end; goto _end;
} }
@ -630,6 +630,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
// rows // rows
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(tbData.aRowP); taosArrayDestroy(tbData.aRowP);
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
goto _end; goto _end;
} }
@ -680,6 +681,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SSubmitReq2 submitReq = {0}; SSubmitReq2 submitReq = {0};
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
@ -693,6 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
len += sizeof(SSubmitReq2Msg); len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
if (NULL == pBuf) { if (NULL == pBuf) {
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode); ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
@ -704,6 +707,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqError("failed to encode submit req since %s", terrstr()); tqError("failed to encode submit req since %s", terrstr());
tEncoderClear(&encoder); tEncoderClear(&encoder);
rpcFreeCont(pBuf); rpcFreeCont(pBuf);
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
continue; continue;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);

View File

@ -245,8 +245,7 @@ int smlProcess_json3_Test() {
taos_free_result(pRes); taos_free_result(pRes);
const char *sql[] = { const char *sql[] = {
"[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\"," "[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\",\"dc\":\"lga\"}}]"};
"\"dc\":\"lga\"}}]"};
char *sql1[1] = {0}; char *sql1[1] = {0};
for (int i = 0; i < 1; i++) { for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024); sql1[i] = taosMemoryCalloc(1, 1024);

View File

@ -858,7 +858,9 @@ void loop_consume(SThreadInfo* pInfo) {
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n", taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt); pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
taosFsyncFile(pInfo->pConsumeRowsFile); if(taosFsyncFile(pInfo->pConsumeRowsFile) < 0){
printf("taosFsyncFile error:%s", strerror(errno));
}
taosCloseFile(&pInfo->pConsumeRowsFile); taosCloseFile(&pInfo->pConsumeRowsFile);
} }