block buf len check
This commit is contained in:
parent
d8218b3872
commit
d01a3481bc
|
@ -266,7 +266,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId)
|
|||
int32_t bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index, SColumnInfoData** pColInfoData);
|
||||
|
||||
int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
|
||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
|
||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataLen, int32_t numOfCols);
|
||||
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos);
|
||||
|
||||
// for debug
|
||||
|
|
|
@ -2355,12 +2355,22 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t checkResultInfo(SReqResultInfo* pResultInfo) {
|
||||
int32_t resultInfoSafeCheck(SReqResultInfo* pResultInfo) {
|
||||
if (pResultInfo->totalRows < pResultInfo->numOfRows) {
|
||||
tscError("checkResultInfo error: totalRows:%" PRId64 " < numOfRows:%" PRId64, pResultInfo->totalRows,
|
||||
pResultInfo->numOfRows);
|
||||
return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
}
|
||||
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
|
||||
if (pResultInfo->fields[i].bytes <= 0) {
|
||||
tscError("checkResultInfo error: bytes:%d <= 0", pResultInfo->fields[i].bytes);
|
||||
return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
}
|
||||
if(!IS_VAR_DATA_TYPE(pResultInfo->fields[i].type) && TYPE_BYTES[pResultInfo->fields[i].type] != pResultInfo->fields[i].bytes) {
|
||||
tscError("checkResultInfo error: type:%d bytes:%d != %d", pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, TYPE_BYTES[pResultInfo->fields[i].type]);
|
||||
return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -2472,7 +2482,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
|
|||
if (convertUcs4) {
|
||||
code = doConvertUCS4(pResultInfo, colLength);
|
||||
}
|
||||
code = checkResultInfo(pResultInfo);
|
||||
code = resultInfoSafeCheck(pResultInfo);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -588,7 +588,8 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
|||
return code;
|
||||
}
|
||||
|
||||
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
|
||||
*pRsp = taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == *pRsp) {
|
||||
code = terrno;
|
||||
|
@ -603,7 +604,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
|||
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
|
||||
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, SHOW_VARIABLES_RESULT_COLS);
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, SHOW_VARIABLES_RESULT_COLS);
|
||||
if(len < 0) {
|
||||
uError("buildShowVariablesRsp error, len:%d", len);
|
||||
code = terrno;
|
||||
|
@ -741,7 +742,8 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
|
|||
return code;
|
||||
}
|
||||
|
||||
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
|
||||
*pRsp = taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == *pRsp) {
|
||||
code = terrno;
|
||||
|
@ -757,7 +759,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
|
|||
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
|
||||
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, COMPACT_DB_RESULT_COLS);
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, COMPACT_DB_RESULT_COLS);
|
||||
if(len < 0) {
|
||||
uError("buildRetriveTableRspForCompactDb error, len:%d", len);
|
||||
code = terrno;
|
||||
|
|
|
@ -3041,7 +3041,7 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
|
|||
}
|
||||
|
||||
// return length of encoded data, return -1 if failed
|
||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
|
||||
blockDataCheck(pBlock, false);
|
||||
|
||||
int32_t dataLen = 0;
|
||||
|
@ -3106,9 +3106,11 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
size_t metaSize = 0;
|
||||
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
||||
metaSize = numOfRows * sizeof(int32_t);
|
||||
if(dataLen + metaSize > dataBuflen) goto _exit;
|
||||
memcpy(data, pColRes->varmeta.offset, metaSize);
|
||||
} else {
|
||||
metaSize = BitmapLen(numOfRows);
|
||||
if(dataLen + metaSize > dataBuflen) goto _exit;
|
||||
memcpy(data, pColRes->nullbitmap, metaSize);
|
||||
}
|
||||
|
||||
|
@ -3127,12 +3129,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
}
|
||||
colSizes[col] += colSize;
|
||||
dataLen += colSize;
|
||||
if(dataLen > dataBuflen) goto _exit;
|
||||
(void) memmove(data, pColData, colSize);
|
||||
data += colSize;
|
||||
}
|
||||
} else {
|
||||
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
||||
dataLen += colSizes[col];
|
||||
if(dataLen > dataBuflen) goto _exit;
|
||||
if (pColRes->pData != NULL) {
|
||||
(void) memmove(data, pColRes->pData, colSizes[col]);
|
||||
}
|
||||
|
@ -3156,7 +3160,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
|||
|
||||
*actualLen = dataLen;
|
||||
*groupId = pBlock->info.id.groupId;
|
||||
if (dataLen > dataBuflen) goto _exit;
|
||||
|
||||
return dataLen;
|
||||
|
||||
_exit:
|
||||
uError("blockEncode dataLen:%d, dataBuflen:%" PRIx64, dataLen, dataBuflen);
|
||||
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos) {
|
||||
|
|
|
@ -548,8 +548,8 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols +
|
||||
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(numOfCols);
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeSize;
|
||||
|
||||
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
|
||||
if (pRsp == NULL) {
|
||||
|
@ -574,7 +574,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
pStart += sizeof(SSysTableSchema);
|
||||
}
|
||||
|
||||
int32_t len = blockEncode(pBlock, pStart, numOfCols);
|
||||
int32_t len = blockEncode(pBlock, pStart, dataEncodeSize, numOfCols);
|
||||
if (len < 0) {
|
||||
dError("failed to retrieve data since %s", tstrerror(code));
|
||||
blockDataDestroy(pBlock);
|
||||
|
|
|
@ -333,8 +333,8 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
|
||||
}
|
||||
|
||||
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
|
||||
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock));
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns + dataEncodeSize;
|
||||
|
||||
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
|
||||
if (pRsp == NULL) {
|
||||
|
@ -361,7 +361,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
|||
pStart += sizeof(SSysTableSchema);
|
||||
}
|
||||
|
||||
int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
|
||||
int32_t len = blockEncode(pBlock, pStart, dataEncodeSize, pShow->pMeta->numOfColumns);
|
||||
if(len < 0){
|
||||
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
|
||||
code = terrno;
|
||||
|
|
|
@ -16,7 +16,8 @@
|
|||
#include "tq.h"
|
||||
|
||||
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
|
||||
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + blockGetEncodeSize(pBlock);
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeSize;
|
||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||
if (buf == NULL) {
|
||||
return terrno;
|
||||
|
@ -28,7 +29,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
|
|||
pRetrieve->compressed = 0;
|
||||
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeSize, numOfCols);
|
||||
if(actualLen < 0){
|
||||
taosMemoryFree(buf);
|
||||
return terrno;
|
||||
|
|
|
@ -35,7 +35,8 @@
|
|||
extern SConfig* tsCfg;
|
||||
|
||||
static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRetrieveTableRsp** pRsp) {
|
||||
size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
|
||||
*pRsp = taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == *pRsp) {
|
||||
return terrno;
|
||||
|
@ -49,7 +50,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
|
|||
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||
(*pRsp)->numOfCols = htonl(numOfCols);
|
||||
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, numOfCols);
|
||||
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, numOfCols);
|
||||
if(len < 0) {
|
||||
taosMemoryFree(*pRsp);
|
||||
return terrno;
|
||||
|
|
|
@ -1966,7 +1966,8 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
|
||||
pBlock->info.rows = rowNum;
|
||||
|
||||
int32_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
int32_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
|
||||
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == rsp) {
|
||||
|
@ -1977,7 +1978,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
rsp->completed = 1;
|
||||
rsp->numOfRows = htobe64((int64_t)rowNum);
|
||||
|
||||
int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, taosArrayGetSize(pBlock->pDataBlock));
|
||||
int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock));
|
||||
if(len < 0) {
|
||||
qError("qExplainGetRspFromCtx: blockEncode failed");
|
||||
QRY_ERR_JRET(terrno);
|
||||
|
|
|
@ -84,17 +84,18 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
|
|||
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||
|
||||
{
|
||||
size_t dataEncodeSize = pBuf->allocSize + 8;
|
||||
if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
|
||||
if (pHandle->pCompressBuf == NULL) {
|
||||
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
|
||||
pHandle->pCompressBuf = taosMemoryMalloc(pBuf->allocSize + 8);
|
||||
pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeSize);
|
||||
if (NULL == pHandle->pCompressBuf) {
|
||||
QRY_RET(terrno);
|
||||
}
|
||||
pHandle->bufSize = pBuf->allocSize + 8;
|
||||
pHandle->bufSize = dataEncodeSize;
|
||||
} else {
|
||||
if (pHandle->bufSize < pBuf->allocSize + 8) {
|
||||
pHandle->bufSize = pBuf->allocSize + 8;
|
||||
if (pHandle->bufSize < dataEncodeSize) {
|
||||
pHandle->bufSize = dataEncodeSize;
|
||||
void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize);
|
||||
if (p != NULL) {
|
||||
pHandle->pCompressBuf = p;
|
||||
|
@ -105,7 +106,7 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
|
|||
}
|
||||
}
|
||||
|
||||
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, numOfCols);
|
||||
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeSize, numOfCols);
|
||||
if(dataLen < 0) {
|
||||
qError("failed to encode data block, code: %d", dataLen);
|
||||
return terrno;
|
||||
|
@ -123,7 +124,7 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
|
|||
TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
|
||||
}
|
||||
} else {
|
||||
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
|
||||
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, dataEncodeSize, numOfCols);
|
||||
if(pEntry->dataLen < 0) {
|
||||
qError("failed to encode data block, code: %d", pEntry->dataLen);
|
||||
return terrno;
|
||||
|
|
|
@ -145,7 +145,8 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
|||
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req) {
|
||||
SRetrieveTableRsp* pRetrieve = NULL;
|
||||
|
||||
int32_t len = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
int32_t len = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
|
||||
|
||||
pRetrieve = taosMemoryCalloc(1, len);
|
||||
if (pRetrieve == NULL) return terrno;
|
||||
|
@ -162,7 +163,7 @@ static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBl
|
|||
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
||||
pRetrieve->version = htobe64(pBlock->info.version);
|
||||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, numOfCols);
|
||||
if (actualLen < 0) {
|
||||
taosMemoryFree(pRetrieve);
|
||||
return terrno;
|
||||
|
@ -1203,7 +1204,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
|
||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
|
||||
size_t dataEncodeSize = blockGetEncodeSize(pBlock);
|
||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN;
|
||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||
if (buf == NULL) {
|
||||
return terrno;
|
||||
|
@ -1225,7 +1227,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
|||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||
pRetrieve->numOfCols = htonl(numOfCols);
|
||||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, numOfCols);
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, numOfCols);
|
||||
if (actualLen < 0) {
|
||||
taosMemoryFree(buf);
|
||||
return terrno;
|
||||
|
|
Loading…
Reference in New Issue