fix(query): fix the invalid decode.

This commit is contained in:
Haojun Liao 2022-12-16 00:43:19 +08:00
parent b5f611328a
commit 28559dc41c
1 changed files with 15 additions and 14 deletions

View File

@ -27,7 +27,7 @@ extern SDataSinkStat gDataSinkStat;
typedef struct SSubmitRes { typedef struct SSubmitRes {
int64_t affectedRows; int64_t affectedRows;
int32_t code; int32_t code;
SSubmitRsp* pRsp; SSubmitRsp2* pRsp;
} SSubmitRes; } SSubmitRes;
typedef struct SDataInserterHandle { typedef struct SDataInserterHandle {
@ -61,38 +61,39 @@ int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp)); pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp));
SDecoder coder = {0}; SDecoder coder = {0};
tDecoderInit(&coder, pMsg->pData, pMsg->len); tDecoderInit(&coder, pMsg->pData, pMsg->len);
code = tDecodeSSubmitRsp(&coder, pInserter->submitRes.pRsp); code = tDecodeSSubmitRsp2(&coder, pInserter->submitRes.pRsp);
if (code) { if (code) {
tFreeSSubmitRsp(pInserter->submitRes.pRsp); // tFreeSSubmitRsp(pInserter->submitRes.pRsp);
pInserter->submitRes.code = code; pInserter->submitRes.code = code;
goto _return; goto _return;
} }
if (pInserter->submitRes.pRsp->nBlocks > 0) { if (pInserter->submitRes.pRsp->affectedRows > 0) {
for (int32_t i = 0; i < pInserter->submitRes.pRsp->nBlocks; ++i) { SArray* pCreateTbList = pInserter->submitRes.pRsp->aCreateTbRsp;
SSubmitBlkRsp* blk = pInserter->submitRes.pRsp->pBlocks + i; int32_t numOfTables = taosArrayGetSize(pCreateTbList);
if (TSDB_CODE_SUCCESS != blk->code) {
code = blk->code; for (int32_t i = 0; i < numOfTables; ++i) {
tFreeSSubmitRsp(pInserter->submitRes.pRsp); SVCreateTbRsp* pRsp = taosArrayGet(pCreateTbList, i);
if (TSDB_CODE_SUCCESS != pRsp->code) {
code = pRsp->code;
// tFreeSSubmitRsp(pInserter->submitRes.pRsp);
pInserter->submitRes.code = code; pInserter->submitRes.code = code;
goto _return; goto _return;
} }
} }
} }
pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows; // pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows;
// pInserter->submitRes.affectedRows += pInserter->submitRes.
qDebug("submit rsp received, affectedRows:%d, total:%"PRId64, pInserter->submitRes.pRsp->affectedRows, qDebug("submit rsp received, affectedRows:%d, total:%"PRId64, pInserter->submitRes.pRsp->affectedRows,
pInserter->submitRes.affectedRows); pInserter->submitRes.affectedRows);
tFreeSSubmitRsp(pInserter->submitRes.pRsp); // tFreeSSubmitRsp(pInserter->submitRes.pRsp);
} }
_return: _return:
tsem_post(&pInserter->ready); tsem_post(&pInserter->ready);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }