fix:error by coverity scan
This commit is contained in:
parent
8a925342cc
commit
91d5df423f
|
@ -192,7 +192,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
||||||
(*pRequest)->sqlLen = sqlLen;
|
(*pRequest)->sqlLen = sqlLen;
|
||||||
(*pRequest)->validateOnly = validateSql;
|
(*pRequest)->validateOnly = validateSql;
|
||||||
|
|
||||||
SSyncQueryParam* newpParam;
|
SSyncQueryParam* newpParam = NULL;
|
||||||
if (param == NULL) {
|
if (param == NULL) {
|
||||||
newpParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
newpParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
||||||
if (newpParam == NULL) {
|
if (newpParam == NULL) {
|
||||||
|
@ -1545,7 +1545,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code =
|
pRequest->code =
|
||||||
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
|
setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -1016,7 +1016,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->code =
|
pRequest->code =
|
||||||
setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
|
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
|
|
|
@ -480,6 +480,9 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
|
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(code != 0){
|
||||||
|
taosMemoryFree(pRes);
|
||||||
|
}
|
||||||
tFreeSShowVariablesRsp(&rsp);
|
tFreeSShowVariablesRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1154,13 +1154,16 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
SSmlLineInfo *elements = info->lines + i;
|
SSmlLineInfo *elements = info->lines + i;
|
||||||
SSmlTableInfo *tinfo = NULL;
|
SSmlTableInfo *tinfo = NULL;
|
||||||
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
||||||
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
|
SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
|
||||||
|
if(tmp) tinfo = *tmp;
|
||||||
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
||||||
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
|
SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
|
||||||
elements->measureLen + elements->tagsLen);
|
elements->measureLen + elements->tagsLen);
|
||||||
|
if(tmp) tinfo = *tmp;
|
||||||
} else {
|
} else {
|
||||||
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
|
SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
|
||||||
elements->measureLen + elements->tagsLen);
|
elements->measureLen + elements->tagsLen);
|
||||||
|
if(tmp) tinfo = *tmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tinfo == NULL) {
|
if (tinfo == NULL) {
|
||||||
|
|
|
@ -1279,10 +1279,11 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
|
||||||
if (cnt >= payloadNum) {
|
if (cnt >= payloadNum) {
|
||||||
payloadNum = payloadNum << 1;
|
payloadNum = payloadNum << 1;
|
||||||
void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
|
void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
|
||||||
if (tmp != NULL) {
|
if (tmp == NULL) {
|
||||||
info->lines = (SSmlLineInfo *)tmp;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
|
|
||||||
}
|
}
|
||||||
|
info->lines = (SSmlLineInfo *)tmp;
|
||||||
|
memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
|
||||||
}
|
}
|
||||||
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
|
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
|
||||||
if ((info->lines + cnt)->measure == NULL) break;
|
if ((info->lines + cnt)->measure == NULL) break;
|
||||||
|
|
|
@ -674,7 +674,7 @@ SUBSCRIBE_OVER:
|
||||||
taosMemoryFree(pConsumerNew);
|
taosMemoryFree(pConsumerNew);
|
||||||
}
|
}
|
||||||
// TODO: replace with destroy subscribe msg
|
// TODO: replace with destroy subscribe msg
|
||||||
if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -115,7 +115,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
|
||||||
|
|
||||||
if (pStream->fixedSinkVgId == 0) {
|
if (pStream->fixedSinkVgId == 0) {
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
|
||||||
if (pDb->cfg.numOfVgroups > 1) {
|
if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
|
||||||
isShuffle = true;
|
isShuffle = true;
|
||||||
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
|
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
|
||||||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
|
|
|
@ -134,7 +134,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
|
||||||
showObj.pMnode = pMnode;
|
showObj.pMnode = pMnode;
|
||||||
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
|
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
|
||||||
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
|
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
|
||||||
strncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
|
tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
|
||||||
|
|
||||||
int32_t keepTime = tsShellActivityTimer * 6 * 1000;
|
int32_t keepTime = tsShellActivityTimer * 6 * 1000;
|
||||||
SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
|
SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
|
||||||
|
|
|
@ -1335,12 +1335,8 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
blockDataDestroy(pDelBlock);
|
||||||
/*A(ref >= 0);*/
|
taosMemoryFree(pRef);
|
||||||
if (ref == 0) {
|
|
||||||
blockDataDestroy(pDelBlock);
|
|
||||||
taosMemoryFree(pRef);
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||||
|
|
|
@ -269,6 +269,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
|
int code = 0;
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -290,7 +291,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
|
|
||||||
handle.pRef = walOpenRef(pTq->pVnode->pWal);
|
handle.pRef = walOpenRef(pTq->pVnode->pWal);
|
||||||
if (handle.pRef == NULL) {
|
if (handle.pRef == NULL) {
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
walRefVer(handle.pRef, handle.snapshotVer);
|
walRefVer(handle.pRef, handle.snapshotVer);
|
||||||
|
|
||||||
|
@ -307,16 +309,21 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
|
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
|
||||||
if (handle.execHandle.task == NULL) {
|
if (handle.execHandle.task == NULL) {
|
||||||
tqError("cannot create exec task for %s", handle.subKey);
|
tqError("cannot create exec task for %s", handle.subKey);
|
||||||
return -1;
|
code = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
void* scanner = NULL;
|
void* scanner = NULL;
|
||||||
qExtractStreamScanner(handle.execHandle.task, &scanner);
|
qExtractStreamScanner(handle.execHandle.task, &scanner);
|
||||||
if (scanner == NULL) {
|
if (scanner == NULL) {
|
||||||
tqError("cannot extract stream scanner for %s", handle.subKey);
|
tqError("cannot extract stream scanner for %s", handle.subKey);
|
||||||
|
code = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
|
||||||
if (handle.execHandle.pExecReader == NULL) {
|
if (handle.execHandle.pExecReader == NULL) {
|
||||||
tqError("cannot extract exec reader for %s", handle.subKey);
|
tqError("cannot extract exec reader for %s", handle.subKey);
|
||||||
|
code = -1;
|
||||||
|
goto end;
|
||||||
}
|
}
|
||||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
@ -347,8 +354,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -370,11 +370,6 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_error:
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
tqError("failed to encode submit req since %s", terrstr());
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
||||||
|
@ -441,9 +436,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
for (int32_t rowId = 0; rowId < rows; rowId++) {
|
for (int32_t rowId = 0; rowId < rows; rowId++) {
|
||||||
SVCreateTbReq createTbReq = {0};
|
SVCreateTbReq createTbReq = {0};
|
||||||
SVCreateTbReq* pCreateTbReq = &createTbReq;
|
SVCreateTbReq* pCreateTbReq = &createTbReq;
|
||||||
if (!pCreateTbReq) {
|
|
||||||
goto _end;
|
|
||||||
}
|
|
||||||
|
|
||||||
// set const
|
// set const
|
||||||
pCreateTbReq->flags = 0;
|
pCreateTbReq->flags = 0;
|
||||||
|
@ -460,6 +452,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
if (size == 2) {
|
if (size == 2) {
|
||||||
tagArray = taosArrayInit(1, sizeof(STagVal));
|
tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||||
if (!tagArray) {
|
if (!tagArray) {
|
||||||
|
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
STagVal tagVal = {
|
STagVal tagVal = {
|
||||||
|
@ -477,6 +470,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
} else {
|
} else {
|
||||||
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
|
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
|
||||||
if (!tagArray) {
|
if (!tagArray) {
|
||||||
|
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
|
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
|
||||||
|
@ -503,6 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
tTagNew(tagArray, 1, false, &pTag);
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
tagArray = taosArrayDestroy(tagArray);
|
tagArray = taosArrayDestroy(tagArray);
|
||||||
if (pTag == NULL) {
|
if (pTag == NULL) {
|
||||||
|
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -558,6 +553,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
SVCreateTbReq* pCreateTbReq = NULL;
|
SVCreateTbReq* pCreateTbReq = NULL;
|
||||||
|
|
||||||
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
|
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
|
||||||
|
taosMemoryFree(ctbName);
|
||||||
goto _end;
|
goto _end;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -574,6 +570,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
// set tag content
|
// set tag content
|
||||||
tagArray = taosArrayInit(1, sizeof(STagVal));
|
tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||||
if (!tagArray) {
|
if (!tagArray) {
|
||||||
|
taosMemoryFree(ctbName);
|
||||||
|
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
STagVal tagVal = {
|
STagVal tagVal = {
|
||||||
|
@ -589,6 +587,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
tagArray = taosArrayDestroy(tagArray);
|
tagArray = taosArrayDestroy(tagArray);
|
||||||
if (pTag == NULL) {
|
if (pTag == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(ctbName);
|
||||||
|
tdDestroySVCreateTbReq(pCreateTbReq);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
|
||||||
|
@ -620,7 +620,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
", actual suid %" PRId64 "",
|
", actual suid %" PRId64 "",
|
||||||
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
|
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosMemoryFree(ctbName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tbData.uid = mr.me.uid;
|
tbData.uid = mr.me.uid;
|
||||||
|
@ -631,6 +630,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
// rows
|
// rows
|
||||||
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
|
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
|
||||||
taosArrayDestroy(tbData.aRowP);
|
taosArrayDestroy(tbData.aRowP);
|
||||||
|
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -681,6 +681,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
|
|
||||||
SSubmitReq2 submitReq = {0};
|
SSubmitReq2 submitReq = {0};
|
||||||
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
|
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
|
||||||
|
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -694,6 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
len += sizeof(SSubmitReq2Msg);
|
len += sizeof(SSubmitReq2Msg);
|
||||||
pBuf = rpcMallocCont(len);
|
pBuf = rpcMallocCont(len);
|
||||||
if (NULL == pBuf) {
|
if (NULL == pBuf) {
|
||||||
|
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
|
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
|
||||||
|
@ -705,6 +707,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
|
||||||
tqError("failed to encode submit req since %s", terrstr());
|
tqError("failed to encode submit req since %s", terrstr());
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
rpcFreeCont(pBuf);
|
rpcFreeCont(pBuf);
|
||||||
|
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
|
@ -215,8 +215,8 @@ int smlProcess_json3_Test() {
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
const char *sql[] = {
|
const char *sql[] = {
|
||||||
"[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\","
|
"[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\",\"dc\":\"lga\"}}]"
|
||||||
"\"dc\":\"lga\"}}]"};
|
};
|
||||||
char *sql1[1] = {0};
|
char *sql1[1] = {0};
|
||||||
for (int i = 0; i < 1; i++) {
|
for (int i = 0; i < 1; i++) {
|
||||||
sql1[i] = taosMemoryCalloc(1, 1024);
|
sql1[i] = taosMemoryCalloc(1, 1024);
|
||||||
|
|
|
@ -855,7 +855,9 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
|
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
|
||||||
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
|
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
|
||||||
|
|
||||||
taosFsyncFile(pInfo->pConsumeRowsFile);
|
if(taosFsyncFile(pInfo->pConsumeRowsFile) < 0){
|
||||||
|
printf("taosFsyncFile error:%s", strerror(errno));
|
||||||
|
}
|
||||||
taosCloseFile(&pInfo->pConsumeRowsFile);
|
taosCloseFile(&pInfo->pConsumeRowsFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue