fix(common): check the return value.

This commit is contained in:
Haojun Liao 2024-07-27 18:55:34 +08:00
parent 777c3898db
commit 7be2ecbe1f
40 changed files with 571 additions and 271 deletions

View File

@ -254,13 +254,22 @@ void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc);
SSDataBlock* createDataBlock();
#define QRY_OPTR_CHECK(_o) \
do { \
if ((_o) == NULL) { \
return TSDB_CODE_INVALID_PARA; \
} else { \
*(_o) = NULL; \
} \
} while(0)
int32_t createDataBlock(SSDataBlock** pResBlock);
void blockDataDestroy(SSDataBlock* pBlock);
void blockDataFreeRes(SSDataBlock* pBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
SSDataBlock* createSpecialDataBlock(EStreamType type);
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock);
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock);
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx);
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock);
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);

View File

@ -1018,14 +1018,12 @@ void returnToUser(SRequestObj* pRequest) {
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock**pBlock) {
int64_t lastTs = 0;
int32_t code = TSDB_CODE_SUCCESS;
TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
*pBlock = createDataBlock();
if (NULL == *pBlock) {
return terrno;
int32_t code = createDataBlock(pBlock);
if (code) {
return code;
}
for(int32_t i = 0; i < numOfFields; ++i) {

View File

@ -21,8 +21,6 @@
#define MALLOC_ALIGN_BYTES 32
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
if (pColumnInfoData->reassigned) {
@ -134,7 +132,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
uint32_t len = pColumnInfoData->varmeta.length;
pColumnInfoData->varmeta.offset[rowIndex] = len;
memmove(pColumnInfoData->pData + len, pData, dataLen);
(void) memmove(pColumnInfoData->pData + len, pData, dataLen);
pColumnInfoData->varmeta.length += dataLen;
} else {
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
@ -290,7 +288,7 @@ int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t curren
pColumnInfoData->reassigned = true;
}
return TSDB_CODE_SUCCESS;
return code;
}
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
@ -371,7 +369,7 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2) {
if (pColumnInfoData->info.type != pSource->info.type) {
return TSDB_CODE_FAILED;
return TSDB_CODE_INVALID_PARA;
}
if (numOfRow2 == 0) {
@ -823,12 +821,15 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
}
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) {
int32_t code = 0;
if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) {
return NULL;
}
SSDataBlock* pDst = createOneDataBlock(pBlock, false);
if (pDst == NULL) {
SSDataBlock* pDst = NULL;
code = createOneDataBlock(pBlock, false, &pDst);
if (code) {
terrno = code;
return NULL;
}
@ -859,7 +860,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
colDataSetNULL(pDstCol, j - startIndex);
} else {
char* p = colDataGetData(pColData, j);
colDataSetVal(pDstCol, j - startIndex, p, false);
code = colDataSetVal(pDstCol, j - startIndex, p, false);
}
}
}
@ -1600,53 +1601,112 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* createSpecialDataBlock(EStreamType type) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.id.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = type;
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
pBlock->info.watermark = INT64_MIN;
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
QRY_OPTR_CHECK(pBlock);
int32_t code = 0;
SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (p == NULL) {
return terrno;
}
p->info.hasVarCol = false;
p->info.id.groupId = 0;
p->info.rows = 0;
p->info.type = type;
p->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
p->info.watermark = INT64_MIN;
p->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
if (p->pDataBlock == NULL) {
taosMemoryFree(p);
return terrno;
}
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// window start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
void* px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
// window end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t);
// uid
taosArrayPush(pBlock->pDataBlock, &infoData);
px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
// group id
taosArrayPush(pBlock->pDataBlock, &infoData);
px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY);
// calculate start ts
taosArrayPush(pBlock->pDataBlock, &infoData);
px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
// calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData);
px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
// table name
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
*pBlock = p;
return code;
_err:
taosArrayDestroy(p->pDataBlock);
taosMemoryFree(p);
return code;
}
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
if (pDataBlock == NULL) {
return NULL;
return TSDB_CODE_INVALID_PARA;
}
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
}
SSDataBlock* pBlock = createDataBlock();
pBlock->info = pDataBlock->info;
pBlock->info.rows = 0;
pBlock->info.capacity = 0;
@ -1658,11 +1718,10 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
blockDataAppendColInfo(pBlock, &colInfo);
}
int32_t code = blockDataEnsureCapacity(pBlock, 1);
code = blockDataEnsureCapacity(pBlock, 1);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pBlock);
return NULL;
return code;
}
for (int32_t i = 0; i < numOfCols; ++i) {
@ -1670,13 +1729,17 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
void* pData = NULL;
if (!isNull) pData = colDataGetData(pSrc, rowIdx);
colDataSetVal(pDst, 0, pData, isNull);
if (!isNull) {
pData = colDataGetData(pSrc, rowIdx);
}
code = colDataSetVal(pDst, 0, pData, isNull);
}
pBlock->info.rows = 1;
return pBlock;
*pResBlock = pBlock;
return code;
}
void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
@ -1699,12 +1762,18 @@ void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
memcpy(p->pData, pDst->pks[1].pData, p->nData);
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
if (pDataBlock == NULL) {
return NULL;
return TSDB_CODE_INVALID_PARA;
}
SSDataBlock* pDstBlock = NULL;
int32_t code = createDataBlock(&pDstBlock);
if (code) {
return code;
}
SSDataBlock* pDstBlock = createDataBlock();
pDstBlock->info = pDataBlock->info;
pDstBlock->info.rows = 0;
@ -1723,11 +1792,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
copyPkVal(&pDstBlock->info, &pDataBlock->info);
if (copyData) {
int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock);
return NULL;
return code;
}
for (int32_t i = 0; i < numOfCols; ++i) {
@ -1740,24 +1808,26 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pDstBlock->info.capacity = pDataBlock->info.rows;
}
return pDstBlock;
*pResBlock = pDstBlock;
return code;
}
SSDataBlock* createDataBlock() {
int32_t createDataBlock(SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return terrno;
}
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
if (pBlock->pDataBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t code = terrno;
taosMemoryFree(pBlock);
return NULL;
return code;
}
return pBlock;
*pResBlock = pBlock;
return 0;
}
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
@ -1771,7 +1841,6 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat
void* p = taosArrayPush(pBlock->pDataBlock, pColInfoData);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
@ -1863,7 +1932,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
int32_t newLen = BitmapLen(total - n);
if (n % 8 == 0) {
memmove(nullBitmap, nullBitmap + n / 8, newLen);
(void) memmove(nullBitmap, nullBitmap + n / 8, newLen);
} else {
int32_t tail = n % 8;
int32_t i = 0;
@ -1924,23 +1993,23 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s
}
if (dataOffset > 0) {
memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen);
(void) memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen);
}
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
(void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
return dataLen;
}
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
// pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
(void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
// clear the offset value of the unused entries.
memset(&pColInfoData->varmeta.offset[total - n], 0, n);
} else {
int32_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
(void) memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
doShiftBitmap(pColInfoData->nullbitmap, n, total);
}
}
@ -2258,18 +2327,19 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
int64_t uid, int32_t vgId, tb_uid_t suid) {
SSubmitReq2* pReq = *ppReq;
SArray* pVals = NULL;
int32_t numOfBlks = 0;
int32_t sz = 1;
terrno = TSDB_CODE_SUCCESS;
int32_t code = 0;
*ppReq = NULL;
terrno = 0;
if (NULL == pReq) {
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _end;
}
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
code = terrno;
goto _end;
}
}
@ -2317,13 +2387,23 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
isStartKey = true;
ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId);
SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var}));
taosArrayPush(pVals, &cv);
void* px = taosArrayPush(pVals, &cv);
if (px == NULL) {
return terrno;
}
} else if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
void* px = taosArrayPush(pVals, &cv);
if (px == NULL) {
return terrno;
}
} else {
SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(int64_t*)var}));
taosArrayPush(pVals, &cv);
void* px = taosArrayPush(pVals, &cv);
if (px == NULL) {
return terrno;
}
}
break;
case TSDB_DATA_TYPE_NCHAR:
@ -2394,29 +2474,38 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
}
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
if ((code = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
taosArrayPush(tbData.aRowP, &pRow);
void* px = taosArrayPush(tbData.aRowP, &pRow);
if (px == NULL) {
code = terrno;
goto _end;
}
}
taosArrayPush(pReq->aSubmitTbData, &tbData);
void* px = taosArrayPush(pReq->aSubmitTbData, &tbData);
if (px == NULL) {
code = terrno;
goto _end;
}
}
_end:
taosArrayDestroy(pVals);
if (terrno != 0) {
*ppReq = NULL;
if (code != 0) {
if (pReq) {
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pReq);
}
return TSDB_CODE_FAILED;
}
} else {
*ppReq = pReq;
return TSDB_CODE_SUCCESS;
}
return code;
}
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId) {
@ -2579,14 +2668,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
}
colSizes[col] += colSize;
dataLen += colSize;
memmove(data, pColData, colSize);
(void) memmove(data, pColData, colSize);
data += colSize;
}
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
dataLen += colSizes[col];
if (pColRes->pData != NULL) {
memmove(data, pColRes->pData, colSizes[col]);
(void) memmove(data, pColRes->pData, colSizes[col]);
}
data += colSizes[col];
}

View File

@ -236,7 +236,9 @@ TEST(testCase, toInteger_test) {
}
TEST(testCase, Datablock_test) {
SSDataBlock* b = createDataBlock();
SSDataBlock* b = NULL;
int32_t code = createDataBlock(&b);
ASSERT(code == 0);
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1);
taosArrayPush(b->pDataBlock, &infoData);
@ -361,7 +363,9 @@ TEST(testCase, non_var_dataBlock_split_test) {
TEST(testCase, var_dataBlock_split_test) {
int32_t numOfRows = 1000000;
SSDataBlock* b = createDataBlock();
SSDataBlock* b = NULL;
int32_t code = createDataBlock(&b);
ASSERT(code == 0);
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1);
blockDataAppendColInfo(b, &infoData);

View File

@ -289,7 +289,12 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
int32_t numOfCols = pShow->pMeta->numOfColumns;
SSDataBlock *pBlock = createDataBlock();
SSDataBlock *pBlock = NULL;
code = createDataBlock(&pBlock);
if (code) {
TAOS_RETURN(code);
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0};

View File

@ -289,7 +289,13 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
pReader->cachedSchemaSuid = 0;
pReader->pSchemaWrapper = NULL;
pReader->tbIdHash = NULL;
pReader->pResBlock = createDataBlock();
pReader->pResBlock = NULL;
int32_t code = createDataBlock(&pReader->pResBlock);
if (code) {
terrno = code;
}
return pReader;
}

View File

@ -117,8 +117,13 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
}
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
pHandle->block = createOneDataBlock(pDataBlock, true);
TSDB_CHECK_NULL(pDataBlock, code, line, END, terrno);
pHandle->block = NULL;
code = createOneDataBlock(pDataBlock, true, &pHandle->block);
if (code) {
return code;
}
pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset);
code = getDataBlock(task, pHandle, vgId, &pDataBlock);

View File

@ -576,8 +576,10 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
goto END;
}
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
TSDB_CHECK_NULL(pDelBlock, code, line, END, terrno)
SSDataBlock* pDelBlock = NULL;
code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock);
TSDB_CHECK_CODE(code, line, END);
code = blockDataEnsureCapacity(pDelBlock, numOfTables);
TSDB_CHECK_CODE(code, line, END);

View File

@ -402,28 +402,30 @@ static void initReaderStatus(SReaderStatus* pStatus) {
}
static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) {
*pResBlock = createDataBlock();
if (*pResBlock == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
QRY_OPTR_CHECK(pResBlock);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code != 0) {
return code;
}
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info = pCond->colList[i];
int32_t code = blockDataAppendColInfo(*pResBlock, &colInfo);
code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(*pResBlock);
*pResBlock = NULL;
taosMemoryFree(pBlock);
return code;
}
}
int32_t code = blockDataEnsureCapacity(*pResBlock, capacity);
code = blockDataEnsureCapacity(pBlock, capacity);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(*pResBlock);
*pResBlock = NULL;
taosMemoryFree(pBlock);
}
*pResBlock = pBlock;
return code;
}

View File

@ -74,13 +74,16 @@ static int32_t getSchemaBytes(const SSchema* pSchema) {
}
static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock();
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
QRY_OPTR_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
}
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_FIELD_LEN, 1);
int32_t code = blockDataAppendColInfo(pBlock, &infoData);
code = blockDataAppendColInfo(pBlock, &infoData);
if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_TYPE_LEN, 2);
code = blockDataAppendColInfo(pBlock, &infoData);
@ -229,13 +232,16 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
static int32_t execResetQueryCache() { return catalogClearCache(); }
static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock();
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
QRY_OPTR_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
}
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_COLS, 1);
int32_t code = blockDataAppendColInfo(pBlock, &infoData);
code = blockDataAppendColInfo(pBlock, &infoData);
if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_FIELD2_LEN, 2);
code = blockDataAppendColInfo(pBlock, &infoData);
@ -418,13 +424,16 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT
}
static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock();
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
QRY_OPTR_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
}
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD1_LEN, 1);
int32_t code = blockDataAppendColInfo(pBlock, &infoData);
code = blockDataAppendColInfo(pBlock, &infoData);
if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD2_LEN, 2);
code = blockDataAppendColInfo(pBlock, &infoData);
@ -439,13 +448,16 @@ static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
}
static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock();
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
QRY_OPTR_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
}
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD1_LEN, 1);
int32_t code = blockDataAppendColInfo(pBlock, &infoData);
code = blockDataAppendColInfo(pBlock, &infoData);
if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD2_LEN, 2);
code = blockDataAppendColInfo(pBlock, &infoData);
@ -894,9 +906,12 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
}
static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock();
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
QRY_OPTR_CHECK(pOutput);
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
}
SNode* pProj = NULL;
@ -912,8 +927,9 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
}
QRY_ERR_RET(blockDataAppendColInfo(pBlock, &infoData));
}
*pOutput = pBlock;
return TSDB_CODE_SUCCESS;
return code;
}
int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {

View File

@ -1942,6 +1942,7 @@ _return:
int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
int32_t code = 0;
SSDataBlock *pBlock = NULL;
SExplainCtx *pCtx = (SExplainCtx *)ctx;
int32_t rowNum = taosArrayGetSize(pCtx->rows);
if (rowNum <= 0) {
@ -1949,7 +1950,9 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(TSDB_CODE_APP_ERROR);
}
SSDataBlock *pBlock = createDataBlock();
code = createDataBlock(&pBlock);
QRY_ERR_JRET(code);
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_EXPLAIN_RESULT_ROW_SIZE, 1);
QRY_ERR_JRET(blockDataAppendColInfo(pBlock, &infoData));
QRY_ERR_JRET(blockDataEnsureCapacity(pBlock, rowNum));

View File

@ -20,15 +20,6 @@
extern "C" {
#endif
#define QRY_OPTR_CHECK(_o) \
do { \
if ((_o) == NULL) { \
return TSDB_CODE_INVALID_PARA; \
} else { \
*(_o) = NULL; \
} \
} while(0)
typedef struct SOperatorCostInfo {
double openCost;
double totalCost;

View File

@ -364,7 +364,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
return TSDB_CODE_SUCCESS;
}
SSDataBlock* pBlock = createDataBlock();
SSDataBlock* pBlock = NULL;
code = createDataBlock(&pBlock);
if (code) {
return code;
}
pBlock->info.rows = 1;
pBlock->info.capacity = 0;

View File

@ -181,7 +181,10 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
capacity = TMIN(totalTables, 4096);
pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false);
pInfo->pBufferedRes = NULL;
code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes);
QUERY_CHECK_CODE(code, lino, _error);
setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -672,7 +672,10 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
pStart += sizeof(SSysTableSchema);
}
SSDataBlock* pBlock = createDataBlock();
SSDataBlock* pBlock = NULL;
code = createDataBlock(&pBlock);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
code = blockDataAppendColInfo(pBlock, &idata);
@ -791,8 +794,8 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
blockDataCleanup(pb);
} else {
pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
QUERY_CHECK_NULL(pb, code, lino, _end, terrno);
code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
QUERY_CHECK_NULL(pb, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
}
int32_t compLen = *(int32_t*)pStart;

View File

@ -252,8 +252,12 @@ SArray* createSortInfo(SNodeList* pNodeList) {
SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
SSDataBlock* pBlock = createDataBlock();
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
terrno = code;
return NULL;
}
pBlock->info.id.blockId = pNode->dataBlockId;
pBlock->info.type = STREAM_INVALID;
@ -267,7 +271,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
idata.info.scale = pDescNode->dataType.scale;
idata.info.precision = pDescNode->dataType.precision;
int32_t code = blockDataAppendColInfo(pBlock, &idata);
code = blockDataAppendColInfo(pBlock, &idata);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
blockDataDestroy(pBlock);
@ -1029,11 +1033,9 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
SStorageAPI* pStorageAPI) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSDataBlock* pResBlock = createDataBlock();
if (pResBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SSDataBlock* pResBlock = NULL;
code = createDataBlock(&pResBlock);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
SColumnInfoData colInfo = {0};

View File

@ -695,9 +695,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
while (pRes != NULL) {
SSDataBlock* p = NULL;
if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
SSDataBlock* p1 = createOneDataBlock(pRes, true);
SSDataBlock* p1 = NULL;
code = createOneDataBlock(pRes, true, &p1);
void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
p = p1;
} else {
p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);

View File

@ -513,7 +513,13 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
goto _error;
}
pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
pInfo->pFinalRes = NULL;
code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes);
if (code) {
goto _error;
}
code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
if (code != TSDB_CODE_SUCCESS) {
goto _error;

View File

@ -613,7 +613,10 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc
return NULL;
}
SSDataBlock* pDstBlock = createDataBlock();
SSDataBlock* pDstBlock = NULL;
code = createDataBlock(&pDstBlock);
QUERY_CHECK_CODE(code, lino, _end);
pDstBlock->info = pDataBlock->info;
pDstBlock->info.id.blockId = pOperator->resultDataBlockId;
pDstBlock->info.capacity = 0;
@ -1319,7 +1322,10 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
SSDataBlock* pTmpBlock = NULL;
code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock);
QUERY_CHECK_CODE(code, lino, _end);
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
pTmpBlock->info.id.groupId = groupId;
char* tbName = pSrcBlock->info.parTbName;
@ -1708,8 +1714,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
taosHashSetFreeFp(pInfo->pPartitions, freePartItem);
pInfo->tsColIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
QUERY_CHECK_NULL(pInfo->pDelRes, code, lino, _error, terrno);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);

View File

@ -1157,22 +1157,23 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN
int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode));
if (TSDB_CODE_SUCCESS != code) {
QRY_ERR_RET(terrno);
QRY_ERR_RET(code);
}
if (NULL != pJoin->pPreFilter) {
pJoin->midBlk = createOneDataBlock(pJoin->finBlk, false);
if (NULL == pJoin->finBlk) {
QRY_ERR_RET(terrno);
pJoin->midBlk = NULL;
code = createOneDataBlock(pJoin->finBlk, false, &pJoin->midBlk);
if (code) {
QRY_ERR_RET(code);
}
code = blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity);
if (TSDB_CODE_SUCCESS != code) {
QRY_ERR_RET(terrno);
QRY_ERR_RET(code);
}
}
pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO;
return TSDB_CODE_SUCCESS;
}

View File

@ -2248,10 +2248,12 @@ static int32_t mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInf
} while (true);
if (buildGot && NULL == pCtx->cache.outBlk) {
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false);
if (NULL == pCtx->cache.outBlk) {
MJ_ERR_RET(terrno);
pCtx->cache.outBlk = NULL;
int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk);
if (code) {
MJ_ERR_RET(code);
}
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
}
@ -2678,10 +2680,12 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo
if (buildGot && pJoin->build->newBlk) {
if (NULL == pCtx->cache.outBlk) {
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false);
if (NULL == pCtx->cache.outBlk) {
MJ_ERR_RET(terrno);
pCtx->cache.outBlk = NULL;
int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk);
if (code) {
MJ_ERR_RET(code);
}
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
}
@ -2833,7 +2837,11 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
if (!pGrp->clonedBlk) {
if (0 == pGrp->beginIdx) {
pGrp->blk = createOneDataBlock(pGrp->blk, true);
pGrp->blk = NULL;
int32_t code = createOneDataBlock(pGrp->blk, true, &pGrp->blk);
if (code) {
MJ_ERR_RET(code);
}
} else {
pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx);
pGrp->endIdx -= pGrp->beginIdx;
@ -3672,9 +3680,10 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));
if (pJoin->pFPreFilter) {
pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false);
if (NULL == pCtx->midBlk) {
MJ_ERR_RET(terrno);
pCtx->midBlk = NULL;
int32_t code = createOneDataBlock(pCtx->finBlk, false, &pCtx->midBlk);
if (code) {
MJ_ERR_RET(code);
}
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity));
}

View File

@ -1331,10 +1331,12 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
}
if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
pGrp->blk = createOneDataBlock(pTable->blk, true);
if (NULL == pGrp->blk) {
MJ_ERR_RET(terrno);
pGrp->blk = NULL;
code = createOneDataBlock(pTable->blk, true, &pGrp->blk);
if (code) {
MJ_ERR_RET(code);
}
if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
MJ_ERR_RET(terrno);
}

View File

@ -207,7 +207,6 @@ int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
if (pSortMergeInfo->pIntermediateBlock == NULL) {
pSortMergeInfo->pIntermediateBlock = NULL;
code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock);
if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) {
return code;

View File

@ -27,6 +27,8 @@
#include "querytask.h"
#include "storageapi.h"
#include "tdatablock.h"
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,

View File

@ -114,7 +114,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->pFinalRes = NULL;
code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
TSDB_CHECK_CODE(code, lino, _error);
pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;

View File

@ -1747,8 +1747,9 @@ static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t ts
}
if (pInfo->partitionSup.needCalc) {
SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
QUERY_CHECK_NULL(tmpBlock, code, lino, _end, terrno);
SSDataBlock* tmpBlock = NULL;
code = createOneDataBlock(pResult, true, &tmpBlock);
QUERY_CHECK_CODE(code, lino, _end);
blockDataCleanup(pResult);
for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
@ -3141,7 +3142,9 @@ FETCH_NEXT_BLOCK:
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
SSDataBlock* pDelBlock = NULL;
if (pInfo->tqReader) {
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock);
QUERY_CHECK_CODE(code, lino, _end);
code = filterDelBlockByUid(pDelBlock, pBlock, pInfo);
QUERY_CHECK_CODE(code, lino, _end);
} else {
@ -3782,7 +3785,8 @@ _end:
}
int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS;
@ -3945,15 +3949,21 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
}
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0;
pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
code = createSpecialDataBlock(STREAM_DELETE_DATA, &pInfo->pDeleteDataRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes);
QUERY_CHECK_CODE(code, lino, _error);
if (hasPrimaryKeyCol(pInfo)) {
code = addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes);
QUERY_CHECK_CODE(code, lino, _error);
@ -3961,6 +3971,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->pkColType = pkType.type;
pInfo->pkColLen = pkType.bytes;
}
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->partitionSup.needCalc = false;
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
@ -3969,7 +3980,9 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->pState = pTaskInfo->streamInfo.pState;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
// for stream
if (pTaskInfo->streamInfo.pState) {
@ -4770,10 +4783,16 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
pInput->type = SUB_TABLE_MEM_BLOCK;
code = dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond);
QUERY_CHECK_CODE(code, lino, _end);
pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false);
code = createOneDataBlock(pInfo->pResBlock, false, &pInput->pReaderBlock);
QUERY_CHECK_CODE(code, lino, _end);
code = createOneDataBlock(pInfo->pResBlock, false, &pInput->pPageBlock);
QUERY_CHECK_CODE(code, lino, _end);
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
pInput->pKeyInfo = keyInfo;
@ -5183,7 +5202,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) {
if (!bSkipped) {
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true);
int32_t code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]);
if (code) {
terrno = code;
return NULL;
}
++pInfo->numNextDurationBlocks;
if (pInfo->numNextDurationBlocks > 2) {
qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo),
@ -5685,8 +5709,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
code = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order, &pInfo->pSortInfo);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
QUERY_CHECK_NULL(pInfo->pReaderBlock, code, lino, _error, terrno);
code = createOneDataBlock(pInfo->pResBlock, false, &pInfo->pReaderBlock);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
@ -5696,7 +5721,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
// start one reader variable
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
code = createOneDataBlock(pInfo->pResBlock, false, &pInfo->pSortInputBlock);
QUERY_CHECK_CODE(code, lino, _error);
if (!tsExperimental) {
pInfo->filesetDelimited = false;

View File

@ -858,7 +858,10 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->ignoreExpiredData = pCountNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL;
@ -870,7 +873,9 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
}
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey;

View File

@ -900,7 +900,9 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pEventNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
@ -922,8 +924,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pAllUpdated = NULL;
}
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
QUERY_CHECK_NULL(pInfo->pCheckpointRes, code, lino, _error, terrno);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->reCkBlock = false;
pInfo->recvGetAll = false;

View File

@ -1394,11 +1394,8 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
}
}
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
if (!pInfo->pDelRes) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
}
code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -1918,10 +1918,15 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pFinalPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
code = createSpecialDataBlock(STREAM_RETRIEVE, &pInfo->pPullDataRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delKey.ts = INT64_MAX;
@ -1936,11 +1941,16 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
pInfo->dataVersion = 0;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvRetrive = false;
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->recvPullover = false;
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->recvRetrive = false;
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
code = createSpecialDataBlock(STREAM_MID_RETRIEVE, &pInfo->pMidRetriveRes);
QUERY_CHECK_CODE(code, lino, _error);
code = createSpecialDataBlock(STREAM_MID_RETRIEVE, &pInfo->pMidPulloverRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->clearState = false;
pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey));
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
@ -2101,13 +2111,18 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
SStorageAPI* pApi, int32_t tsIndex) {
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock);
if (code) {
return code;
}
pSup->gap = gap;
pSup->stateKeySize = keySize;
pSup->stateKeyType = keyType;
pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pSup->pDummyCtx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pSup->stateStore = *pStore;
@ -2129,7 +2144,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
}
pSup->pSessionAPI = pApi;
return TSDB_CODE_SUCCESS;
}
@ -3718,7 +3732,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pChildren = NULL;
pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
@ -3734,7 +3750,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->isHistoryOp = pHandle->fillHistory;
}
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->clearState = false;
pInfo->recvGetAll = false;
pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey;
@ -3771,6 +3789,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
*pOptrInfo = pOperator;
return code;
_error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo);
@ -4847,7 +4866,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
@ -4856,14 +4878,17 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->dataVersion = 0;
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
if (!pInfo->historyWins) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
QUERY_CHECK_CODE(code, lino, _error);
}
if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory;
}
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey;
@ -5157,7 +5182,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->invertible = false;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pPhyNode = NULL; // create new child
@ -5187,7 +5215,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);

View File

@ -1176,11 +1176,17 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
}
}
SSDataBlock* pBlock = createDataBlock();
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
terrno = code;
return NULL;
}
for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData =
createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
int32_t code = blockDataAppendColInfo(pBlock, &colInfoData);
code = blockDataAppendColInfo(pBlock, &colInfoData);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
blockDataDestroy(pBlock);

View File

@ -168,22 +168,11 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
}
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) {
if (pBlock == NULL) {
*pBlock = NULL;
return TSDB_CODE_SUCCESS;
}
if (pSortHandle->pDataBlock == NULL) {
*pBlock = NULL;
return TSDB_CODE_SUCCESS;
}
*pBlock = createOneDataBlock(pSortHandle->pDataBlock, false);
if (*pBlock == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock);
}
#define AllocatedTupleType 0
@ -271,9 +260,8 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
pSortHandle->forceUsePQSort = false;
if (pBlock != NULL) {
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
if (pSortHandle->pDataBlock == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock);
if (code) {
goto _err;
}
}
@ -500,7 +488,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
SSDataBlock* pBlock = NULL;
int32_t code = createOneDataBlock(pDataBlock, false, &pBlock);
if (code) {
return code;
}
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
}
@ -994,7 +987,14 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
tMergeTreeDestroy(&pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
SSDataBlock* pBlock = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
if (code) {
taosArrayDestroy(pResList);
return code;
}
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
@ -1481,12 +1481,19 @@ static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSou
static int32_t initRowIdSort(SSortHandle* pHandle) {
SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL;
SColumnInfoData* extPkCol = (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL;
SColumnInfoData* extPkCol =
(pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL;
SColumnInfoData pkCol = {0};
SSDataBlock* pSortInput = createDataBlock();
SSDataBlock* pSortInput = NULL;
int32_t code = createDataBlock(&pSortInput);
if (code) {
return code;
}
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
int32_t code = blockDataAppendColInfo(pSortInput, &tsCol);
code = blockDataAppendColInfo(pSortInput, &tsCol);
if (code) {
blockDataDestroy(pSortInput);
return code;
@ -1928,7 +1935,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
blockDataCleanup(pHandle->pDataBlock);
}
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
SSDataBlock* pMemSrcBlk = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk);
if (code) {
cleanupMergeSup(&sup);
tMergeTreeDestroy(&pTree);
return code;
}
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
cleanupMergeSup(&sup);
@ -2054,7 +2068,16 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
blockDataDestroy(pBlk);
}
} else {
SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true);
SSDataBlock* tBlk = NULL;
if (bExtractedBlock) {
tBlk = pBlk;
} else {
code = createOneDataBlock(pBlk, true, &tBlk);
if (code) {
return code;
}
}
code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
if (code) {
return code;
@ -2175,7 +2198,11 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
// todo, number of pages are set according to the total available sort buffer
pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
if (code) {
freeSSortSource(source);
return code;
}
}
if (pHandle->beforeFp != NULL) {
@ -2453,11 +2480,10 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
}
if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
int32_t code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
if (code) {
return code;
}
if (pHandle->pDataBlock == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
size_t colNum = blockDataGetNumOfCols(pBlock);

View File

@ -71,7 +71,9 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
}
if (pInfo->pBlock == NULL) {
pInfo->pBlock = createDataBlock();
pInfo->pBlock = NULL;
int32_t code = createDataBlock(&pInfo->pBlock);
ASSERT(code == 0);
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pInfo->pBlock, &colInfo);
@ -129,7 +131,10 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
}
if (pInfo->pBlock == NULL) {
pInfo->pBlock = createDataBlock();
pInfo->pBlock = NULL;
int32_t code = createDataBlock(&pInfo->pBlock);
ASSERT(code == 0);
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1);
blockDataAppendColInfo(pInfo->pBlock, &colInfo);

View File

@ -928,8 +928,9 @@ SExecTaskInfo* createDummyTaskInfo(char* taskId) {
}
SSDataBlock* createDummyBlock(int32_t blkId) {
SSDataBlock* p = createDataBlock();
assert(p);
SSDataBlock* p = NULL;
int32_t code = createDataBlock(&p);
assert(code == 0);
p->info.id.blockId = blkId;
p->info.type = STREAM_INVALID;

View File

@ -60,7 +60,12 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
return NULL;
}
SSDataBlock* pBlock = createDataBlock();
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return NULL;
}
SColumnInfoData colInfo = {0};
colInfo.info.type = pInfo->type;
@ -348,7 +353,10 @@ TEST(testCase, ordered_merge_sort_Test) {
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SSDataBlock* pBlock = createDataBlock();
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
ASSERT(code == 0);
for (int32_t i = 0; i < 1; ++i) {
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pBlock, &colInfo);

View File

@ -1254,8 +1254,13 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t numOfCols = pInput->numOfInputCols;
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
SSDataBlock *pTempBlock = NULL;
int32_t code = createDataBlock(&pTempBlock);
if (code) {
return code;
}
SSDataBlock *pTempBlock = createDataBlock();
pTempBlock->info.rows = pInput->totalRows;
pTempBlock->info.id.uid = pInput->uid;
for (int32_t i = 0; i < numOfCols; ++i) {

View File

@ -88,7 +88,13 @@ int aggregateFuncTest() {
return -1;
}
SSDataBlock *pBlock = createDataBlock();
SSDataBlock *pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pBlock, &colInfo);

View File

@ -112,7 +112,10 @@ int32_t flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType,
}
if (NULL == *block) {
SSDataBlock *res = createDataBlock();
SSDataBlock *res = NULL;
int32_t code = createDataBlock(&res);
ASSERT(code == 0);
for (int32_t i = 0; i < 2; ++i) {
SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, 10, 1 + i);
FLT_ERR_RET(blockDataAppendColInfo(res, &idata));

View File

@ -91,14 +91,16 @@ void scltInitLogFile() {
int32_t scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *slotId, bool newBlock, int32_t rows,
SColumnInfo *colInfo) {
if (newBlock) {
SSDataBlock *res = createDataBlock();
if (NULL == res || NULL == res->pDataBlock) {
SSDataBlock *res = NULL;
int32_t code = createDataBlock(&res);
if (code != 0 || NULL == res->pDataBlock) {
SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SColumnInfoData idata = {0};
idata.info = *colInfo;
int32_t code = colInfoDataEnsureCapacity(&idata, rows, true);
code = colInfoDataEnsureCapacity(&idata, rows, true);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(&idata);
SCL_ERR_RET(code);
@ -185,7 +187,11 @@ int32_t scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType,
}
if (NULL == *block) {
SSDataBlock *res = createDataBlock();
SSDataBlock *res = NULL;
int32_t code = createDataBlock(&res);
ASSERT(code == 0);
for (int32_t i = 0; i < 2; ++i) {
SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_INT, 10, i + 1);
code = colInfoDataEnsureCapacity(&idata, rowNum, true);

View File

@ -252,7 +252,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci
}
int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) {
size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl);
size_t len = 0;//FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl);
if (len > inputSize) {
output[0] = 0;
memcpy(output + 1, input, inputSize);
@ -264,7 +264,7 @@ int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char
int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) {
if (input[0] == 1) {
return FL2_decompress(output, outputSize, input + 1, compressedSize - 1);
return 0;//FL2_decompress(output, outputSize, input + 1, compressedSize - 1);
} else if (input[0] == 0) {
memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1;