Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-30987-21

This commit is contained in:
Hongze Cheng 2024-07-28 16:26:38 +08:00
commit 0435bdffe2
54 changed files with 743 additions and 358 deletions

View File

@ -254,14 +254,23 @@ void blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc); int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc);
SSDataBlock* createDataBlock(); #define QRY_OPTR_CHECK(_o) \
void blockDataDestroy(SSDataBlock* pBlock); do { \
void blockDataFreeRes(SSDataBlock* pBlock); if ((_o) == NULL) { \
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); return TSDB_CODE_INVALID_PARA; \
SSDataBlock* createSpecialDataBlock(EStreamType type); } else { \
*(_o) = NULL; \
} \
} while(0)
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx); int32_t createDataBlock(SSDataBlock** pResBlock);
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); void blockDataDestroy(SSDataBlock* pBlock);
void blockDataFreeRes(SSDataBlock* pBlock);
int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock);
int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock);
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock);
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId); SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);

View File

@ -2899,7 +2899,6 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
} }
pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t)); pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebInfo->newConsumers == NULL) { if (pRebInfo->newConsumers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
return pRebInfo; return pRebInfo;
@ -3455,7 +3454,9 @@ static FORCE_INLINE void* taosDecodeSMqTopicInfoMsg(void* buf, SMqTopicInfo* pTo
buf = taosDecodeStringTo(buf, pTopicInfo->name); buf = taosDecodeStringTo(buf, pTopicInfo->name);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqReportVgInfo)); if ((pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqReportVgInfo))) == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqReportVgInfo vgInfo; SMqReportVgInfo vgInfo;
buf = taosDecodeSMqVgInfo(buf, &vgInfo); buf = taosDecodeSMqVgInfo(buf, &vgInfo);
@ -3493,7 +3494,9 @@ static FORCE_INLINE void* taosDecodeSMqReportMsg(void* buf, SMqReportReq* pMsg)
buf = taosDecodeFixedI64(buf, &pMsg->consumerId); buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqTopicInfo)); if ((pMsg->pTopics = taosArrayInit(sz, sizeof(SMqTopicInfo))) == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqTopicInfo topicInfo; SMqTopicInfo topicInfo;
buf = taosDecodeSMqTopicInfoMsg(buf, &topicInfo); buf = taosDecodeSMqTopicInfoMsg(buf, &topicInfo);

View File

@ -1017,15 +1017,13 @@ void returnToUser(SRequestObj* pRequest) {
} }
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock**pBlock) { static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock**pBlock) {
int64_t lastTs = 0; int64_t lastTs = 0;
int32_t code = TSDB_CODE_SUCCESS;
TAOS_FIELD* pResFields = taos_fetch_fields(pRes); TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
*pBlock = createDataBlock(); int32_t code = createDataBlock(pBlock);
if (NULL == *pBlock) { if (code) {
return terrno; return code;
} }
for(int32_t i = 0; i < numOfFields; ++i) { for(int32_t i = 0; i < numOfFields; ++i) {

View File

@ -21,8 +21,6 @@
#define MALLOC_ALIGN_BYTES 32 #define MALLOC_ALIGN_BYTES 32
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
if (pColumnInfoData->reassigned) { if (pColumnInfoData->reassigned) {
@ -134,7 +132,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
uint32_t len = pColumnInfoData->varmeta.length; uint32_t len = pColumnInfoData->varmeta.length;
pColumnInfoData->varmeta.offset[rowIndex] = len; pColumnInfoData->varmeta.offset[rowIndex] = len;
memmove(pColumnInfoData->pData + len, pData, dataLen); (void) memmove(pColumnInfoData->pData + len, pData, dataLen);
pColumnInfoData->varmeta.length += dataLen; pColumnInfoData->varmeta.length += dataLen;
} else { } else {
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes); memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
@ -290,7 +288,7 @@ int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t curren
pColumnInfoData->reassigned = true; pColumnInfoData->reassigned = true;
} }
return TSDB_CODE_SUCCESS; return code;
} }
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows, int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows,
@ -371,7 +369,7 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2) { const SColumnInfoData* pSource, int32_t numOfRow2) {
if (pColumnInfoData->info.type != pSource->info.type) { if (pColumnInfoData->info.type != pSource->info.type) {
return TSDB_CODE_FAILED; return TSDB_CODE_INVALID_PARA;
} }
if (numOfRow2 == 0) { if (numOfRow2 == 0) {
@ -679,20 +677,24 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
} }
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
int32_t code = 0;
int32_t capacity = pDest->info.capacity; int32_t capacity = pDest->info.capacity;
size_t numOfCols = taosArrayGetSize(pDest->pDataBlock);
size_t numOfCols = taosArrayGetSize(pDest->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
capacity = pDest->info.capacity; capacity = pDest->info.capacity;
colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows); int32_t ret = colDataMergeCol(pCol2, pDest->info.rows, &capacity, pCol1, pSrc->info.rows);
if (ret < 0) { // error occurs
code = ret;
return code;
}
} }
pDest->info.capacity = capacity; pDest->info.capacity = capacity;
pDest->info.rows += pSrc->info.rows; pDest->info.rows += pSrc->info.rows;
return TSDB_CODE_SUCCESS; return code;
} }
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) { int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) {
@ -823,12 +825,15 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
} }
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) {
int32_t code = 0;
if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) { if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) {
return NULL; return NULL;
} }
SSDataBlock* pDst = createOneDataBlock(pBlock, false); SSDataBlock* pDst = NULL;
if (pDst == NULL) { code = createOneDataBlock(pBlock, false, &pDst);
if (code) {
terrno = code;
return NULL; return NULL;
} }
@ -859,7 +864,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
colDataSetNULL(pDstCol, j - startIndex); colDataSetNULL(pDstCol, j - startIndex);
} else { } else {
char* p = colDataGetData(pColData, j); char* p = colDataGetData(pColData, j);
colDataSetVal(pDstCol, j - startIndex, p, false); code = colDataSetVal(pDstCol, j - startIndex, p, false);
} }
} }
} }
@ -1600,53 +1605,112 @@ int32_t copyDataBlock(SSDataBlock* pDst, const SSDataBlock* pSrc) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* createSpecialDataBlock(EStreamType type) { int32_t createSpecialDataBlock(EStreamType type, SSDataBlock** pBlock) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); QRY_OPTR_CHECK(pBlock);
pBlock->info.hasVarCol = false;
pBlock->info.id.groupId = 0; int32_t code = 0;
pBlock->info.rows = 0; SSDataBlock* p = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.type = type; if (p == NULL) {
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + return terrno;
sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; }
pBlock->info.watermark = INT64_MIN;
p->info.hasVarCol = false;
p->info.id.groupId = 0;
p->info.rows = 0;
p->info.type = type;
p->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
p->info.watermark = INT64_MIN;
p->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
if (p->pDataBlock == NULL) {
taosMemoryFree(p);
return terrno;
}
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0}; SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY); infoData.info.bytes = sizeof(TSKEY);
// window start ts // window start ts
taosArrayPush(pBlock->pDataBlock, &infoData); void* px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
// window end ts // window end ts
taosArrayPush(pBlock->pDataBlock, &infoData); px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
infoData.info.type = TSDB_DATA_TYPE_UBIGINT; infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
infoData.info.bytes = sizeof(uint64_t); infoData.info.bytes = sizeof(uint64_t);
// uid // uid
taosArrayPush(pBlock->pDataBlock, &infoData); px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
// group id // group id
taosArrayPush(pBlock->pDataBlock, &infoData); px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
infoData.info.bytes = sizeof(TSKEY); infoData.info.bytes = sizeof(TSKEY);
// calculate start ts // calculate start ts
taosArrayPush(pBlock->pDataBlock, &infoData); px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
// calculate end ts // calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData); px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
code = errno;
goto _err;
}
// table name // table name
infoData.info.type = TSDB_DATA_TYPE_VARCHAR; infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN; infoData.info.bytes = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData); px = taosArrayPush(p->pDataBlock, &infoData);
if (px == NULL) {
return pBlock; code = errno;
} goto _err;
}
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
if (pDataBlock == NULL) { *pBlock = p;
return NULL; return code;
_err:
taosArrayDestroy(p->pDataBlock);
taosMemoryFree(p);
return code;
}
int32_t blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
} }
SSDataBlock* pBlock = createDataBlock();
pBlock->info = pDataBlock->info; pBlock->info = pDataBlock->info;
pBlock->info.rows = 0; pBlock->info.rows = 0;
pBlock->info.capacity = 0; pBlock->info.capacity = 0;
@ -1658,11 +1722,10 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
blockDataAppendColInfo(pBlock, &colInfo); blockDataAppendColInfo(pBlock, &colInfo);
} }
int32_t code = blockDataEnsureCapacity(pBlock, 1); code = blockDataEnsureCapacity(pBlock, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
return NULL; return code;
} }
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
@ -1670,13 +1733,17 @@ SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL); bool isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
void* pData = NULL; void* pData = NULL;
if (!isNull) pData = colDataGetData(pSrc, rowIdx); if (!isNull) {
colDataSetVal(pDst, 0, pData, isNull); pData = colDataGetData(pSrc, rowIdx);
}
code = colDataSetVal(pDst, 0, pData, isNull);
} }
pBlock->info.rows = 1; pBlock->info.rows = 1;
return pBlock; *pResBlock = pBlock;
return code;
} }
void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) { void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
@ -1699,12 +1766,18 @@ void copyPkVal(SDataBlockInfo* pDst, const SDataBlockInfo* pSrc) {
memcpy(p->pData, pDst->pks[1].pData, p->nData); memcpy(p->pData, pDst->pks[1].pData, p->nData);
} }
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { int32_t createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData, SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
return NULL; return TSDB_CODE_INVALID_PARA;
}
SSDataBlock* pDstBlock = NULL;
int32_t code = createDataBlock(&pDstBlock);
if (code) {
return code;
} }
SSDataBlock* pDstBlock = createDataBlock();
pDstBlock->info = pDataBlock->info; pDstBlock->info = pDataBlock->info;
pDstBlock->info.rows = 0; pDstBlock->info.rows = 0;
@ -1723,11 +1796,10 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
copyPkVal(&pDstBlock->info, &pDataBlock->info); copyPkVal(&pDstBlock->info, &pDataBlock->info);
if (copyData) { if (copyData) {
int32_t code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows); code = blockDataEnsureCapacity(pDstBlock, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code;
blockDataDestroy(pDstBlock); blockDataDestroy(pDstBlock);
return NULL; return code;
} }
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
@ -1740,24 +1812,26 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pDstBlock->info.capacity = pDataBlock->info.rows; pDstBlock->info.capacity = pDataBlock->info.rows;
} }
return pDstBlock; *pResBlock = pDstBlock;
return code;
} }
SSDataBlock* createDataBlock() { int32_t createDataBlock(SSDataBlock** pResBlock) {
QRY_OPTR_CHECK(pResBlock);
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) { if (pBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno;
return NULL;
} }
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
if (pBlock->pDataBlock == NULL) { if (pBlock->pDataBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t code = terrno;
taosMemoryFree(pBlock); taosMemoryFree(pBlock);
return NULL; return code;
} }
return pBlock; *pResBlock = pBlock;
return 0;
} }
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) { int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
@ -1771,7 +1845,6 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat
void* p = taosArrayPush(pBlock->pDataBlock, pColInfoData); void* p = taosArrayPush(pBlock->pDataBlock, pColInfoData);
if (p == NULL) { if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
@ -1863,7 +1936,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
int32_t newLen = BitmapLen(total - n); int32_t newLen = BitmapLen(total - n);
if (n % 8 == 0) { if (n % 8 == 0) {
memmove(nullBitmap, nullBitmap + n / 8, newLen); (void) memmove(nullBitmap, nullBitmap + n / 8, newLen);
} else { } else {
int32_t tail = n % 8; int32_t tail = n % 8;
int32_t i = 0; int32_t i = 0;
@ -1924,23 +1997,23 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s
} }
if (dataOffset > 0) { if (dataOffset > 0) {
memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen); (void) memmove(pColInfoData->pData, pColInfoData->pData + dataOffset, dataLen);
} }
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t)); (void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
return dataLen; return dataLen;
} }
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) { static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
// pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total); // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t)); (void) memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
// clear the offset value of the unused entries. // clear the offset value of the unused entries.
memset(&pColInfoData->varmeta.offset[total - n], 0, n); memset(&pColInfoData->varmeta.offset[total - n], 0, n);
} else { } else {
int32_t bytes = pColInfoData->info.bytes; int32_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes); (void) memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
doShiftBitmap(pColInfoData->nullbitmap, n, total); doShiftBitmap(pColInfoData->nullbitmap, n, total);
} }
} }
@ -2258,18 +2331,19 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
int64_t uid, int32_t vgId, tb_uid_t suid) { int64_t uid, int32_t vgId, tb_uid_t suid) {
SSubmitReq2* pReq = *ppReq; SSubmitReq2* pReq = *ppReq;
SArray* pVals = NULL; SArray* pVals = NULL;
int32_t numOfBlks = 0;
int32_t sz = 1; int32_t sz = 1;
int32_t code = 0;
terrno = TSDB_CODE_SUCCESS; *ppReq = NULL;
terrno = 0;
if (NULL == pReq) { if (NULL == pReq) {
if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) { if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
goto _end; goto _end;
} }
if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
code = terrno;
goto _end; goto _end;
} }
} }
@ -2317,13 +2391,23 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
isStartKey = true; isStartKey = true;
ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId); ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId);
SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var})); SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var}));
taosArrayPush(pVals, &cv); void* px = taosArrayPush(pVals, &cv);
if (px == NULL) {
return terrno;
}
} else if (colDataIsNull_s(pColInfoData, j)) { } else if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv); void* px = taosArrayPush(pVals, &cv);
if (px == NULL) {
return terrno;
}
} else { } else {
SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(int64_t*)var})); SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(int64_t*)var}));
taosArrayPush(pVals, &cv); void* px = taosArrayPush(pVals, &cv);
if (px == NULL) {
return terrno;
}
} }
break; break;
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
@ -2394,29 +2478,38 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
} }
} }
SRow* pRow = NULL; SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) { if ((code = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end; goto _end;
} }
ASSERT(pRow); ASSERT(pRow);
taosArrayPush(tbData.aRowP, &pRow); void* px = taosArrayPush(tbData.aRowP, &pRow);
if (px == NULL) {
code = terrno;
goto _end;
}
} }
taosArrayPush(pReq->aSubmitTbData, &tbData); void* px = taosArrayPush(pReq->aSubmitTbData, &tbData);
if (px == NULL) {
code = terrno;
goto _end;
}
} }
_end: _end:
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
if (terrno != 0) { if (code != 0) {
*ppReq = NULL;
if (pReq) { if (pReq) {
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
} }
} else {
return TSDB_CODE_FAILED; *ppReq = pReq;
} }
*ppReq = pReq;
return TSDB_CODE_SUCCESS; return code;
} }
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId) { void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId) {
@ -2579,14 +2672,14 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
} }
colSizes[col] += colSize; colSizes[col] += colSize;
dataLen += colSize; dataLen += colSize;
memmove(data, pColData, colSize); (void) memmove(data, pColData, colSize);
data += colSize; data += colSize;
} }
} else { } else {
colSizes[col] = colDataGetLength(pColRes, numOfRows); colSizes[col] = colDataGetLength(pColRes, numOfRows);
dataLen += colSizes[col]; dataLen += colSizes[col];
if (pColRes->pData != NULL) { if (pColRes->pData != NULL) {
memmove(data, pColRes->pData, colSizes[col]); (void) memmove(data, pColRes->pData, colSizes[col]);
} }
data += colSizes[col]; data += colSizes[col];
} }

View File

@ -459,7 +459,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
SColVal colVal; SColVal colVal;
if ((colValArray = taosArrayInit(numOfInfos, sizeof(SColVal))) == NULL) { if ((colValArray = taosArrayInit(numOfInfos, sizeof(SColVal))) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
for (int32_t iRow = 0; iRow < numOfRows; iRow++) { for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
@ -670,7 +670,7 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart,
// merge // merge
aColVal = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); aColVal = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
if (aColVal == NULL) { if (aColVal == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
goto _exit; goto _exit;
} }
@ -1748,7 +1748,7 @@ int32_t tTagToValArray(const STag *pTag, SArray **ppArray) {
(*ppArray) = taosArrayInit(pTag->nTag + 1, sizeof(STagVal)); (*ppArray) = taosArrayInit(pTag->nTag + 1, sizeof(STagVal));
if (*ppArray == NULL) { if (*ppArray == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
goto _err; goto _err;
} }

View File

@ -236,7 +236,9 @@ TEST(testCase, toInteger_test) {
} }
TEST(testCase, Datablock_test) { TEST(testCase, Datablock_test) {
SSDataBlock* b = createDataBlock(); SSDataBlock* b = NULL;
int32_t code = createDataBlock(&b);
ASSERT(code == 0);
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1);
taosArrayPush(b->pDataBlock, &infoData); taosArrayPush(b->pDataBlock, &infoData);
@ -361,7 +363,9 @@ TEST(testCase, non_var_dataBlock_split_test) {
TEST(testCase, var_dataBlock_split_test) { TEST(testCase, var_dataBlock_split_test) {
int32_t numOfRows = 1000000; int32_t numOfRows = 1000000;
SSDataBlock* b = createDataBlock(); SSDataBlock* b = NULL;
int32_t code = createDataBlock(&b);
ASSERT(code == 0);
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 1);
blockDataAppendColInfo(b, &infoData); blockDataAppendColInfo(b, &infoData);

View File

@ -289,7 +289,12 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
int32_t numOfCols = pShow->pMeta->numOfColumns; int32_t numOfCols = pShow->pMeta->numOfColumns;
SSDataBlock *pBlock = createDataBlock(); SSDataBlock *pBlock = NULL;
code = createDataBlock(&pBlock);
if (code) {
TAOS_RETURN(code);
}
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0}; SColumnInfoData idata = {0};

View File

@ -806,14 +806,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
} }
// schedule stream task for stream obj // schedule stream task for stream obj
if ((code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList)) < 0) { code = mndScheduleStream(pMnode, &streamObj, createReq.lastTs, createReq.pVgroupVerList);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to schedule since %s", createReq.name, terrstr()); mError("stream:%s, failed to schedule since %s", createReq.name, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
} }
// add stream to trans // add stream to trans
if ((code = mndPersistStream(pTrans, &streamObj)) < 0) { code = mndPersistStream(pTrans, &streamObj);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to persist since %s", createReq.name, terrstr()); mError("stream:%s, failed to persist since %s", createReq.name, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
@ -837,7 +839,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
// execute creation // execute creation
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
@ -848,12 +851,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SName dbname = {0}; SName dbname = {0};
code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); code = tNameFromString(&dbname, createReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (code) { if (code) {
mError("invalid source dbname:%s in create stream, code:%s", createReq.sourceDB, tstrerror(code));
goto _OVER; goto _OVER;
} }
SName name = {0}; SName name = {0};
code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); code = tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_TABLE);
if (code) { if (code) {
mError("invalid stream name:%s in create strem, code:%s", createReq.name, tstrerror(code));
goto _OVER; goto _OVER;
} }
@ -868,7 +873,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
_OVER: _OVER:
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createReq.name, terrstr()); mError("stream:%s, failed to create since %s", createReq.name, terrstr(code));
} else {
mDebug("stream:%s create stream completed", createReq.name);
} }
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);

View File

@ -1001,7 +1001,9 @@ static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) {
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0; int32_t code = 0;
if (pTrans == NULL) return -1; if (pTrans == NULL) {
return TSDB_CODE_INVALID_PARA;
}
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans)); TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
@ -1583,6 +1585,7 @@ _OVER:
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = 0; int32_t code = 0;
terrno = 0;
if (pTrans->exec == TRN_EXEC_SERIAL) { if (pTrans->exec == TRN_EXEC_SERIAL) {
code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf); code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf);

View File

@ -1069,9 +1069,12 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
if (count > TRY_ERROR_LIMIT) break; if (count > TRY_ERROR_LIMIT) break;
int32_t cmp = (*param->filterFunc)((void *)&p->btime, (void *)&pBtimeKey->btime, param->type); int32_t cmp = (*param->filterFunc)((void *)&p->btime, (void *)&pBtimeKey->btime, param->type);
if (cmp == 0) if (cmp == 0) {
taosArrayPush(pUids, &p->uid); if (taosArrayPush(pUids, &p->uid) == NULL) {
else { ret = terrno;
break;
}
} else {
if (param->equal == true) { if (param->equal == true) {
if (count > TRY_ERROR_LIMIT) break; if (count > TRY_ERROR_LIMIT) break;
count++; count++;
@ -1132,7 +1135,10 @@ int32_t metaFilterTableName(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type); cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
if (cmp == 0) { if (cmp == 0) {
tb_uid_t tuid = *(tb_uid_t *)pEntryVal; tb_uid_t tuid = *(tb_uid_t *)pEntryVal;
taosArrayPush(pUids, &tuid); if (taosArrayPush(pUids, &tuid) == NULL) {
ret = terrno;
goto END;
}
} else { } else {
if (param->equal == true) { if (param->equal == true) {
if (count > TRY_ERROR_LIMIT) break; if (count > TRY_ERROR_LIMIT) break;
@ -1328,7 +1334,10 @@ int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
} else { } else {
tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes); tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
} }
taosArrayPush(pUids, &tuid); if (taosArrayPush(pUids, &tuid) == NULL) {
ret = terrno;
break;
}
found = true; found = true;
} else { } else {
if (param->equal == true) { if (param->equal == true) {
@ -1432,7 +1441,11 @@ int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal}; STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
info.pTagVal = taosMemoryMalloc(pCur->vLen); info.pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(info.pTagVal, pCur->pVal, pCur->vLen); memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
taosArrayPush(pUidTagInfo, &info); if (taosArrayPush(pUidTagInfo, &info) == NULL) {
metaCloseCtbCursor(pCur);
taosHashCleanup(pSepecifiedUidMap);
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
} else { // only the specified tables need to be added } else { // only the specified tables need to be added
while (1) { while (1) {

View File

@ -341,7 +341,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb
break; break;
} }
taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid)); (void)taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid));
} }
tdbTbcClose(pCtbIdxc); tdbTbcClose(pCtbIdxc);
@ -405,7 +405,7 @@ static void metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) {
break; break;
} }
taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid)); (void)taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid));
} }
tdbFree(pKey); tdbFree(pKey);
@ -1033,7 +1033,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
} }
if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) { if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) {
taosArrayPush(tbUids, &uid); (void)taosArrayPush(tbUids, &uid);
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) { if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL); tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL);
@ -1135,7 +1135,7 @@ static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
tbFName[TSDB_TABLE_FNAME_LEN] = '\0'; tbFName[TSDB_TABLE_FNAME_LEN] = '\0';
int32_t ret = vnodeValidateTableHash(pMeta->pVnode, tbFName); int32_t ret = vnodeValidateTableHash(pMeta->pVnode, tbFName);
if (ret < 0 && terrno == TSDB_CODE_VND_HASH_MISMATCH) { if (ret < 0 && terrno == TSDB_CODE_VND_HASH_MISMATCH) {
taosArrayPush(uidList, &me.uid); (void)taosArrayPush(uidList, &me.uid);
} }
} }
tDecoderClear(&dc); tDecoderClear(&dc);
@ -1783,11 +1783,11 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
} else { } else {
memcpy(&val.i64, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal); memcpy(&val.i64, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
} }
taosArrayPush(pTagArray, &val); (void)taosArrayPush(pTagArray, &val);
} else { } else {
STagVal val = {.cid = pCol->colId}; STagVal val = {.cid = pCol->colId};
if (tTagGet(pOldTag, &val)) { if (tTagGet(pOldTag, &val)) {
taosArrayPush(pTagArray, &val); (void)taosArrayPush(pTagArray, &val);
} }
} }
} }
@ -2171,7 +2171,7 @@ static int metaDropTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterT
} }
SMetaPair pair = {.key = pKey, nKey = nKey}; SMetaPair pair = {.key = pKey, nKey = nKey};
taosArrayPush(tagIdxList, &pair); (void)taosArrayPush(tagIdxList, &pair);
} }
tdbTbcClose(pTagIdxc); tdbTbcClose(pTagIdxc);

View File

@ -289,7 +289,13 @@ STqReader* tqReaderOpen(SVnode* pVnode) {
pReader->cachedSchemaSuid = 0; pReader->cachedSchemaSuid = 0;
pReader->pSchemaWrapper = NULL; pReader->pSchemaWrapper = NULL;
pReader->tbIdHash = NULL; pReader->tbIdHash = NULL;
pReader->pResBlock = createDataBlock(); pReader->pResBlock = NULL;
int32_t code = createDataBlock(&pReader->pResBlock);
if (code) {
terrno = code;
}
return pReader; return pReader;
} }

View File

@ -117,8 +117,13 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
} }
STqOffsetVal offset = {0}; STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset); qStreamExtractOffset(task, &offset);
pHandle->block = createOneDataBlock(pDataBlock, true); pHandle->block = NULL;
TSDB_CHECK_NULL(pDataBlock, code, line, END, terrno);
code = createOneDataBlock(pDataBlock, true, &pHandle->block);
if (code) {
return code;
}
pHandle->blockTime = offset.ts; pHandle->blockTime = offset.ts;
tOffsetDestroy(&offset); tOffsetDestroy(&offset);
code = getDataBlock(task, pHandle, vgId, &pDataBlock); code = getDataBlock(task, pHandle, vgId, &pDataBlock);

View File

@ -576,8 +576,10 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
goto END; goto END;
} }
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); SSDataBlock* pDelBlock = NULL;
TSDB_CHECK_NULL(pDelBlock, code, line, END, terrno) code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock);
TSDB_CHECK_CODE(code, line, END);
code = blockDataEnsureCapacity(pDelBlock, numOfTables); code = blockDataEnsureCapacity(pDelBlock, numOfTables);
TSDB_CHECK_CODE(code, line, END); TSDB_CHECK_CODE(code, line, END);

View File

@ -770,7 +770,11 @@ int32_t tsdbDisableAndCancelAllBgTask(STsdb *pTsdb) {
STFileSet *fset; STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) { TARRAY2_FOREACH(fs->fSetArr, fset) {
if (fset->channelOpened) { if (fset->channelOpened) {
taosArrayPush(channelArray, &fset->channel); if (taosArrayPush(channelArray, &fset->channel) == NULL) {
taosArrayDestroy(channelArray);
taosThreadMutexUnlock(&pTsdb->mutex);
return terrno;
}
fset->channel = (SVAChannelID){0}; fset->channel = (SVAChannelID){0};
fset->mergeScheduled = false; fset->mergeScheduled = false;
tsdbFSSetBlockCommit(fset, false); tsdbFSSetBlockCommit(fset, false);

View File

@ -402,28 +402,30 @@ static void initReaderStatus(SReaderStatus* pStatus) {
} }
static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) { static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) {
*pResBlock = createDataBlock(); QRY_OPTR_CHECK(pResBlock);
if (*pResBlock == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code != 0) {
return code;
} }
for (int32_t i = 0; i < pCond->numOfCols; ++i) { for (int32_t i = 0; i < pCond->numOfCols; ++i) {
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
colInfo.info = pCond->colList[i]; colInfo.info = pCond->colList[i];
int32_t code = blockDataAppendColInfo(*pResBlock, &colInfo); code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(*pResBlock); taosMemoryFree(pBlock);
*pResBlock = NULL;
return code; return code;
} }
} }
int32_t code = blockDataEnsureCapacity(*pResBlock, capacity); code = blockDataEnsureCapacity(pBlock, capacity);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(*pResBlock); taosMemoryFree(pBlock);
*pResBlock = NULL;
} }
*pResBlock = pBlock;
return code; return code;
} }

View File

@ -88,6 +88,9 @@ int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket; int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
if (pBuf->pData == NULL) { if (pBuf->pData == NULL) {
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
if (pBuf->pData == NULL) {
return terrno;
}
} }
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
@ -163,22 +166,22 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
if (IS_NUMERIC_TYPE(type)) { if (IS_NUMERIC_TYPE(type)) {
if (asc) { if (asc) {
switch(type) { switch (type) {
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
pKey->pks[0].val = INT64_MIN; pKey->pks[0].val = INT64_MIN;
break; break;
} }
case TSDB_DATA_TYPE_INT:{ case TSDB_DATA_TYPE_INT: {
int32_t min = INT32_MIN; int32_t min = INT32_MIN;
(void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
break; break;
} }
case TSDB_DATA_TYPE_SMALLINT:{ case TSDB_DATA_TYPE_SMALLINT: {
int16_t min = INT16_MIN; int16_t min = INT16_MIN;
(void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
break; break;
} }
case TSDB_DATA_TYPE_TINYINT:{ case TSDB_DATA_TYPE_TINYINT: {
int8_t min = INT8_MIN; int8_t min = INT8_MIN;
(void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes);
break; break;
@ -194,15 +197,31 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
ASSERT(0); ASSERT(0);
} }
} else { } else {
switch(type) { switch (type) {
case TSDB_DATA_TYPE_BIGINT:pKey->pks[0].val = INT64_MAX;break; case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_INT:pKey->pks[0].val = INT32_MAX;break; pKey->pks[0].val = INT64_MAX;
case TSDB_DATA_TYPE_SMALLINT:pKey->pks[0].val = INT16_MAX;break; break;
case TSDB_DATA_TYPE_TINYINT:pKey->pks[0].val = INT8_MAX;break; case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UBIGINT:pKey->pks[0].val = UINT64_MAX;break; pKey->pks[0].val = INT32_MAX;
case TSDB_DATA_TYPE_UINT:pKey->pks[0].val = UINT32_MAX;break; break;
case TSDB_DATA_TYPE_USMALLINT:pKey->pks[0].val = UINT16_MAX;break; case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_UTINYINT:pKey->pks[0].val = UINT8_MAX;break; pKey->pks[0].val = INT16_MAX;
break;
case TSDB_DATA_TYPE_TINYINT:
pKey->pks[0].val = INT8_MAX;
break;
case TSDB_DATA_TYPE_UBIGINT:
pKey->pks[0].val = UINT64_MAX;
break;
case TSDB_DATA_TYPE_UINT:
pKey->pks[0].val = UINT32_MAX;
break;
case TSDB_DATA_TYPE_USMALLINT:
pKey->pks[0].val = UINT16_MAX;
break;
case TSDB_DATA_TYPE_UTINYINT:
pKey->pks[0].val = UINT8_MAX;
break;
default: default:
ASSERT(0); ASSERT(0);
} }
@ -232,7 +251,7 @@ void clearRowKey(SRowKey* pKey) {
taosMemoryFreeClear(pKey->pks[0].pData); taosMemoryFreeClear(pKey->pks[0].pData);
} }
static int32_t initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { static int32_t initLastProcKey(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
int32_t code = 0; int32_t code = 0;
int32_t numOfPks = pReader->suppInfo.numOfPks; int32_t numOfPks = pReader->suppInfo.numOfPks;
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
@ -448,8 +467,8 @@ void cleanupInfoForNextFileset(SSHashObj* pTableMap) {
// brin records iterator // brin records iterator
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) { void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) {
(void) memset(&pIter->block, 0, sizeof(SBrinBlock)); (void)memset(&pIter->block, 0, sizeof(SBrinBlock));
(void) memset(&pIter->record, 0, sizeof(SBrinRecord)); (void)memset(&pIter->record, 0, sizeof(SBrinRecord));
pIter->blockIndex = -1; pIter->blockIndex = -1;
pIter->recordIndex = -1; pIter->recordIndex = -1;
@ -471,7 +490,7 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
(void) tBrinBlockClear(&pIter->block); (void)tBrinBlockClear(&pIter->block);
int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block); int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code)); tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code));
@ -488,7 +507,7 @@ int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) {
return code; return code;
} }
void clearBrinBlockIter(SBrinRecordIter* pIter) { (void) tBrinBlockDestroy(&pIter->block); } void clearBrinBlockIter(SBrinRecordIter* pIter) { (void)tBrinBlockDestroy(&pIter->block); }
// initialize the file block access order // initialize the file block access order
// sort the file blocks according to the offset of each data block in the files // sort the file blocks according to the offset of each data block in the files
@ -658,7 +677,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0);
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
void* px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); void* px = taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
if (px == NULL) { if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -774,6 +793,9 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
if (pScanInfo->pFileDelData == NULL) { if (pScanInfo->pFileDelData == NULL) {
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
if (pScanInfo->pFileDelData == NULL) {
return terrno;
}
} }
for (int32_t k = 0; k < pBlock->numOfRecords; ++k) { for (int32_t k = 0; k < pBlock->numOfRecords; ++k) {
@ -810,6 +832,9 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
if (pScanInfo->pFileDelData == NULL) { if (pScanInfo->pFileDelData == NULL) {
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
if (pScanInfo->pFileDelData == NULL) {
return terrno;
}
} }
} }
@ -821,7 +846,7 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
if (record.version <= pReader->info.verRange.maxVer) { if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
void* px = taosArrayPush(pScanInfo->pFileDelData, &delData); void* px = taosArrayPush(pScanInfo->pFileDelData, &delData);
if (px == NULL) { if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -878,7 +903,7 @@ static int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STs
ETombBlkCheckEnum ret = 0; ETombBlkCheckEnum ret = 0;
code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret); code = doCheckTombBlock(&block, pReader, numOfTables, &j, &ret);
(void) tTombBlockDestroy(&block); (void)tTombBlockDestroy(&block);
if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) { if (code != TSDB_CODE_SUCCESS || ret == BLK_CHECK_QUIT) {
return code; return code;
} }
@ -977,7 +1002,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
SStatisBlk* p = &pStatisBlkArray->data[i]; SStatisBlk* p = &pStatisBlkArray->data[i];
STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); STbStatisBlock* pStatisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
(void) tStatisBlockInit(pStatisBlock); (void)tStatisBlockInit(pStatisBlock);
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock); int32_t code = tsdbSttFileReadStatisBlock(pSttFileReader, p, pStatisBlock);
@ -995,7 +1020,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
} }
if (index >= pStatisBlock->numOfRecords) { if (index >= pStatisBlock->numOfRecords) {
(void) tStatisBlockDestroy(pStatisBlock); (void)tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock); taosMemoryFreeClear(pStatisBlock);
return num; return num;
} }
@ -1005,7 +1030,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) { while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex < numOfTables) {
p = &pStatisBlkArray->data[i]; p = &pStatisBlkArray->data[i];
if (p->minTbid.suid > suid) { if (p->minTbid.suid > suid) {
(void) tStatisBlockDestroy(pStatisBlock); (void)tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock); taosMemoryFreeClear(pStatisBlock);
return num; return num;
} }
@ -1025,7 +1050,7 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo
} }
} }
(void) tStatisBlockDestroy(pStatisBlock); (void)tStatisBlockDestroy(pStatisBlock);
taosMemoryFreeClear(pStatisBlock); taosMemoryFreeClear(pStatisBlock);
return num; return num;
} }
@ -1037,7 +1062,7 @@ static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, STbStatisBlo
(*i) += 1; (*i) += 1;
(*j) = 0; (*j) = 0;
if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) { if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
(void) tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock); (void)tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pStatisBlock);
} }
} }
} }
@ -1049,7 +1074,7 @@ int32_t doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
int32_t inc = numOfFileObj - size; int32_t inc = numOfFileObj - size;
for (int32_t k = 0; k < inc; ++k) { for (int32_t k = 0; k < inc; ++k) {
SLDataIter* pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); SLDataIter* pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
void* px = taosArrayPush(pLDIterList, &pIter); void* px = taosArrayPush(pLDIterList, &pIter);
if (px == NULL) { if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -1073,6 +1098,9 @@ int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet)
// add the list/iter placeholder // add the list/iter placeholder
while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) { while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) {
SArray* pList = taosArrayInit(4, POINTER_BYTES); SArray* pList = taosArrayInit(4, POINTER_BYTES);
if (pList == NULL) {
return terrno;
}
void* px = taosArrayPush(pSttFileBlockIterArray, &pList); void* px = taosArrayPush(pSttFileBlockIterArray, &pList);
if (px == NULL) { if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1210,8 +1238,7 @@ static int32_t sortUidComparFn(const void* p1, const void* p2) {
return ret; return ret;
} }
bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlockScanInfo* pScanInfo, int32_t order) {
int32_t order) {
// check if it overlap with del skyline // check if it overlap with del skyline
taosArraySort(pKeyRangeList, sortUidComparFn); taosArraySort(pKeyRangeList, sortUidComparFn);
@ -1242,7 +1269,7 @@ bool isCleanSttBlock(SArray* pKeyRangeList, STimeWindow* pQueryWindow, STableBlo
} }
STimeWindow w2 = {.skey = p2->skey.ts, .ekey = p2->ekey.ts}; STimeWindow w2 = {.skey = p2->skey.ts, .ekey = p2->ekey.ts};
bool overlap = overlapWithTimeWindow(&w2, pQueryWindow, pScanInfo, order); bool overlap = overlapWithTimeWindow(&w2, pQueryWindow, pScanInfo, order);
if (overlap) { if (overlap) {
return false; return false;
} }

View File

@ -1124,7 +1124,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name); sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
if (vnodeValidateTableHash(pVnode, tbName) < 0) { if (vnodeValidateTableHash(pVnode, tbName) < 0) {
cRsp.code = TSDB_CODE_VND_HASH_MISMATCH; cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
taosArrayPush(rsp.pArray, &cRsp); if (taosArrayPush(rsp.pArray, &cRsp) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
rcode = -1;
goto _exit;
}
vError("vgId:%d create-table:%s failed due to hash value mismatch", TD_VID(pVnode), tbName); vError("vgId:%d create-table:%s failed due to hash value mismatch", TD_VID(pVnode), tbName);
continue; continue;
} }
@ -1139,11 +1143,19 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
} else { } else {
cRsp.code = TSDB_CODE_SUCCESS; cRsp.code = TSDB_CODE_SUCCESS;
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
taosArrayPush(tbUids, &pCreateReq->uid); if (taosArrayPush(tbUids, &pCreateReq->uid) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
rcode = -1;
goto _exit;
}
vnodeUpdateMetaRsp(pVnode, cRsp.pMeta); vnodeUpdateMetaRsp(pVnode, cRsp.pMeta);
} }
taosArrayPush(rsp.pArray, &cRsp); if (taosArrayPush(rsp.pArray, &cRsp) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
rcode = -1;
goto _exit;
}
} }
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids)); vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
@ -1375,12 +1387,20 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid); if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
} }
taosArrayPush(rsp.pArray, &dropTbRsp); if (taosArrayPush(rsp.pArray, &dropTbRsp) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pRsp->code = terrno;
goto _exit;
}
if (tsEnableAuditCreateTable) { if (tsEnableAuditCreateTable) {
char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); char *str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
strcpy(str, pDropTbReq->name); strcpy(str, pDropTbReq->name);
taosArrayPush(tbNames, &str); if (taosArrayPush(tbNames, &str) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pRsp->code = terrno;
goto _exit;
}
} }
} }
@ -1499,11 +1519,13 @@ static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
taosArrayDestroy(pCxt->pColValues); taosArrayDestroy(pCxt->pColValues);
pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal)); pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal));
if (NULL == pCxt->pColValues) { if (NULL == pCxt->pColValues) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) { for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) {
SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type); SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type);
taosArrayPush(pCxt->pColValues, &val); if (taosArrayPush(pCxt->pColValues, &val) == NULL) {
return terrno;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1819,7 +1841,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
goto _exit; goto _exit;
} }
taosArrayPush(newTbUids, &pSubmitTbData->uid); if (taosArrayPush(newTbUids, &pSubmitTbData->uid) == NULL) {
code = terrno;
goto _exit;
}
if (pCreateTbRsp->pMeta) { if (pCreateTbRsp->pMeta) {
vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta); vnodeUpdateMetaRsp(pVnode, pCreateTbRsp->pMeta);

View File

@ -74,13 +74,16 @@ static int32_t getSchemaBytes(const SSchema* pSchema) {
} }
static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) { static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock(); QRY_OPTR_CHECK(pOutput);
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY; SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
} }
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_FIELD_LEN, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_FIELD_LEN, 1);
int32_t code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_TYPE_LEN, 2); infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_TYPE_LEN, 2);
code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
@ -229,13 +232,16 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
static int32_t execResetQueryCache() { return catalogClearCache(); } static int32_t execResetQueryCache() { return catalogClearCache(); }
static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) { static int32_t buildCreateDBResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock(); QRY_OPTR_CHECK(pOutput);
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY; SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
} }
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_COLS, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_COLS, 1);
int32_t code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_FIELD2_LEN, 2); infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_DB_RESULT_FIELD2_LEN, 2);
code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
@ -418,13 +424,16 @@ static int32_t execShowCreateDatabase(SShowCreateDatabaseStmt* pStmt, SRetrieveT
} }
static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) { static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock(); QRY_OPTR_CHECK(pOutput);
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY; SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
} }
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD1_LEN, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD1_LEN, 1);
int32_t code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD2_LEN, 2); infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_TB_RESULT_FIELD2_LEN, 2);
code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
@ -439,13 +448,16 @@ static int32_t buildCreateTbResultDataBlock(SSDataBlock** pOutput) {
} }
static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) { static int32_t buildCreateViewResultDataBlock(SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock(); QRY_OPTR_CHECK(pOutput);
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY; SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
} }
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD1_LEN, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD1_LEN, 1);
int32_t code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD2_LEN, 2); infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, SHOW_CREATE_VIEW_RESULT_FIELD2_LEN, 2);
code = blockDataAppendColInfo(pBlock, &infoData); code = blockDataAppendColInfo(pBlock, &infoData);
@ -892,9 +904,12 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
} }
static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) { static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) {
SSDataBlock* pBlock = createDataBlock(); QRY_OPTR_CHECK(pOutput);
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY; SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
} }
SNode* pProj = NULL; SNode* pProj = NULL;
@ -910,8 +925,9 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
} }
QRY_ERR_RET(blockDataAppendColInfo(pBlock, &infoData)); QRY_ERR_RET(blockDataAppendColInfo(pBlock, &infoData));
} }
*pOutput = pBlock; *pOutput = pBlock;
return TSDB_CODE_SUCCESS; return code;
} }
int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) { int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {

View File

@ -1941,7 +1941,8 @@ _return:
} }
int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
int32_t code = 0; int32_t code = 0;
SSDataBlock *pBlock = NULL;
SExplainCtx *pCtx = (SExplainCtx *)ctx; SExplainCtx *pCtx = (SExplainCtx *)ctx;
int32_t rowNum = taosArrayGetSize(pCtx->rows); int32_t rowNum = taosArrayGetSize(pCtx->rows);
if (rowNum <= 0) { if (rowNum <= 0) {
@ -1949,7 +1950,9 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(TSDB_CODE_APP_ERROR); QRY_ERR_RET(TSDB_CODE_APP_ERROR);
} }
SSDataBlock *pBlock = createDataBlock(); code = createDataBlock(&pBlock);
QRY_ERR_JRET(code);
SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_EXPLAIN_RESULT_ROW_SIZE, 1); SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_EXPLAIN_RESULT_ROW_SIZE, 1);
QRY_ERR_JRET(blockDataAppendColInfo(pBlock, &infoData)); QRY_ERR_JRET(blockDataAppendColInfo(pBlock, &infoData));
QRY_ERR_JRET(blockDataEnsureCapacity(pBlock, rowNum)); QRY_ERR_JRET(blockDataEnsureCapacity(pBlock, rowNum));

View File

@ -20,15 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#define QRY_OPTR_CHECK(_o) \
do { \
if ((_o) == NULL) { \
return TSDB_CODE_INVALID_PARA; \
} else { \
*(_o) = NULL; \
} \
} while(0)
typedef struct SOperatorCostInfo { typedef struct SOperatorCostInfo {
double openCost; double openCost;
double totalCost; double totalCost;

View File

@ -364,7 +364,12 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = NULL;
code = createDataBlock(&pBlock);
if (code) {
return code;
}
pBlock->info.rows = 1; pBlock->info.rows = 1;
pBlock->info.capacity = 0; pBlock->info.capacity = 0;

View File

@ -181,7 +181,10 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
capacity = TMIN(totalTables, 4096); capacity = TMIN(totalTables, 4096);
pInfo->pBufferedRes = createOneDataBlock(pInfo->pRes, false); pInfo->pBufferedRes = NULL;
code = createOneDataBlock(pInfo->pRes, false, &pInfo->pBufferedRes);
QUERY_CHECK_CODE(code, lino, _error);
setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode); setColIdForCacheReadBlock(pInfo->pBufferedRes, pScanNode);
code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity); code = blockDataEnsureCapacity(pInfo->pBufferedRes, capacity);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);

View File

@ -672,7 +672,10 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = NULL;
code = createDataBlock(&pBlock);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId); SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
code = blockDataAppendColInfo(pBlock, &idata); code = blockDataAppendColInfo(pBlock, &idata);
@ -791,8 +794,8 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks); pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
blockDataCleanup(pb); blockDataCleanup(pb);
} else { } else {
pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
QUERY_CHECK_NULL(pb, code, lino, _end, terrno); QUERY_CHECK_NULL(pb, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
} }
int32_t compLen = *(int32_t*)pStart; int32_t compLen = *(int32_t*)pStart;

View File

@ -251,9 +251,13 @@ SArray* createSortInfo(SNodeList* pNodeList) {
} }
SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
int32_t numOfCols = LIST_LENGTH(pNode->pSlots); int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
SSDataBlock* pBlock = NULL;
SSDataBlock* pBlock = createDataBlock(); int32_t code = createDataBlock(&pBlock);
if (code) {
terrno = code;
return NULL;
}
pBlock->info.id.blockId = pNode->dataBlockId; pBlock->info.id.blockId = pNode->dataBlockId;
pBlock->info.type = STREAM_INVALID; pBlock->info.type = STREAM_INVALID;
@ -267,7 +271,7 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
idata.info.scale = pDescNode->dataType.scale; idata.info.scale = pDescNode->dataType.scale;
idata.info.precision = pDescNode->dataType.precision; idata.info.precision = pDescNode->dataType.precision;
int32_t code = blockDataAppendColInfo(pBlock, &idata); code = blockDataAppendColInfo(pBlock, &idata);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
@ -1029,11 +1033,9 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
SStorageAPI* pStorageAPI) { SStorageAPI* pStorageAPI) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SSDataBlock* pResBlock = createDataBlock(); SSDataBlock* pResBlock = NULL;
if (pResBlock == NULL) { code = createDataBlock(&pResBlock);
terrno = TSDB_CODE_OUT_OF_MEMORY; QUERY_CHECK_CODE(code, lino, _end);
return NULL;
}
for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};

View File

@ -695,9 +695,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
while (pRes != NULL) { while (pRes != NULL) {
SSDataBlock* p = NULL; SSDataBlock* p = NULL;
if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) { if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
SSDataBlock* p1 = createOneDataBlock(pRes, true); SSDataBlock* p1 = NULL;
void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1); code = createOneDataBlock(pRes, true, &p1);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
p = p1; p = p1;
} else { } else {
p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex); p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);

View File

@ -513,7 +513,13 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
goto _error; goto _error;
} }
pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false); pInfo->pFinalRes = NULL;
code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes);
if (code) {
goto _error;
}
code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;

View File

@ -613,7 +613,10 @@ SSDataBlock* createBlockDataNotLoaded(const SOperatorInfo* pOperator, SSDataBloc
return NULL; return NULL;
} }
SSDataBlock* pDstBlock = createDataBlock(); SSDataBlock* pDstBlock = NULL;
code = createDataBlock(&pDstBlock);
QUERY_CHECK_CODE(code, lino, _end);
pDstBlock->info = pDataBlock->info; pDstBlock->info = pDataBlock->info;
pDstBlock->info.id.blockId = pOperator->resultDataBlockId; pDstBlock->info.id.blockId = pOperator->resultDataBlockId;
pDstBlock->info.capacity = 0; pDstBlock->info.capacity = 0;
@ -1319,7 +1322,10 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) { if (winCode != TSDB_CODE_SUCCESS) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); SSDataBlock* pTmpBlock = NULL;
code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock);
QUERY_CHECK_CODE(code, lino, _end);
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
pTmpBlock->info.id.groupId = groupId; pTmpBlock->info.id.groupId = groupId;
char* tbName = pSrcBlock->info.parTbName; char* tbName = pSrcBlock->info.parTbName;
@ -1708,8 +1714,9 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
taosHashSetFreeFp(pInfo->pPartitions, freePartItem); taosHashSetFreeFp(pInfo->pPartitions, freePartItem);
pInfo->tsColIndex = 0; pInfo->tsColIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
QUERY_CHECK_NULL(pInfo->pDelRes, code, lino, _error, terrno); code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);

View File

@ -1157,22 +1157,23 @@ int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinN
int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode)); int32_t code = blockDataEnsureCapacity(pJoin->finBlk, hJoinGetFinBlkCapacity(pJoin, pJoinNode));
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QRY_ERR_RET(terrno); QRY_ERR_RET(code);
} }
if (NULL != pJoin->pPreFilter) { if (NULL != pJoin->pPreFilter) {
pJoin->midBlk = createOneDataBlock(pJoin->finBlk, false); pJoin->midBlk = NULL;
if (NULL == pJoin->finBlk) { code = createOneDataBlock(pJoin->finBlk, false, &pJoin->midBlk);
QRY_ERR_RET(terrno); if (code) {
QRY_ERR_RET(code);
} }
code = blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity); code = blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QRY_ERR_RET(terrno); QRY_ERR_RET(code);
} }
} }
pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO; pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -2248,10 +2248,12 @@ static int32_t mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInf
} while (true); } while (true);
if (buildGot && NULL == pCtx->cache.outBlk) { if (buildGot && NULL == pCtx->cache.outBlk) {
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); pCtx->cache.outBlk = NULL;
if (NULL == pCtx->cache.outBlk) { int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk);
MJ_ERR_RET(terrno); if (code) {
MJ_ERR_RET(code);
} }
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit)); MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
} }
@ -2678,10 +2680,12 @@ static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo
if (buildGot && pJoin->build->newBlk) { if (buildGot && pJoin->build->newBlk) {
if (NULL == pCtx->cache.outBlk) { if (NULL == pCtx->cache.outBlk) {
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false); pCtx->cache.outBlk = NULL;
if (NULL == pCtx->cache.outBlk) { int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk);
MJ_ERR_RET(terrno); if (code) {
MJ_ERR_RET(code);
} }
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit)); MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
} }
@ -2833,7 +2837,12 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
if (!pGrp->clonedBlk) { if (!pGrp->clonedBlk) {
if (0 == pGrp->beginIdx) { if (0 == pGrp->beginIdx) {
pGrp->blk = createOneDataBlock(pGrp->blk, true); SSDataBlock* p = NULL;
int32_t code = createOneDataBlock(pGrp->blk, true, &p);
if (code) {
MJ_ERR_RET(code);
}
pGrp->blk = p;
} else { } else {
pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx); pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx);
pGrp->endIdx -= pGrp->beginIdx; pGrp->endIdx -= pGrp->beginIdx;
@ -3672,9 +3681,10 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode))); MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));
if (pJoin->pFPreFilter) { if (pJoin->pFPreFilter) {
pCtx->midBlk = createOneDataBlock(pCtx->finBlk, false); pCtx->midBlk = NULL;
if (NULL == pCtx->midBlk) { int32_t code = createOneDataBlock(pCtx->finBlk, false, &pCtx->midBlk);
MJ_ERR_RET(terrno); if (code) {
MJ_ERR_RET(code);
} }
MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity)); MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity));
} }

View File

@ -1331,10 +1331,12 @@ int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* whol
} }
if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) { if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
pGrp->blk = createOneDataBlock(pTable->blk, true); pGrp->blk = NULL;
if (NULL == pGrp->blk) { code = createOneDataBlock(pTable->blk, true, &pGrp->blk);
MJ_ERR_RET(terrno); if (code) {
MJ_ERR_RET(code);
} }
if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) { if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
MJ_ERR_RET(terrno); MJ_ERR_RET(terrno);
} }

View File

@ -207,7 +207,6 @@ int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
if (pSortMergeInfo->pIntermediateBlock == NULL) { if (pSortMergeInfo->pIntermediateBlock == NULL) {
pSortMergeInfo->pIntermediateBlock = NULL; pSortMergeInfo->pIntermediateBlock = NULL;
code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock); code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock);
if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) { if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) {
return code; return code;

View File

@ -27,6 +27,8 @@
#include "querytask.h" #include "querytask.h"
#include "storageapi.h" #include "storageapi.h"
#include "tdatablock.h"
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,

View File

@ -114,7 +114,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFinalRes = NULL;
code = createOneDataBlock(pResBlock, false, &pInfo->pFinalRes);
TSDB_CHECK_CODE(code, lino, _error);
pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder; pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder; pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup; pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;

View File

@ -1747,8 +1747,9 @@ static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t ts
} }
if (pInfo->partitionSup.needCalc) { if (pInfo->partitionSup.needCalc) {
SSDataBlock* tmpBlock = createOneDataBlock(pResult, true); SSDataBlock* tmpBlock = NULL;
QUERY_CHECK_NULL(tmpBlock, code, lino, _end, terrno); code = createOneDataBlock(pResult, true, &tmpBlock);
QUERY_CHECK_CODE(code, lino, _end);
blockDataCleanup(pResult); blockDataCleanup(pResult);
for (int32_t i = 0; i < tmpBlock->info.rows; i++) { for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
@ -3141,7 +3142,9 @@ FETCH_NEXT_BLOCK:
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo)); printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
SSDataBlock* pDelBlock = NULL; SSDataBlock* pDelBlock = NULL;
if (pInfo->tqReader) { if (pInfo->tqReader) {
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock);
QUERY_CHECK_CODE(code, lino, _end);
code = filterDelBlockByUid(pDelBlock, pBlock, pInfo); code = filterDelBlockByUid(pDelBlock, pBlock, pInfo);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} else { } else {
@ -3782,7 +3785,8 @@ _end:
} }
int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo); QRY_OPTR_CHECK(pOptrInfo);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -3945,15 +3949,21 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
} }
pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); code = createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0; pInfo->groupId = 0;
pInfo->pStreamScanOp = pOperator; pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0; pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA); code = createSpecialDataBlock(STREAM_DELETE_DATA, &pInfo->pDeleteDataRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX}; pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); createSpecialDataBlock(STREAM_CLEAR, &pInfo->pUpdateDataRes);
QUERY_CHECK_CODE(code, lino, _error);
if (hasPrimaryKeyCol(pInfo)) { if (hasPrimaryKeyCol(pInfo)) {
code = addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes); code = addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
@ -3961,6 +3971,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->pkColType = pkType.type; pInfo->pkColType = pkType.type;
pInfo->pkColLen = pkType.bytes; pInfo->pkColLen = pkType.bytes;
} }
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->partitionSup.needCalc = false; pInfo->partitionSup.needCalc = false;
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
@ -3969,7 +3980,9 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->pState = pTaskInfo->streamInfo.pState; pInfo->pState = pTaskInfo->streamInfo.pState;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
// for stream // for stream
if (pTaskInfo->streamInfo.pState) { if (pTaskInfo->streamInfo.pState) {
@ -4770,10 +4783,16 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
pInput->type = SUB_TABLE_MEM_BLOCK; pInput->type = SUB_TABLE_MEM_BLOCK;
code = dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond); code = dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false); code = createOneDataBlock(pInfo->pResBlock, false, &pInput->pReaderBlock);
QUERY_CHECK_CODE(code, lino, _end);
code = createOneDataBlock(pInfo->pResBlock, false, &pInput->pPageBlock);
QUERY_CHECK_CODE(code, lino, _end);
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex); STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
pInput->pKeyInfo = keyInfo; pInput->pKeyInfo = keyInfo;
@ -5183,7 +5202,12 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) { if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) {
if (!bSkipped) { if (!bSkipped) {
pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks] = createOneDataBlock(pBlock, true); int32_t code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]);
if (code) {
terrno = code;
return NULL;
}
++pInfo->numNextDurationBlocks; ++pInfo->numNextDurationBlocks;
if (pInfo->numNextDurationBlocks > 2) { if (pInfo->numNextDurationBlocks > 2) {
qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo), qError("%s table merge scan prefetch %d next duration blocks. end early.", GET_TASKID(pTaskInfo),
@ -5685,8 +5709,9 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
code = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order, &pInfo->pSortInfo); code = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order, &pInfo->pSortInfo);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
QUERY_CHECK_NULL(pInfo->pReaderBlock, code, lino, _error, terrno); code = createOneDataBlock(pInfo->pResBlock, false, &pInfo->pReaderBlock);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable; pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
@ -5696,7 +5721,8 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
// start one reader variable // start one reader variable
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); code = createOneDataBlock(pInfo->pResBlock, false, &pInfo->pSortInputBlock);
QUERY_CHECK_CODE(code, lino, _error);
if (!tsExperimental) { if (!tsExperimental) {
pInfo->filesetDelimited = false; pInfo->filesetDelimited = false;

View File

@ -858,7 +858,10 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = tSimpleHashInit(64, hashFn); pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL; pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->ignoreExpiredData = pCountNode->window.igExpired; pInfo->ignoreExpiredData = pCountNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false; pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
@ -870,7 +873,9 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
} }
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey; pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey;

View File

@ -900,7 +900,9 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL; pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pEventNode->window.igExpired; pInfo->ignoreExpiredData = pEventNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false; pInfo->ignoreExpiredDataSaved = false;
@ -922,8 +924,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pAllUpdated = NULL; pInfo->pAllUpdated = NULL;
} }
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_NULL(pInfo->pCheckpointRes, code, lino, _error, terrno); QUERY_CHECK_CODE(code, lino, _error);
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
pInfo->recvGetAll = false; pInfo->recvGetAll = false;

View File

@ -1394,11 +1394,8 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
} }
} }
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
if (!pInfo->pDelRes) { QUERY_CHECK_CODE(code, lino, _error);
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _error);
}
code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);

View File

@ -1918,10 +1918,15 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pFinalPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pFinalPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
code = createSpecialDataBlock(STREAM_RETRIEVE, &pInfo->pPullDataRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false; pInfo->ignoreExpiredDataSaved = false;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delKey.ts = INT64_MAX; pInfo->delKey.ts = INT64_MAX;
@ -1936,11 +1941,16 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
pInfo->dataVersion = 0; pInfo->dataVersion = 0;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvRetrive = false;
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->recvPullover = false; pInfo->recvPullover = false;
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->recvRetrive = false;
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
code = createSpecialDataBlock(STREAM_MID_RETRIEVE, &pInfo->pMidRetriveRes);
QUERY_CHECK_CODE(code, lino, _error);
code = createSpecialDataBlock(STREAM_MID_RETRIEVE, &pInfo->pMidPulloverRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->clearState = false; pInfo->clearState = false;
pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey));
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
@ -2101,13 +2111,18 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
SStorageAPI* pApi, int32_t tsIndex) { SStorageAPI* pApi, int32_t tsIndex) {
pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock);
if (code) {
return code;
}
pSup->gap = gap; pSup->gap = gap;
pSup->stateKeySize = keySize; pSup->stateKeySize = keySize;
pSup->stateKeyType = keyType; pSup->stateKeyType = keyType;
pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); pSup->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pSup->pDummyCtx == NULL) { if (pSup->pDummyCtx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
pSup->stateStore = *pStore; pSup->stateStore = *pStore;
@ -2129,7 +2144,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
} }
pSup->pSessionAPI = pApi; pSup->pSessionAPI = pApi;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3718,7 +3732,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = tSimpleHashInit(64, hashFn); pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL; pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->pPhyNode = pPhyNode; pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired; pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
@ -3734,7 +3750,9 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->isHistoryOp = pHandle->fillHistory; pInfo->isHistoryOp = pHandle->fillHistory;
} }
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->clearState = false; pInfo->clearState = false;
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey; pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey;
@ -3771,6 +3789,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return code;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo); destroyStreamSessionAggOperatorInfo(pInfo);
@ -4847,7 +4866,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL; pInfo->pDelIterator = NULL;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired; pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false; pInfo->ignoreExpiredDataSaved = false;
@ -4856,14 +4878,17 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->dataVersion = 0; pInfo->dataVersion = 0;
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey)); pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
if (!pInfo->historyWins) { if (!pInfo->historyWins) {
code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
} }
if (pHandle) { if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory; pInfo->isHistoryOp = pHandle->fillHistory;
} }
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn); pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey; pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey;
@ -5157,7 +5182,10 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->invertible = false; pInfo->invertible = false;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pPhyNode = NULL; // create new child pInfo->pPhyNode = NULL; // create new child
@ -5187,7 +5215,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);

View File

@ -1176,11 +1176,17 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
} }
} }
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
terrno = code;
return NULL;
}
for (int32_t i = 0; i < pMeta[index].colNum; ++i) { for (int32_t i = 0; i < pMeta[index].colNum; ++i) {
SColumnInfoData colInfoData = SColumnInfoData colInfoData =
createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1); createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1);
int32_t code = blockDataAppendColInfo(pBlock, &colInfoData); code = blockDataAppendColInfo(pBlock, &colInfoData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);

View File

@ -168,22 +168,11 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
} }
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) { int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) {
if (pBlock == NULL) {
*pBlock = NULL;
return TSDB_CODE_SUCCESS;
}
if (pSortHandle->pDataBlock == NULL) { if (pSortHandle->pDataBlock == NULL) {
*pBlock = NULL; *pBlock = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock);
*pBlock = createOneDataBlock(pSortHandle->pDataBlock, false);
if (*pBlock == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
} }
#define AllocatedTupleType 0 #define AllocatedTupleType 0
@ -271,9 +260,8 @@ int32_t tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize,
pSortHandle->forceUsePQSort = false; pSortHandle->forceUsePQSort = false;
if (pBlock != NULL) { if (pBlock != NULL) {
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); code = createOneDataBlock(pBlock, false, &pSortHandle->pDataBlock);
if (pSortHandle->pDataBlock == NULL) { if (code) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
} }
@ -500,7 +488,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false); SSDataBlock* pBlock = NULL;
int32_t code = createOneDataBlock(pDataBlock, false, &pBlock);
if (code) {
return code;
}
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList); return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
} }
@ -994,7 +987,14 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
tMergeTreeDestroy(&pHandle->pMergeTree); tMergeTreeDestroy(&pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0; pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); SSDataBlock* pBlock = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pBlock);
if (code) {
taosArrayDestroy(pResList);
return code;
}
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList); taosArrayDestroy(pResList);
@ -1481,12 +1481,19 @@ static int32_t appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSou
static int32_t initRowIdSort(SSortHandle* pHandle) { static int32_t initRowIdSort(SSortHandle* pHandle) {
SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL; SBlockOrderInfo* pkOrder = (pHandle->bSortPk) ? taosArrayGet(pHandle->aExtRowsOrders, 1) : NULL;
SColumnInfoData* extPkCol = (pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL; SColumnInfoData* extPkCol =
(pHandle->bSortPk) ? taosArrayGet(pHandle->pDataBlock->pDataBlock, pkOrder->slotId) : NULL;
SColumnInfoData pkCol = {0}; SColumnInfoData pkCol = {0};
SSDataBlock* pSortInput = createDataBlock(); SSDataBlock* pSortInput = NULL;
int32_t code = createDataBlock(&pSortInput);
if (code) {
return code;
}
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
int32_t code = blockDataAppendColInfo(pSortInput, &tsCol);
code = blockDataAppendColInfo(pSortInput, &tsCol);
if (code) { if (code) {
blockDataDestroy(pSortInput); blockDataDestroy(pSortInput);
return code; return code;
@ -1499,14 +1506,14 @@ static int32_t initRowIdSort(SSortHandle* pHandle) {
return code; return code;
} }
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
code = blockDataAppendColInfo(pSortInput, &offsetCol); code = blockDataAppendColInfo(pSortInput, &offsetCol);
if (code) { if (code) {
blockDataDestroy(pSortInput); blockDataDestroy(pSortInput);
return code; return code;
} }
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
code = blockDataAppendColInfo(pSortInput, &lengthCol); code = blockDataAppendColInfo(pSortInput, &lengthCol);
if (code) { if (code) {
blockDataDestroy(pSortInput); blockDataDestroy(pSortInput);
@ -1525,9 +1532,9 @@ static int32_t initRowIdSort(SSortHandle* pHandle) {
blockDataDestroy(pHandle->pDataBlock); blockDataDestroy(pHandle->pDataBlock);
pHandle->pDataBlock = pSortInput; pHandle->pDataBlock = pSortInput;
// int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); // int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock);
// size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); // size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
pHandle->pageSize = 256 * 1024; // 256k pHandle->pageSize = 256 * 1024; // 256k
pHandle->numOfPages = 256; pHandle->numOfPages = 256;
SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo));
@ -1928,7 +1935,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
blockDataCleanup(pHandle->pDataBlock); blockDataCleanup(pHandle->pDataBlock);
} }
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); SSDataBlock* pMemSrcBlk = NULL;
code = createOneDataBlock(pHandle->pDataBlock, false, &pMemSrcBlk);
if (code) {
cleanupMergeSup(&sup);
tMergeTreeDestroy(&pTree);
return code;
}
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
cleanupMergeSup(&sup); cleanupMergeSup(&sup);
@ -2054,7 +2068,16 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
blockDataDestroy(pBlk); blockDataDestroy(pBlk);
} }
} else { } else {
SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true); SSDataBlock* tBlk = NULL;
if (bExtractedBlock) {
tBlk = pBlk;
} else {
code = createOneDataBlock(pBlk, true, &tBlk);
if (code) {
return code;
}
}
code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
if (code) { if (code) {
return code; return code;
@ -2175,7 +2198,11 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
// todo, number of pages are set according to the total available sort buffer // todo, number of pages are set according to the total available sort buffer
pHandle->numOfPages = 1024; pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize; sortBufSize = pHandle->numOfPages * pHandle->pageSize;
pHandle->pDataBlock = createOneDataBlock(pBlock, false); code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
if (code) {
freeSSortSource(source);
return code;
}
} }
if (pHandle->beforeFp != NULL) { if (pHandle->beforeFp != NULL) {
@ -2453,11 +2480,10 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
} }
if (pHandle->pDataBlock == NULL) { if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false); int32_t code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
} if (code) {
return code;
if (pHandle->pDataBlock == NULL) { }
return TSDB_CODE_OUT_OF_MEMORY;
} }
size_t colNum = blockDataGetNumOfCols(pBlock); size_t colNum = blockDataGetNumOfCols(pBlock);

View File

@ -71,7 +71,9 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
} }
if (pInfo->pBlock == NULL) { if (pInfo->pBlock == NULL) {
pInfo->pBlock = createDataBlock(); pInfo->pBlock = NULL;
int32_t code = createDataBlock(&pInfo->pBlock);
ASSERT(code == 0);
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pInfo->pBlock, &colInfo); blockDataAppendColInfo(pInfo->pBlock, &colInfo);
@ -129,7 +131,10 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
} }
if (pInfo->pBlock == NULL) { if (pInfo->pBlock == NULL) {
pInfo->pBlock = createDataBlock(); pInfo->pBlock = NULL;
int32_t code = createDataBlock(&pInfo->pBlock);
ASSERT(code == 0);
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1);
blockDataAppendColInfo(pInfo->pBlock, &colInfo); blockDataAppendColInfo(pInfo->pBlock, &colInfo);

View File

@ -928,8 +928,9 @@ SExecTaskInfo* createDummyTaskInfo(char* taskId) {
} }
SSDataBlock* createDummyBlock(int32_t blkId) { SSDataBlock* createDummyBlock(int32_t blkId) {
SSDataBlock* p = createDataBlock(); SSDataBlock* p = NULL;
assert(p); int32_t code = createDataBlock(&p);
assert(code == 0);
p->info.id.blockId = blkId; p->info.id.blockId = blkId;
p->info.type = STREAM_INVALID; p->info.type = STREAM_INVALID;

View File

@ -60,7 +60,12 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
return NULL; return NULL;
} }
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return NULL;
}
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};
colInfo.info.type = pInfo->type; colInfo.info.type = pInfo->type;
@ -348,7 +353,10 @@ TEST(testCase, ordered_merge_sort_Test) {
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi); taosArrayPush(orderInfo, &oi);
SSDataBlock* pBlock = createDataBlock(); SSDataBlock* pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
ASSERT(code == 0);
for (int32_t i = 0; i < 1; ++i) { for (int32_t i = 0; i < 1; ++i) {
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pBlock, &colInfo); blockDataAppendColInfo(pBlock, &colInfo);

View File

@ -1254,8 +1254,13 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
int32_t numOfCols = pInput->numOfInputCols; int32_t numOfCols = pInput->numOfInputCols;
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows; int32_t numOfRows = pInput->numOfRows;
SSDataBlock *pTempBlock = NULL;
int32_t code = createDataBlock(&pTempBlock);
if (code) {
return code;
}
SSDataBlock *pTempBlock = createDataBlock();
pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.rows = pInput->totalRows;
pTempBlock->info.id.uid = pInput->uid; pTempBlock->info.id.uid = pInput->uid;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {

View File

@ -88,7 +88,13 @@ int aggregateFuncTest() {
return -1; return -1;
} }
SSDataBlock *pBlock = createDataBlock(); SSDataBlock *pBlock = NULL;
int32_t code = createDataBlock(&pBlock);
if (code) {
return code;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pBlock, &colInfo); blockDataAppendColInfo(pBlock, &colInfo);

View File

@ -112,7 +112,10 @@ int32_t flttMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType,
} }
if (NULL == *block) { if (NULL == *block) {
SSDataBlock *res = createDataBlock(); SSDataBlock *res = NULL;
int32_t code = createDataBlock(&res);
ASSERT(code == 0);
for (int32_t i = 0; i < 2; ++i) { for (int32_t i = 0; i < 2; ++i) {
SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, 10, 1 + i); SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_NULL, 10, 1 + i);
FLT_ERR_RET(blockDataAppendColInfo(res, &idata)); FLT_ERR_RET(blockDataAppendColInfo(res, &idata));

View File

@ -91,14 +91,16 @@ void scltInitLogFile() {
int32_t scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *slotId, bool newBlock, int32_t rows, int32_t scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *slotId, bool newBlock, int32_t rows,
SColumnInfo *colInfo) { SColumnInfo *colInfo) {
if (newBlock) { if (newBlock) {
SSDataBlock *res = createDataBlock(); SSDataBlock *res = NULL;
if (NULL == res || NULL == res->pDataBlock) { int32_t code = createDataBlock(&res);
if (code != 0 || NULL == res->pDataBlock) {
SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
SColumnInfoData idata = {0}; SColumnInfoData idata = {0};
idata.info = *colInfo; idata.info = *colInfo;
int32_t code = colInfoDataEnsureCapacity(&idata, rows, true);
code = colInfoDataEnsureCapacity(&idata, rows, true);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(&idata); taosMemoryFree(&idata);
SCL_ERR_RET(code); SCL_ERR_RET(code);
@ -185,7 +187,11 @@ int32_t scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType,
} }
if (NULL == *block) { if (NULL == *block) {
SSDataBlock *res = createDataBlock(); SSDataBlock *res = NULL;
int32_t code = createDataBlock(&res);
ASSERT(code == 0);
for (int32_t i = 0; i < 2; ++i) { for (int32_t i = 0; i < 2; ++i) {
SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_INT, 10, i + 1); SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_INT, 10, i + 1);
code = colInfoDataEnsureCapacity(&idata, rowNum, true); code = colInfoDataEnsureCapacity(&idata, rowNum, true);

View File

@ -142,7 +142,9 @@ static int32_t syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
int64_t nowMS = taosGetTimestampMs(); int64_t nowMS = taosGetTimestampMs();
if (nowMS - pStub->createTime > ttl || -1 == ttl) { if (nowMS - pStub->createTime > ttl || -1 == ttl) {
taosArrayPush(delIndexArray, pSeqNum); if (taosArrayPush(delIndexArray, pSeqNum) == NULL) {
return terrno;
}
cnt++; cnt++;
SFsmCbMeta cbMeta = { SFsmCbMeta cbMeta = {

View File

@ -1366,11 +1366,6 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
if (ret < 0) { if (ret < 0) {
return ret; return ret;
} }
/*
if (pDecoder->ofps) {
taosArrayPush(pDecoder->ofps, &ofp);
}
*/
ofpCell = tdbPageGetCell(ofp, 0); ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
@ -1411,11 +1406,6 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
if (ret < 0) { if (ret < 0) {
return ret; return ret;
} }
/*
if (pDecoder->ofps) {
taosArrayPush(pDecoder->ofps, &ofp);
}
*/
ofpCell = tdbPageGetCell(ofp, 0); ofpCell = tdbPageGetCell(ofp, 0);
int lastKeyPage = 0; int lastKeyPage = 0;
@ -1642,7 +1632,10 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
SArray *ofps = pPage->pPager->ofps; SArray *ofps = pPage->pPager->ofps;
if (ofps) { if (ofps) {
taosArrayPush(ofps, &ofp); if (taosArrayPush(ofps, &ofp) == NULL) {
ASSERT(0);
return terrno;
}
} }
tdbPagerReturnPage(pPage->pPager, ofp, pTxn); tdbPagerReturnPage(pPage->pPager, ofp, pTxn);

View File

@ -34,14 +34,12 @@ SArray* taosArrayInit(size_t size, size_t elemSize) {
SArray* pArray = taosMemoryMalloc(sizeof(SArray)); SArray* pArray = taosMemoryMalloc(sizeof(SArray));
if (pArray == NULL) { if (pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pArray->size = 0; pArray->size = 0;
pArray->pData = taosMemoryCalloc(size, elemSize); pArray->pData = taosMemoryCalloc(size, elemSize);
if (pArray->pData == NULL) { if (pArray->pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pArray); taosMemoryFree(pArray);
return NULL; return NULL;
} }

View File

@ -169,7 +169,7 @@ TdThread doRegisterCacheObj(SCacheObj *pCacheObj) {
taosThreadOnce(&cacheThreadInit, doInitRefreshThread); taosThreadOnce(&cacheThreadInit, doInitRefreshThread);
taosThreadMutexLock(&guard); taosThreadMutexLock(&guard);
taosArrayPush(pCacheArrayList, &pCacheObj); (void)taosArrayPush(pCacheArrayList, &pCacheObj);
taosThreadMutexUnlock(&guard); taosThreadMutexUnlock(&guard);
return cacheRefreshWorker; return cacheRefreshWorker;

View File

@ -1275,18 +1275,23 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
char *itemName = NULL, *itemValueString = NULL; char *itemName = NULL, *itemValueString = NULL;
TAOS_CHECK_GOTO(tjsonGetObjectName(item, &itemName), NULL, _err_json); TAOS_CHECK_GOTO(tjsonGetObjectName(item, &itemName), NULL, _err_json);
TAOS_CHECK_GOTO(tjsonGetObjectValueString(item, &itemValueString), NULL, _err_json); TAOS_CHECK_GOTO(tjsonGetObjectValueString(item, &itemValueString), NULL, _err_json);
if (itemValueString != NULL && itemName != NULL) { if (itemValueString != NULL && itemName != NULL) {
size_t itemNameLen = strlen(itemName); size_t itemNameLen = strlen(itemName);
size_t itemValueStringLen = strlen(itemValueString); size_t itemValueStringLen = strlen(itemValueString);
cfgLineBuf = taosMemoryRealloc(cfgLineBuf, itemNameLen + itemValueStringLen + 3); void* px = taosMemoryRealloc(cfgLineBuf, itemNameLen + itemValueStringLen + 3);
if (NULL == cfgLineBuf) { if (NULL == px) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err_json; goto _err_json;
} }
cfgLineBuf = px;
(void)memset(cfgLineBuf, 0, itemNameLen + itemValueStringLen + 3);
(void)memcpy(cfgLineBuf, itemName, itemNameLen); (void)memcpy(cfgLineBuf, itemName, itemNameLen);
cfgLineBuf[itemNameLen] = ' '; cfgLineBuf[itemNameLen] = ' ';
(void)memcpy(&cfgLineBuf[itemNameLen + 1], itemValueString, itemValueStringLen); (void)memcpy(&cfgLineBuf[itemNameLen + 1], itemValueString, itemValueStringLen);
(void)paGetToken(cfgLineBuf, &name, &olen); (void)paGetToken(cfgLineBuf, &name, &olen);
if (olen == 0) continue; if (olen == 0) continue;
name[olen] = 0; name[olen] = 0;

View File

@ -326,7 +326,7 @@ static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArr
ASSERT(shard->usage >= old->totalCharge); ASSERT(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
taosArrayPush(deleted, &old); (void)taosArrayPush(deleted, &old);
} }
} }
@ -392,7 +392,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) { if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) {
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
if (handle == NULL) { if (handle == NULL) {
taosArrayPush(lastReferenceList, &e); (void)taosArrayPush(lastReferenceList, &e);
} else { } else {
if (freeOnFail) { if (freeOnFail) {
taosMemoryFree(e); taosMemoryFree(e);
@ -415,7 +415,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
ASSERT(shard->usage >= old->totalCharge); ASSERT(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
taosArrayPush(lastReferenceList, &old); (void)taosArrayPush(lastReferenceList, &old);
} }
} }
if (handle == NULL) { if (handle == NULL) {
@ -536,7 +536,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
ASSERT(shard->usage >= old->totalCharge); ASSERT(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
taosArrayPush(lastReferenceList, &old); (void)taosArrayPush(lastReferenceList, &old);
} }
(void)taosThreadMutexUnlock(&shard->mutex); (void)taosThreadMutexUnlock(&shard->mutex);