Merge pull request #27435 from taosdata/fix/blockEncodeReturnVal
fix: check blockEncode result
This commit is contained in:
commit
0f91df1b68
|
@ -576,8 +576,8 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
|||
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||
*pRsp = taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == *pRsp) {
|
||||
blockDataDestroy(pBlock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
(*pRsp)->useconds = 0;
|
||||
|
@ -589,6 +589,11 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
|||
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
|
||||
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, SHOW_VARIABLES_RESULT_COLS);
|
||||
if(len < 0) {
|
||||
uError("buildShowVariablesRsp error, len:%d", len);
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
blockDataDestroy(pBlock);
|
||||
|
||||
SET_PAYLOAD_LEN((*pRsp)->data, len, len);
|
||||
|
@ -600,10 +605,21 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
|||
if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
|
||||
uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
|
||||
(uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
code = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_exit:
|
||||
if(*pRsp) {
|
||||
taosMemoryFree(*pRsp);
|
||||
*pRsp = NULL;
|
||||
}
|
||||
if(pBlock) {
|
||||
blockDataDestroy(pBlock);
|
||||
pBlock = NULL;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
|
@ -711,8 +727,8 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
|
|||
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||
*pRsp = taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == *pRsp) {
|
||||
blockDataDestroy(pBlock);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
(*pRsp)->useconds = 0;
|
||||
|
@ -725,6 +741,11 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
|
|||
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
|
||||
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, COMPACT_DB_RESULT_COLS);
|
||||
if(len < 0) {
|
||||
uError("buildRetriveTableRspForCompactDb error, len:%d", len);
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
blockDataDestroy(pBlock);
|
||||
|
||||
SET_PAYLOAD_LEN((*pRsp)->data, len, len);
|
||||
|
@ -736,10 +757,21 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
|
|||
if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
|
||||
uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
|
||||
(uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
code = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_exit:
|
||||
if(*pRsp) {
|
||||
taosMemoryFree(*pRsp);
|
||||
*pRsp = NULL;
|
||||
}
|
||||
if(pBlock) {
|
||||
blockDataDestroy(pBlock);
|
||||
pBlock = NULL;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -2888,6 +2888,7 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
|
|||
return code;
|
||||
}
|
||||
|
||||
// return length of encoded data, return -1 if failed
|
||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
||||
int32_t dataLen = 0;
|
||||
|
||||
|
@ -2921,7 +2922,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if (pColInfoData == NULL) {
|
||||
return terrno;
|
||||
return -1;
|
||||
}
|
||||
|
||||
*((int8_t*)data) = pColInfoData->info.type;
|
||||
|
@ -2940,7 +2941,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
for (int32_t col = 0; col < numOfCols; ++col) {
|
||||
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
|
||||
if (pColRes == NULL) {
|
||||
return terrno;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// copy the null bitmap
|
||||
|
@ -2991,7 +2992,6 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
|
||||
*actualLen = dataLen;
|
||||
*groupId = pBlock->info.id.groupId;
|
||||
ASSERT(dataLen > 0);
|
||||
return dataLen;
|
||||
}
|
||||
|
||||
|
|
|
@ -479,6 +479,12 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t len = blockEncode(pBlock, pStart, numOfCols);
|
||||
if(len < 0) {
|
||||
dError("failed to retrieve data since %s", tstrerror(code));
|
||||
blockDataDestroy(pBlock);
|
||||
rpcFreeCont(pRsp);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pRsp->numOfRows = htonl(pBlock->info.rows);
|
||||
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
|
||||
|
|
|
@ -330,11 +330,9 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
|
||||
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
|
||||
if (pRsp == NULL) {
|
||||
mndReleaseShowObj(pShow, false);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
|
||||
blockDataDestroy(pBlock);
|
||||
TAOS_RETURN(code);
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pRsp->handle = htobe64(pShow->id);
|
||||
|
@ -356,6 +354,11 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
|
||||
if(len < 0){
|
||||
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
|
||||
code = terrno;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
pRsp->numOfRows = htonl(rowsRead);
|
||||
|
@ -374,6 +377,13 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
|
||||
blockDataDestroy(pBlock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_exit:
|
||||
mndReleaseShowObj(pShow, false);
|
||||
blockDataDestroy(pBlock);
|
||||
if(pRsp) {
|
||||
rpcFreeCont(pRsp);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static bool mndCheckRetrieveFinished(SShowObj *pShow) {
|
||||
|
|
|
@ -29,6 +29,10 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOf
|
|||
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||
if(actualLen < 0){
|
||||
taosMemoryFree(buf);
|
||||
return terrno;
|
||||
}
|
||||
actualLen += sizeof(SRetrieveTableRspForTmq);
|
||||
if (taosArrayPush(((SMqDataRspCommon*)pRsp)->blockDataLen, &actualLen) == NULL){
|
||||
taosMemoryFree(buf);
|
||||
|
|
|
@ -50,6 +50,10 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
|
|||
(*pRsp)->numOfCols = htonl(numOfCols);
|
||||
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, numOfCols);
|
||||
if(len < 0) {
|
||||
taosMemoryFree(*pRsp);
|
||||
return terrno;
|
||||
}
|
||||
SET_PAYLOAD_LEN((*pRsp)->data, len, len);
|
||||
|
||||
int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
|
||||
|
|
|
@ -1978,6 +1978,10 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
rsp->numOfRows = htobe64((int64_t)rowNum);
|
||||
|
||||
int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, taosArrayGetSize(pBlock->pDataBlock));
|
||||
if(len < 0) {
|
||||
qError("qExplainGetRspFromCtx: blockEncode failed");
|
||||
QRY_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
rsp->compLen = htonl(len);
|
||||
rsp->payloadLen = htonl(len);
|
||||
|
|
|
@ -106,6 +106,10 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
|
|||
}
|
||||
|
||||
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, numOfCols);
|
||||
if(dataLen < 0) {
|
||||
qError("failed to encode data block, code: %d", dataLen);
|
||||
return terrno;
|
||||
}
|
||||
int32_t len =
|
||||
tsCompressString(pHandle->pCompressBuf, dataLen, 1, pEntry->data, pBuf->allocSize, ONE_STAGE_COMP, NULL, 0);
|
||||
if (len < dataLen) {
|
||||
|
@ -120,6 +124,10 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
|
|||
}
|
||||
} else {
|
||||
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
|
||||
if(pEntry->dataLen < 0) {
|
||||
qError("failed to encode data block, code: %d", pEntry->dataLen);
|
||||
return terrno;
|
||||
}
|
||||
pEntry->rawLen = pEntry->dataLen;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -548,7 +548,7 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
|
|||
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = terrno;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
|
|
|
@ -158,6 +158,10 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl
|
|||
pRetrieve->version = htobe64(pBlock->info.version);
|
||||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
|
||||
if(actualLen < 0) {
|
||||
taosMemoryFree(pRetrieve);
|
||||
return terrno;
|
||||
}
|
||||
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
|
||||
|
||||
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
|
||||
|
@ -1064,7 +1068,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
|||
|
||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
||||
|
@ -1084,6 +1088,10 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
|||
pRetrieve->numOfCols = htonl(numOfCols);
|
||||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
|
||||
if(actualLen < 0) {
|
||||
taosMemoryFree(buf);
|
||||
return terrno;
|
||||
}
|
||||
SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen);
|
||||
|
||||
int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN;
|
||||
|
|
Loading…
Reference in New Issue