fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-08-05 11:06:20 +08:00
parent 183f33af87
commit 9a2ee54719
1 changed files with 107 additions and 5 deletions

View File

@ -644,6 +644,10 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
SDataBlockInfo* pInfo = &pDataBlock->info; SDataBlockInfo* pInfo = &pDataBlock->info;
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex);
if (pColInfoData == NULL) {
return terrno;
}
if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) { if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) {
return 0; return 0;
} }
@ -685,6 +689,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
if (pCol1 == NULL || pCol2 == NULL) {
return terrno;
}
capacity = pDest->info.capacity; capacity = pDest->info.capacity;
int32_t ret = colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); int32_t ret = colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows);
@ -709,6 +716,9 @@ int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
if (pCol2 == NULL || pCol1 == NULL) {
return terrno;
}
code = colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows); code = colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows);
if (code) { if (code) {
@ -729,6 +739,10 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (pCol == NULL) {
continue;
}
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
pCol->varmeta.length = pCol->varmeta.offset[pBlock->info.rows - numOfRows]; pCol->varmeta.length = pCol->varmeta.offset[pBlock->info.rows - numOfRows];
memset(pCol->varmeta.offset + pBlock->info.rows - numOfRows, 0, sizeof(*pCol->varmeta.offset) * numOfRows); memset(pCol->varmeta.offset + pBlock->info.rows - numOfRows, 0, sizeof(*pCol->varmeta.offset) * numOfRows);
@ -760,6 +774,10 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (pColInfoData == NULL) {
continue;
}
total += colDataGetFullLength(pColInfoData, pBlock->info.rows); total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
} }
@ -861,6 +879,10 @@ int32_t blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t r
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
if (pColData == NULL || pDstCol == NULL) {
continue;
}
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = false; bool isNull = false;
if (pBlock->pBlockAgg == NULL) { if (pBlock->pBlockAgg == NULL) {
@ -908,6 +930,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (pCol == NULL) {
continue;
}
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t)); memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
pStart += numOfRows * sizeof(int32_t); pStart += numOfRows * sizeof(int32_t);
@ -958,6 +984,9 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (pCol == NULL) {
continue;
}
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
size_t metaSize = pBlock->info.rows * sizeof(int32_t); size_t metaSize = pBlock->info.rows * sizeof(int32_t);
@ -965,6 +994,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
if (tmp == NULL) { if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pCol->varmeta.offset = (int32_t*)tmp; pCol->varmeta.offset = (int32_t*)tmp;
memcpy(pCol->varmeta.offset, pStart, metaSize); memcpy(pCol->varmeta.offset, pStart, metaSize);
pStart += metaSize; pStart += metaSize;
@ -1039,6 +1069,10 @@ int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity)
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (pCol == NULL) {
continue;
}
pCol->hasNull = true; pCol->hasNull = true;
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
@ -1087,6 +1121,10 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
if (pColInfo == NULL) {
continue;
}
rowSize += pColInfo->info.bytes; rowSize += pColInfo->info.bytes;
} }
@ -1114,8 +1152,11 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
rowSize += pColInfo->info.bytes; if (pColInfo == NULL) {
continue;
}
rowSize += pColInfo->info.bytes;
if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
rowSize += sizeof(int32_t); rowSize += sizeof(int32_t);
} else { } else {
@ -1193,6 +1234,9 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = &pCols[i]; SColumnInfoData* pDst = &pCols[i];
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
if (pSrc == NULL) {
continue;
}
if (IS_VAR_DATA_TYPE(pSrc->info.type)) { if (IS_VAR_DATA_TYPE(pSrc->info.type)) {
if (pSrc->varmeta.length != 0) { if (pSrc->varmeta.length != 0) {
@ -1228,8 +1272,11 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
pCols[i].info = pColInfoData->info; if (pColInfoData == NULL) {
continue;
}
pCols[i].info = pColInfoData->info;
if (IS_VAR_DATA_TYPE(pCols[i].info.type)) { if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t)); pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t));
pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length); pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length);
@ -1256,8 +1303,11 @@ static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
pColInfoData->info = pCols[i].info; if (pColInfoData == NULL) {
continue;
}
pColInfoData->info = pCols[i].info;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
taosMemoryFreeClear(pColInfoData->varmeta.offset); taosMemoryFreeClear(pColInfoData->varmeta.offset);
pColInfoData->varmeta = pCols[i].varmeta; pColInfoData->varmeta = pCols[i].varmeta;
@ -1301,8 +1351,15 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
if (pInfo == NULL) {
continue;
}
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
if (pColInfoData == NULL) {
continue;
}
if (pColInfoData->hasNull) { if (pColInfoData->hasNull) {
sortColumnHasNull = true; sortColumnHasNull = true;
} }
@ -1319,6 +1376,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
if (!varTypeSort) { if (!varTypeSort) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0); SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);
if (pColInfoData == NULL || pOrder == NULL) {
return errno;
}
int64_t p0 = taosGetTimestampUs(); int64_t p0 = taosGetTimestampUs();
@ -1346,7 +1406,14 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
if (pInfo == NULL) {
continue;
}
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
if (pInfo->pColData == NULL) {
continue;
}
pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order); pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
} }
@ -1399,6 +1466,10 @@ void blockDataEmpty(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (p == NULL) {
continue;
}
colInfoDataCleanup(p, pInfo->capacity); colInfoDataCleanup(p, pInfo->capacity);
} }
@ -1417,6 +1488,10 @@ void blockDataReset(SSDataBlock* pDataBlock) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (p == NULL) {
continue;
}
p->hasNull = false; p->hasNull = false;
p->reassigned = false; p->reassigned = false;
if (IS_VAR_DATA_TYPE(p->info.type)) { if (IS_VAR_DATA_TYPE(p->info.type)) {
@ -1527,6 +1602,10 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
if (p == NULL) {
return terrno;
}
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false); code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false);
if (code) { if (code) {
return code; return code;
@ -1544,6 +1623,10 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
if (pColInfoData == NULL) {
continue;
}
colDataDestroy(pColInfoData); colDataDestroy(pColInfoData);
} }
@ -1579,6 +1662,10 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
size_t numOfCols = taosArrayGetSize(src->pDataBlock); size_t numOfCols = taosArrayGetSize(src->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(src->pDataBlock, i); SColumnInfoData* p = taosArrayGet(src->pDataBlock, i);
if (p == NULL) {
return terrno;
}
SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; SColumnInfoData colInfo = {.hasNull = true, .info = p->info};
code = blockDataAppendColInfo(dst, &colInfo); code = blockDataAppendColInfo(dst, &colInfo);
if (code) { if (code) {
@ -1594,7 +1681,7 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
if (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type))) { if (pSrc == NULL || pDst == NULL || (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type)))) {
continue; continue;
} }
@ -1622,6 +1709,10 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i); SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, i);
if (pDstCol == NULL || pSrcCol == NULL) {
continue;
}
int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info); int32_t ret = colDataAssign(pDstCol, pSrcCol, pSrc->info.rows, &pSrc->info);
if (ret < 0) { if (ret < 0) {
code = ret; code = ret;
@ -3149,15 +3240,26 @@ int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
if (!pDataBlock || !pOrderInfo) return 0; if (!pDataBlock || !pOrderInfo) return 0;
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i); SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i);
if (pOrder == NULL) {
continue;
}
pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId); pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId);
if (pOrder->pColData == NULL) {
continue;
}
pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order); pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
} }
SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock}; SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};
int32_t rowIdx = 0, nextRowIdx = 1;
int32_t rowIdx = 0, nextRowIdx = 1;
for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) { for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) { if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
break; break;
} }
} }
return nextRowIdx; return nextRowIdx;
} }