Merge pull request #26794 from taosdata/fix/TD-30967-3
adj operator result
This commit is contained in:
commit
e608750323
|
@ -380,7 +380,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
|
||||||
|
|
||||||
for (int32_t k = numOfCols; k < slotId + 1; ++k) {
|
for (int32_t k = numOfCols; k < slotId + 1; ++k) {
|
||||||
void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
|
void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
|
||||||
|
|
|
@ -132,20 +132,20 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
|
SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
|
||||||
QUERY_CHECK_NULL(pCidList, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pCidList, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
|
pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
|
||||||
QUERY_CHECK_NULL(pInfo->pFuncTypeList, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pFuncTypeList, code, lino, _error, terrno);
|
||||||
|
|
||||||
void* tmp = taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
|
void* tmp = taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
|
||||||
if (!tmp && taosArrayGetSize(pScanNode->pFuncTypes) > 0) {
|
if (!tmp && taosArrayGetSize(pScanNode->pFuncTypes) > 0) {
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
|
for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
|
||||||
SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||||
void* tmp = taosArrayPush(pCidList, &pColInfo->colId);
|
void* tmp = taosArrayPush(pCidList, &pColInfo->colId);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
|
||||||
if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
|
if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
|
||||||
pColInfo->funcType = *(int32_t*)taosArrayGet(pInfo->pFuncTypeList, i);
|
pColInfo->funcType = *(int32_t*)taosArrayGet(pInfo->pFuncTypeList, i);
|
||||||
}
|
}
|
||||||
|
@ -469,7 +469,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
size_t size = taosArrayGetSize(pColMatchInfo->pList);
|
||||||
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
|
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
|
||||||
QUERY_CHECK_NULL(pMatchInfo, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pMatchInfo, code, lino, _end, terrno);
|
||||||
|
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
|
SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
|
||||||
|
@ -480,10 +480,10 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
|
||||||
SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
|
SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
|
||||||
if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
void* tmp = taosArrayPush(pMatchInfo, pColInfo);
|
void* tmp = taosArrayPush(pMatchInfo, pColInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
} else if (FUNCTION_TYPE_CACHE_LAST_ROW == pColInfo->funcType) {
|
} else if (FUNCTION_TYPE_CACHE_LAST_ROW == pColInfo->funcType) {
|
||||||
void* tmp = taosArrayPush(pMatchInfo, pColInfo);
|
void* tmp = taosArrayPush(pMatchInfo, pColInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -100,7 +100,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc);
|
||||||
QUERY_CHECK_NULL(pResBlock, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
|
@ -547,7 +547,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
|
||||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||||
|
|
||||||
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
|
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
|
||||||
QUERY_CHECK_NULL(pWrapper, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
|
||||||
pWrapper->exchangeId = pExchangeInfo->self;
|
pWrapper->exchangeId = pExchangeInfo->self;
|
||||||
pWrapper->sourceIndex = sourceIndex;
|
pWrapper->sourceIndex = sourceIndex;
|
||||||
|
|
||||||
|
@ -764,12 +764,12 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
|
||||||
if (pRetrieveRsp->compressed) { // decompress the data
|
if (pRetrieveRsp->compressed) { // decompress the data
|
||||||
if (pDataInfo->decompBuf == NULL) {
|
if (pDataInfo->decompBuf == NULL) {
|
||||||
pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
|
pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
|
||||||
QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
|
||||||
pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
|
pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
|
||||||
} else {
|
} else {
|
||||||
if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
|
if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
|
||||||
char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
|
char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
|
||||||
QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
pDataInfo->decompBuf = p;
|
pDataInfo->decompBuf = p;
|
||||||
pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
|
pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
|
||||||
|
@ -787,7 +787,7 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
|
||||||
blockDataCleanup(pb);
|
blockDataCleanup(pb);
|
||||||
} else {
|
} else {
|
||||||
pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
|
pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
|
||||||
QUERY_CHECK_NULL(pb, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pb, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compLen = *(int32_t*)pStart;
|
int32_t compLen = *(int32_t*)pStart;
|
||||||
|
@ -811,7 +811,7 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
|
|
@ -521,7 +521,7 @@ static int32_t genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD
|
||||||
|
|
||||||
if (filterDigest[0]) {
|
if (filterDigest[0]) {
|
||||||
payload = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
|
payload = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
|
||||||
QUERY_CHECK_NULL(payload, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(payload, code, lino, _end, terrno);
|
||||||
memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
|
memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
|
||||||
len += tListLen(pContext->digest);
|
len += tListLen(pContext->digest);
|
||||||
}
|
}
|
||||||
|
@ -597,13 +597,13 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
||||||
}
|
}
|
||||||
|
|
||||||
pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
|
pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
|
||||||
QUERY_CHECK_NULL(pUidTagList, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
|
||||||
|
|
||||||
for (int32_t i = 0; i < rows; ++i) {
|
for (int32_t i = 0; i < rows; ++i) {
|
||||||
STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
STUidTagInfo info = {.uid = pkeyInfo->uid};
|
STUidTagInfo info = {.uid = pkeyInfo->uid};
|
||||||
void* tmp = taosArrayPush(pUidTagList, &info);
|
void* tmp = taosArrayPush(pUidTagList, &info);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
|
code = pAPI->metaFn.getTableTags(pVnode, pTableListInfo->idInfo.suid, pUidTagList);
|
||||||
|
@ -622,13 +622,13 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
||||||
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
|
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
|
||||||
|
|
||||||
pBlockList = taosArrayInit(2, POINTER_BYTES);
|
pBlockList = taosArrayInit(2, POINTER_BYTES);
|
||||||
QUERY_CHECK_NULL(pBlockList, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pBlockList, &pResBlock);
|
void* tmp = taosArrayPush(pBlockList, &pResBlock);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
|
||||||
|
|
||||||
groupData = taosArrayInit(2, POINTER_BYTES);
|
groupData = taosArrayInit(2, POINTER_BYTES);
|
||||||
QUERY_CHECK_NULL(groupData, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(groupData, code, lino, end, terrno);
|
||||||
|
|
||||||
FOREACH(pNode, group) {
|
FOREACH(pNode, group) {
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
|
@ -668,7 +668,7 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tmp = taosArrayPush(groupData, &output.columnData);
|
void* tmp = taosArrayPush(groupData, &output.columnData);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t keyLen = 0;
|
int32_t keyLen = 0;
|
||||||
|
@ -792,7 +792,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) {
|
||||||
SListCell* cell = pList->pNodeList->pHead;
|
SListCell* cell = pList->pNodeList->pHead;
|
||||||
|
|
||||||
SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
|
SArray* pTbList = taosArrayInit(len, POINTER_BYTES);
|
||||||
QUERY_CHECK_NULL(pTbList, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pTbList, code, lino, _end, terrno);
|
||||||
|
|
||||||
for (int i = 0; i < pList->pNodeList->length; i++) {
|
for (int i = 0; i < pList->pNodeList->length; i++) {
|
||||||
SValueNode* valueNode = (SValueNode*)cell->pNode;
|
SValueNode* valueNode = (SValueNode*)cell->pNode;
|
||||||
|
@ -804,7 +804,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) {
|
||||||
|
|
||||||
char* name = varDataVal(valueNode->datum.p);
|
char* name = varDataVal(valueNode->datum.p);
|
||||||
void* tmp = taosArrayPush(pTbList, &name);
|
void* tmp = taosArrayPush(pTbList, &name);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
cell = cell->pNext;
|
cell = cell->pNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -816,7 +816,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) {
|
||||||
// remove the duplicates
|
// remove the duplicates
|
||||||
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
||||||
void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
for (int32_t i = 1; i < numOfTables; ++i) {
|
for (int32_t i = 1; i < numOfTables; ++i) {
|
||||||
char** name = taosArrayGetLast(pNewList);
|
char** name = taosArrayGetLast(pNewList);
|
||||||
|
@ -826,7 +826,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp = taosArrayPush(pNewList, nameInOldList);
|
tmp = taosArrayPush(pNewList, nameInOldList);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
@ -1051,7 +1051,7 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||||
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
|
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
varDataSetLen(tmp, tagVal.nData);
|
varDataSetLen(tmp, tagVal.nData);
|
||||||
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
||||||
code = colDataSetVal(pColInfo, i, tmp, false);
|
code = colDataSetVal(pColInfo, i, tmp, false);
|
||||||
|
@ -1171,7 +1171,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
|
||||||
|
|
||||||
// int64_t stt = taosGetTimestampUs();
|
// int64_t stt = taosGetTimestampUs();
|
||||||
pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
|
pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
|
||||||
QUERY_CHECK_NULL(pUidTagList, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pUidTagList, code, lino, end, terrno);
|
||||||
|
|
||||||
code = copyExistedUids(pUidTagList, pUidList);
|
code = copyExistedUids(pUidTagList, pUidList);
|
||||||
QUERY_CHECK_CODE(code, lino, end);
|
QUERY_CHECK_CODE(code, lino, end);
|
||||||
|
@ -1189,7 +1189,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
|
STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
|
||||||
void* tmp = taosArrayPush(pUidList, &pInfo->uid);
|
void* tmp = taosArrayPush(pUidList, &pInfo->uid);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
|
||||||
}
|
}
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1219,10 +1219,10 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
|
||||||
// int64_t st1 = taosGetTimestampUs();
|
// int64_t st1 = taosGetTimestampUs();
|
||||||
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
|
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
|
||||||
pBlockList = taosArrayInit(2, POINTER_BYTES);
|
pBlockList = taosArrayInit(2, POINTER_BYTES);
|
||||||
QUERY_CHECK_NULL(pBlockList, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pBlockList, code, lino, end, terrno);
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pBlockList, &pResBlock);
|
void* tmp = taosArrayPush(pBlockList, &pResBlock);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, end, terrno);
|
||||||
|
|
||||||
code = createResultData(&type, numOfTables, &output);
|
code = createResultData(&type, numOfTables, &output);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1270,14 +1270,14 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
|
||||||
pListInfo->idInfo.tableType = pScanNode->tableType;
|
pListInfo->idInfo.tableType = pScanNode->tableType;
|
||||||
|
|
||||||
SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
|
SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
|
||||||
QUERY_CHECK_NULL(pUidList, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pUidList, code, lino, _error, terrno);
|
||||||
|
|
||||||
SIdxFltStatus status = SFLT_NOT_INDEX;
|
SIdxFltStatus status = SFLT_NOT_INDEX;
|
||||||
if (pScanNode->tableType != TSDB_SUPER_TABLE) {
|
if (pScanNode->tableType != TSDB_SUPER_TABLE) {
|
||||||
pListInfo->idInfo.uid = pScanNode->uid;
|
pListInfo->idInfo.uid = pScanNode->uid;
|
||||||
if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
|
if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) {
|
||||||
void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
|
void* tmp = taosArrayPush(pUidList, &pScanNode->uid);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
|
||||||
}
|
}
|
||||||
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false, &listAdded);
|
code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false, &listAdded);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -1334,7 +1334,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
|
||||||
if (tsTagFilterCache) {
|
if (tsTagFilterCache) {
|
||||||
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
|
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
|
||||||
char* pPayload = taosMemoryMalloc(size);
|
char* pPayload = taosMemoryMalloc(size);
|
||||||
QUERY_CHECK_NULL(pPayload, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pPayload, code, lino, _end, terrno);
|
||||||
|
|
||||||
*(int32_t*)pPayload = numOfTables;
|
*(int32_t*)pPayload = numOfTables;
|
||||||
if (numOfTables > 0) {
|
if (numOfTables > 0) {
|
||||||
|
@ -1531,7 +1531,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
c.isPk = pColNode->isPk;
|
c.isPk = pColNode->isPk;
|
||||||
c.dataType = pColNode->node.resType;
|
c.dataType = pColNode->node.resType;
|
||||||
void* tmp = taosArrayPush(pList, &c);
|
void* tmp = taosArrayPush(pList, &c);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1608,7 +1608,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
|
pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
|
||||||
QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pExp->pExpr, code, lino, _end, terrno);
|
||||||
|
|
||||||
pExp->pExpr->_function.num = 1;
|
pExp->pExpr->_function.num = 1;
|
||||||
pExp->pExpr->_function.functionId = -1;
|
pExp->pExpr->_function.functionId = -1;
|
||||||
|
@ -1620,7 +1620,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
||||||
SColumnNode* pColNode = (SColumnNode*)pNode;
|
SColumnNode* pColNode = (SColumnNode*)pNode;
|
||||||
|
|
||||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||||
|
|
||||||
pExp->base.numOfParams = 1;
|
pExp->base.numOfParams = 1;
|
||||||
|
|
||||||
|
@ -1635,7 +1635,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
||||||
SValueNode* pValNode = (SValueNode*)pNode;
|
SValueNode* pValNode = (SValueNode*)pNode;
|
||||||
|
|
||||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||||
|
|
||||||
pExp->base.numOfParams = 1;
|
pExp->base.numOfParams = 1;
|
||||||
|
|
||||||
|
@ -1898,9 +1898,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
|
|
||||||
pCtx->input.numOfInputCols = pFunct->numOfParams;
|
pCtx->input.numOfInputCols = pFunct->numOfParams;
|
||||||
pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
|
pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
|
||||||
QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pCtx->input.pData, code, lino, _end, terrno);
|
||||||
pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
|
pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
|
||||||
QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pCtx->input.pColumnDataAgg, code, lino, _end, terrno);
|
||||||
|
|
||||||
pCtx->pTsOutput = NULL;
|
pCtx->pTsOutput = NULL;
|
||||||
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
|
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
|
||||||
|
@ -2238,12 +2238,12 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
if (pTableList->map == NULL) {
|
if (pTableList->map == NULL) {
|
||||||
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
QUERY_CHECK_NULL(pTableList->map, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pTableList->map, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
|
STableKeyInfo keyInfo = {.uid = uid, .groupId = gid};
|
||||||
void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
|
void* tmp = taosArrayPush(pTableList->pTableList, &keyInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
|
int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1;
|
||||||
code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
|
code = taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot));
|
||||||
|
|
|
@ -63,31 +63,31 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
||||||
void* tmp = taosArrayPush(pInfo->pBlockLists, pReq);
|
void* tmp = taosArrayPush(pInfo->pBlockLists, pReq);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||||
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
void* tmp = taosArrayPush(pInfo->pBlockLists, &input);
|
void* tmp = taosArrayPush(pInfo->pBlockLists, &input);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK) {
|
} else if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
||||||
SPackedData tmp = {.pDataBlock = pDataBlock};
|
SPackedData tmp = {.pDataBlock = pDataBlock};
|
||||||
void* tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
|
void* tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||||
QUERY_CHECK_NULL(tmpItem, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT) {
|
} else if (type == STREAM_INPUT__CHECKPOINT) {
|
||||||
SPackedData tmp = {.pDataBlock = input};
|
SPackedData tmp = {.pDataBlock = input};
|
||||||
void* tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
|
void* tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||||
QUERY_CHECK_NULL(tmpItem, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
|
||||||
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
|
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
|
||||||
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
||||||
void* tmp = taosArrayPush(pInfo->pBlockLists, pReq);
|
void* tmp = taosArrayPush(pInfo->pBlockLists, pReq);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||||
}
|
}
|
||||||
|
@ -164,13 +164,13 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
|
||||||
void* tmp = taosArrayPush(pInfo->pBlockLists, pReq);
|
void* tmp = taosArrayPush(pInfo->pBlockLists, pReq);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||||
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
void* tmp = taosArrayPush(pInfo->pBlockLists, input);
|
void* tmp = taosArrayPush(pInfo->pBlockLists, input);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK) {
|
} else if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
|
@ -178,14 +178,14 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
|
||||||
SPackedData tmp = {.pDataBlock = pDataBlock};
|
SPackedData tmp = {.pDataBlock = pDataBlock};
|
||||||
void* tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
|
void* tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||||
QUERY_CHECK_NULL(tmpItem, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
} else if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
SPackedData tmp = {.pDataBlock = input};
|
SPackedData tmp = {.pDataBlock = input};
|
||||||
void* tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
|
void* tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
|
||||||
QUERY_CHECK_NULL(tmpItem, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
|
||||||
|
|
||||||
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
|
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
|
||||||
} else {
|
} else {
|
||||||
|
@ -435,7 +435,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
||||||
|
|
||||||
// handle multiple partition
|
// handle multiple partition
|
||||||
void* tmp = taosArrayPush(qa, id);
|
void* tmp = taosArrayPush(qa, id);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pAPI->metaReaderFn.clearReader(&mr);
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
|
@ -698,7 +698,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
|
if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
|
||||||
SSDataBlock* p1 = createOneDataBlock(pRes, true);
|
SSDataBlock* p1 = createOneDataBlock(pRes, true);
|
||||||
void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
|
void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
p = p1;
|
p = p1;
|
||||||
} else {
|
} else {
|
||||||
p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
|
p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
|
||||||
|
@ -711,7 +711,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
|
||||||
current += p->info.rows;
|
current += p->info.rows;
|
||||||
ASSERT(p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT);
|
ASSERT(p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT);
|
||||||
void* tmp = taosArrayPush(pResList, &p);
|
void* tmp = taosArrayPush(pResList, &p);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
if (current >= rowsThreshold) {
|
if (current >= rowsThreshold) {
|
||||||
break;
|
break;
|
||||||
|
@ -1464,13 +1464,13 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
|
||||||
STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);
|
STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);
|
||||||
|
|
||||||
SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
|
SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));
|
||||||
QUERY_CHECK_NULL(pUidList, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);
|
||||||
|
|
||||||
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
int32_t numOfTables = tableListGetSize(pTableListInfo);
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||||
void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
|
void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(plist);
|
taosArrayDestroy(plist);
|
||||||
|
@ -1492,11 +1492,11 @@ static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
|
||||||
SStreamScanInfo* pScanInfo = pOperator->info;
|
SStreamScanInfo* pScanInfo = pOperator->info;
|
||||||
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
||||||
void* tmp = taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
|
void* tmp = taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
void* tmp = taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
|
void* tmp = taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
} else {
|
} else {
|
||||||
if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
|
if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
|
||||||
extractTableList(pList, pOperator->pDownstream[0]);
|
extractTableList(pList, pOperator->pDownstream[0]);
|
||||||
|
|
|
@ -267,7 +267,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
||||||
SColumnInfoData* pColInfo = NULL;
|
SColumnInfoData* pColInfo = NULL;
|
||||||
if (pInput->pData[paramIndex] == NULL) {
|
if (pInput->pData[paramIndex] == NULL) {
|
||||||
pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
||||||
QUERY_CHECK_NULL(pColInfo, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);
|
||||||
|
|
||||||
// Set the correct column info (data type and bytes)
|
// Set the correct column info (data type and bytes)
|
||||||
pColInfo->info.type = pFuncParam->param.nType;
|
pColInfo->info.type = pFuncParam->param.nType;
|
||||||
|
|
|
@ -753,13 +753,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
if (pGroupInfo->blockForNotLoaded == NULL) {
|
if (pGroupInfo->blockForNotLoaded == NULL) {
|
||||||
pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*));
|
pGroupInfo->blockForNotLoaded = taosArrayInit(0, sizeof(SSDataBlock*));
|
||||||
QUERY_CHECK_NULL(pGroupInfo->blockForNotLoaded, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pGroupInfo->blockForNotLoaded, code, lino, _end, terrno);
|
||||||
pGroupInfo->offsetForNotLoaded = 0;
|
pGroupInfo->offsetForNotLoaded = 0;
|
||||||
}
|
}
|
||||||
dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
|
dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
|
||||||
dataNotLoadBlock->info.dataLoad = 0;
|
dataNotLoadBlock->info.dataLoad = 0;
|
||||||
void* tmp = taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock);
|
void* tmp = taosArrayPush(pGroupInfo->blockForNotLoaded, &dataNotLoadBlock);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -780,7 +780,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
||||||
if (p == NULL) { // it is a new group
|
if (p == NULL) { // it is a new group
|
||||||
SDataGroupInfo gi = {0};
|
SDataGroupInfo gi = {0};
|
||||||
gi.pPageList = taosArrayInit(100, sizeof(int32_t));
|
gi.pPageList = taosArrayInit(100, sizeof(int32_t));
|
||||||
QUERY_CHECK_NULL(gi.pPageList, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(gi.pPageList, code, lino, _end, terrno);
|
||||||
|
|
||||||
code = taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
|
code = taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));
|
||||||
if (code == TSDB_CODE_DUP_KEY) {
|
if (code == TSDB_CODE_DUP_KEY) {
|
||||||
|
@ -797,7 +797,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tmp = taosArrayPush(p->pPageList, &pageId);
|
void* tmp = taosArrayPush(p->pPageList, &pageId);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
*(int32_t*)pPage = 0;
|
*(int32_t*)pPage = 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -822,7 +822,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tmp = taosArrayPush(p->pPageList, &pageId);
|
void* tmp = taosArrayPush(p->pPageList, &pageId);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
|
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
|
||||||
}
|
}
|
||||||
|
@ -1030,13 +1030,13 @@ static int32_t hashPartitionNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
|
SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
|
||||||
QUERY_CHECK_NULL(groupArray, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(groupArray, code, lino, _end, terrno);
|
||||||
|
|
||||||
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
|
||||||
while (pGroupIter != NULL) {
|
while (pGroupIter != NULL) {
|
||||||
SDataGroupInfo* pGroupInfo = pGroupIter;
|
SDataGroupInfo* pGroupInfo = pGroupIter;
|
||||||
void* tmp = taosArrayPush(groupArray, pGroupInfo);
|
void* tmp = taosArrayPush(groupArray, pGroupInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
|
pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1129,7 +1129,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order);
|
void* tmp = taosArrayPush(pInfo->pOrderInfoArr, &order);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pPartNode->pExprs != NULL) {
|
if (pPartNode->pExprs != NULL) {
|
||||||
|
@ -1409,13 +1409,13 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
|
||||||
(SPartitionDataInfo*)taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
|
(SPartitionDataInfo*)taosHashGet(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen);
|
||||||
if (pParData) {
|
if (pParData) {
|
||||||
void* tmp = taosArrayPush(pParData->rowIds, &i);
|
void* tmp = taosArrayPush(pParData->rowIds, &i);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
} else {
|
} else {
|
||||||
SPartitionDataInfo newParData = {0};
|
SPartitionDataInfo newParData = {0};
|
||||||
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
|
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
|
||||||
newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
|
newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
|
||||||
void* tmp = taosArrayPush(newParData.rowIds, &i);
|
void* tmp = taosArrayPush(newParData.rowIds, &i);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
code =
|
code =
|
||||||
taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo));
|
taosHashPut(pInfo->pPartitions, pInfo->partitionSup.keyBuf, keyLen, &newParData, sizeof(SPartitionDataInfo));
|
||||||
|
@ -1594,7 +1594,7 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
||||||
pBlock->info.rowSize += infoData.info.bytes;
|
pBlock->info.rowSize += infoData.info.bytes;
|
||||||
// sub table name
|
// sub table name
|
||||||
void* tmp = taosArrayPush(pBlock->pDataBlock, &infoData);
|
void* tmp = taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
SColumnInfoData gpIdData = {0};
|
SColumnInfoData gpIdData = {0};
|
||||||
gpIdData.info.type = TSDB_DATA_TYPE_UBIGINT;
|
gpIdData.info.type = TSDB_DATA_TYPE_UBIGINT;
|
||||||
|
@ -1602,7 +1602,7 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
||||||
pBlock->info.rowSize += gpIdData.info.bytes;
|
pBlock->info.rowSize += gpIdData.info.bytes;
|
||||||
// group id
|
// group id
|
||||||
tmp = taosArrayPush(pBlock->pDataBlock, &gpIdData);
|
tmp = taosArrayPush(pBlock->pDataBlock, &gpIdData);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
for (int32_t i = 0; i < tag->numOfExprs; i++) {
|
for (int32_t i = 0; i < tag->numOfExprs; i++) {
|
||||||
SColumnInfoData tagCol = {0};
|
SColumnInfoData tagCol = {0};
|
||||||
|
@ -1611,7 +1611,7 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
||||||
tagCol.info.precision = tag->pExprInfo[i].base.resSchema.precision;
|
tagCol.info.precision = tag->pExprInfo[i].base.resSchema.precision;
|
||||||
// tag info
|
// tag info
|
||||||
tmp = taosArrayPush(pBlock->pDataBlock, &tagCol);
|
tmp = taosArrayPush(pBlock->pDataBlock, &tagCol);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
pBlock->info.rowSize += tagCol.info.bytes;
|
pBlock->info.rowSize += tagCol.info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1652,7 +1652,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
pInfo->tbnameCalSup.numOfExprs = 0;
|
pInfo->tbnameCalSup.numOfExprs = 0;
|
||||||
if (pPartNode->pSubtable != NULL) {
|
if (pPartNode->pSubtable != NULL) {
|
||||||
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
|
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
|
||||||
QUERY_CHECK_NULL(pSubTableExpr, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pSubTableExpr, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
|
pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
|
||||||
code = createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0);
|
code = createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0);
|
||||||
|
@ -1666,7 +1666,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
if (pPartNode->pTags != NULL) {
|
if (pPartNode->pTags != NULL) {
|
||||||
int32_t numOfTags;
|
int32_t numOfTags;
|
||||||
SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags);
|
SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags);
|
||||||
QUERY_CHECK_NULL(pTagExpr, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pTagExpr, code, lino, _error, terrno);
|
||||||
|
|
||||||
code = initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore);
|
code = initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
@ -1674,7 +1674,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
|
|
||||||
if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) {
|
if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) {
|
||||||
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
|
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
|
||||||
QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, terrno);
|
||||||
} else {
|
} else {
|
||||||
pInfo->pCreateTbRes = NULL;
|
pInfo->pCreateTbRes = NULL;
|
||||||
}
|
}
|
||||||
|
@ -1687,7 +1687,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
pInfo->partitionSup.needCalc = true;
|
pInfo->partitionSup.needCalc = true;
|
||||||
|
|
||||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc);
|
pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc);
|
||||||
QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);
|
code = blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
@ -1701,7 +1701,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
taosHashSetFreeFp(pInfo->pPartitions, freePartItem);
|
taosHashSetFreeFp(pInfo->pPartitions, freePartItem);
|
||||||
pInfo->tsColIndex = 0;
|
pInfo->tsColIndex = 0;
|
||||||
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
|
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
|
||||||
QUERY_CHECK_NULL(pInfo->pDelRes, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pDelRes, code, lino, _error, terrno);
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
|
||||||
|
@ -1750,7 +1750,7 @@ int32_t extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes) {
|
||||||
|
|
||||||
SColumn c = extractColumnFromColumnNode(pColNode);
|
SColumn c = extractColumnFromColumnNode(pColNode);
|
||||||
void* tmp = taosArrayPush(pList, &c);
|
void* tmp = taosArrayPush(pList, &c);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
|
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
|
||||||
SValueNode* pValNode = (SValueNode*)pNode->pExpr;
|
SValueNode* pValNode = (SValueNode*)pNode->pExpr;
|
||||||
SColumn c = {0};
|
SColumn c = {0};
|
||||||
|
@ -1762,7 +1762,7 @@ int32_t extractColumnInfo(SNodeList* pNodeList, SArray** pArrayRes) {
|
||||||
c.precision = pValNode->node.resType.precision;
|
c.precision = pValNode->node.resType.precision;
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pList, &c);
|
void* tmp = taosArrayPush(pList, &c);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1725,7 +1725,7 @@ static int32_t doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t ts
|
||||||
|
|
||||||
if (pInfo->partitionSup.needCalc) {
|
if (pInfo->partitionSup.needCalc) {
|
||||||
SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
|
SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
|
||||||
QUERY_CHECK_NULL(tmpBlock, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmpBlock, code, lino, _end, terrno);
|
||||||
|
|
||||||
blockDataCleanup(pResult);
|
blockDataCleanup(pResult);
|
||||||
for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
|
for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
|
||||||
|
@ -2549,7 +2549,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
pBlockInfo->id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
|
||||||
|
|
||||||
SArray* pColList = taosArrayInit(4, sizeof(int32_t));
|
SArray* pColList = taosArrayInit(4, sizeof(int32_t));
|
||||||
QUERY_CHECK_NULL(pColList, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pColList, code, lino, _end, terrno);
|
||||||
|
|
||||||
// todo extract method
|
// todo extract method
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||||
|
@ -2568,7 +2568,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
|
|
||||||
colExists = true;
|
colExists = true;
|
||||||
void* tmp = taosArrayPush(pColList, &pColMatchInfo->dstSlotId);
|
void* tmp = taosArrayPush(pColList, &pColMatchInfo->dstSlotId);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2578,7 +2578,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
|
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
|
||||||
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
colDataSetNNULL(pDst, 0, pBlockInfo->rows);
|
||||||
void* tmp = taosArrayPush(pColList, &pColMatchInfo->dstSlotId);
|
void* tmp = taosArrayPush(pColList, &pColMatchInfo->dstSlotId);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2597,7 +2597,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
|
|
||||||
for (int32_t i = 0; i < pInfo->numOfPseudoExpr; ++i) {
|
for (int32_t i = 0; i < pInfo->numOfPseudoExpr; ++i) {
|
||||||
void* tmp = taosArrayPush(pColList, &pInfo->pPseudoExpr[i].base.resSchema.slotId);
|
void* tmp = taosArrayPush(pColList, &pInfo->pPseudoExpr[i].base.resSchema.slotId);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3405,7 +3405,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
|
||||||
void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid);
|
void* tmp = taosArrayPush(tableIdList, &pkeyInfo->uid);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*ppArrayRes) = tableIdList;
|
(*ppArrayRes) = tableIdList;
|
||||||
|
@ -3500,10 +3500,10 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
tmpMetaRsp.metaRsp = data;
|
tmpMetaRsp.metaRsp = data;
|
||||||
if (!pTaskInfo->streamInfo.btMetaRsp.batchMetaReq) {
|
if (!pTaskInfo->streamInfo.btMetaRsp.batchMetaReq) {
|
||||||
pTaskInfo->streamInfo.btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
|
pTaskInfo->streamInfo.btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES);
|
||||||
QUERY_CHECK_NULL(pTaskInfo->streamInfo.btMetaRsp.batchMetaReq, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pTaskInfo->streamInfo.btMetaRsp.batchMetaReq, code, lino, _end, terrno);
|
||||||
|
|
||||||
pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
|
pTaskInfo->streamInfo.btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t));
|
||||||
QUERY_CHECK_NULL(pTaskInfo->streamInfo.btMetaRsp.batchMetaLen, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pTaskInfo->streamInfo.btMetaRsp.batchMetaLen, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
int32_t tempRes = TSDB_CODE_SUCCESS;
|
int32_t tempRes = TSDB_CODE_SUCCESS;
|
||||||
uint32_t len = 0;
|
uint32_t len = 0;
|
||||||
|
@ -3516,7 +3516,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
|
||||||
int32_t tLen = sizeof(SMqRspHead) + len;
|
int32_t tLen = sizeof(SMqRspHead) + len;
|
||||||
void* tBuf = taosMemoryCalloc(1, tLen);
|
void* tBuf = taosMemoryCalloc(1, tLen);
|
||||||
QUERY_CHECK_NULL(tBuf, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tBuf, code, lino, _end, terrno);
|
||||||
|
|
||||||
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
|
void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
|
@ -3531,10 +3531,10 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(data);
|
taosMemoryFreeClear(data);
|
||||||
void* tmp = taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaReq, &tBuf);
|
void* tmp = taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaReq, &tBuf);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
tmp = taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaLen, &tLen);
|
tmp = taosArrayPush(pTaskInfo->streamInfo.btMetaRsp.batchMetaLen, &tLen);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(*ppRes) = NULL;
|
(*ppRes) = NULL;
|
||||||
|
@ -3741,7 +3741,7 @@ int32_t addPrimaryKeyCol(SSDataBlock* pBlock, uint8_t type, int32_t bytes) {
|
||||||
infoData.info.type = type;
|
infoData.info.type = type;
|
||||||
infoData.info.bytes = bytes;
|
infoData.info.bytes = bytes;
|
||||||
void* tmp = taosArrayPush(pBlock->pDataBlock, &infoData);
|
void* tmp = taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -3788,7 +3788,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
|
|
||||||
int16_t colId = id->colId;
|
int16_t colId = id->colId;
|
||||||
void* tmp = taosArrayPush(pColIds, &colId);
|
void* tmp = taosArrayPush(pColIds, &colId);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
|
||||||
|
|
||||||
if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
pInfo->primaryTsIndex = id->dstSlotId;
|
pInfo->primaryTsIndex = id->dstSlotId;
|
||||||
|
@ -3880,7 +3880,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
pInfo->readHandle = *pHandle;
|
pInfo->readHandle = *pHandle;
|
||||||
pTaskInfo->streamInfo.snapshotVer = pHandle->version;
|
pTaskInfo->streamInfo.snapshotVer = pHandle->version;
|
||||||
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
|
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
|
||||||
QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pCreateTbRes, code, lino, _error, terrno);
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
|
code = blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
@ -4111,7 +4111,7 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
|
||||||
.type = pSColumnNode->node.resType.type,
|
.type = pSColumnNode->node.resType.type,
|
||||||
.bytes = pSColumnNode->node.resType.bytes};
|
.bytes = pSColumnNode->node.resType.bytes};
|
||||||
void* tmp = taosArrayPush(pCtx->cInfoList, &cInfo);
|
void* tmp = taosArrayPush(pCtx->cInfoList, &cInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
} else {
|
} else {
|
||||||
SColumnNode* col = *(SColumnNode**)data;
|
SColumnNode* col = *(SColumnNode**)data;
|
||||||
pSColumnNode->slotId = col->slotId;
|
pSColumnNode->slotId = col->slotId;
|
||||||
|
@ -4132,13 +4132,13 @@ static int32_t tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray*
|
||||||
int32_t numOfTables = taosArrayGetSize(aUidTags);
|
int32_t numOfTables = taosArrayGetSize(aUidTags);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
|
SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
|
||||||
QUERY_CHECK_NULL(pResBlock, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pResBlock, code, lino, _end, terrno);
|
||||||
|
|
||||||
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
|
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
|
||||||
QUERY_CHECK_NULL(pBlockList, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pBlockList, code, lino, _end, terrno);
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pBlockList, &pResBlock);
|
void* tmp = taosArrayPush(pBlockList, &pResBlock);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||||
|
|
||||||
|
@ -4153,7 +4153,7 @@ static int32_t tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray*
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
if (result[i]) {
|
if (result[i]) {
|
||||||
void* tmp = taosArrayPush(aFilterIdxs, &i);
|
void* tmp = taosArrayPush(aFilterIdxs, &i);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4203,7 +4203,7 @@ static int32_t tagScanFillOneCellWithTag(SOperatorInfo* pOperator, const STUidTa
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||||
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
|
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
varDataSetLen(tmp, tagVal.nData);
|
varDataSetLen(tmp, tagVal.nData);
|
||||||
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
||||||
|
@ -4274,7 +4274,7 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
|
||||||
|
|
||||||
if (pInfo->pCtbCursor == NULL) {
|
if (pInfo->pCtbCursor == NULL) {
|
||||||
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
||||||
QUERY_CHECK_NULL(pInfo->pCtbCursor, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pCtbCursor, code, lino, _end, terrno);
|
||||||
} else {
|
} else {
|
||||||
code = pAPI->metaFn.resumeCtbCursor(pInfo->pCtbCursor, 0);
|
code = pAPI->metaFn.resumeCtbCursor(pInfo->pCtbCursor, 0);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -4300,7 +4300,7 @@ static int32_t doTagScanFromCtbIdxNext(SOperatorInfo* pOperator, SSDataBlock** p
|
||||||
info.pTagVal = taosMemoryMalloc(pCur->vLen);
|
info.pTagVal = taosMemoryMalloc(pCur->vLen);
|
||||||
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
|
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
|
||||||
void* tmp = taosArrayPush(aUidTags, &info);
|
void* tmp = taosArrayPush(aUidTags, &info);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
++numTables;
|
++numTables;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4463,7 +4463,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
|
|
||||||
pInfo->pTableListInfo = pTableListInfo;
|
pInfo->pTableListInfo = pTableListInfo;
|
||||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->readHandle = *pReadHandle;
|
pInfo->readHandle = *pReadHandle;
|
||||||
pInfo->curPos = 0;
|
pInfo->curPos = 0;
|
||||||
|
@ -4477,17 +4477,17 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
|
|
||||||
if (pTagScanNode->onlyMetaCtbIdx) {
|
if (pTagScanNode->onlyMetaCtbIdx) {
|
||||||
pInfo->aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
|
pInfo->aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
|
||||||
QUERY_CHECK_NULL(pInfo->aUidTags, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->aUidTags, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
|
pInfo->aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
|
||||||
QUERY_CHECK_NULL(pInfo->aFilterIdxs, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->aFilterIdxs, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->filterCtx.colHash =
|
pInfo->filterCtx.colHash =
|
||||||
taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||||
QUERY_CHECK_NULL(pInfo->filterCtx.colHash, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->filterCtx.colHash, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->filterCtx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
pInfo->filterCtx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
||||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||||
|
|
||||||
if (pInfo->pTagCond != NULL) {
|
if (pInfo->pTagCond != NULL) {
|
||||||
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
|
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
|
||||||
|
@ -4563,7 +4563,7 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
|
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
|
||||||
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
|
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
|
||||||
QUERY_CHECK_NULL(dst->colList, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(dst->colList, code, lino, _end, terrno);
|
||||||
for (int i = 0; i < src->numOfCols; i++) {
|
for (int i = 0; i < src->numOfCols; i++) {
|
||||||
dst->colList[i] = src->colList[i];
|
dst->colList[i] = src->colList[i];
|
||||||
}
|
}
|
||||||
|
@ -5038,7 +5038,7 @@ static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeOpInfo) {
|
||||||
if (pInfo->mSkipTables == NULL) {
|
if (pInfo->mSkipTables == NULL) {
|
||||||
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
|
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
|
||||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
QUERY_CHECK_NULL(pInfo->mSkipTables, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->mSkipTables, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
int bSkip = 1;
|
int bSkip = 1;
|
||||||
if (pInfo->mSkipTables != NULL) {
|
if (pInfo->mSkipTables != NULL) {
|
||||||
|
@ -5209,10 +5209,10 @@ int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppS
|
||||||
}
|
}
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pSortInfo, &biTs);
|
void* tmp = taosArrayPush(pSortInfo, &biTs);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
if (pkTargetSlotId != -1) {
|
if (pkTargetSlotId != -1) {
|
||||||
tmp = taosArrayPush(pSortInfo, &biPk);
|
tmp = taosArrayPush(pSortInfo, &biPk);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*ppSortArray) = pSortInfo;
|
(*ppSortArray) = pSortInfo;
|
||||||
|
@ -5625,7 +5625,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||||
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
|
||||||
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pResBlock, code, lino, _error, terrno);
|
||||||
code = blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
if (!hasLimit && blockDataGetRowSize(pInfo->pResBlock) >= 256 && !pTableScanNode->smallDataTsSort) {
|
if (!hasLimit && blockDataGetRowSize(pInfo->pResBlock) >= 256 && !pTableScanNode->smallDataTsSort) {
|
||||||
|
@ -5640,7 +5640,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
code = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order, &pInfo->pSortInfo);
|
code = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order, &pInfo->pSortInfo);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
QUERY_CHECK_NULL(pInfo->pReaderBlock, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pReaderBlock, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
|
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
|
||||||
|
|
||||||
|
@ -5807,7 +5807,7 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
|
||||||
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 1);
|
initResultSizeInfo(&pOperator->resultInfo, 1);
|
||||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||||
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
@ -6030,7 +6030,7 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca
|
||||||
if (pSupp->groupByStbName) {
|
if (pSupp->groupByStbName) {
|
||||||
if (pInfo->stbUidList == NULL) {
|
if (pInfo->stbUidList == NULL) {
|
||||||
pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
|
pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
|
||||||
QUERY_CHECK_NULL(pInfo->stbUidList, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->stbUidList, code, lino, _end, terrno);
|
||||||
code = pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList);
|
code = pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
|
@ -754,7 +754,7 @@ void streamCountReleaseState(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
int32_t resSize = sizeof(TSKEY);
|
int32_t resSize = sizeof(TSKEY);
|
||||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||||
QUERY_CHECK_NULL(pBuff, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
|
||||||
|
|
||||||
memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||||
qDebug("===stream=== count window operator relase state. ");
|
qDebug("===stream=== count window operator relase state. ");
|
||||||
|
|
|
@ -694,7 +694,7 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
||||||
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
|
pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
|
||||||
QUERY_CHECK_NULL(pInfo->pAllUpdated, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pAllUpdated, code, lino, _end, terrno);
|
||||||
|
|
||||||
code = getMaxTsWins(pHisWins, pInfo->historyWins);
|
code = getMaxTsWins(pHisWins, pInfo->historyWins);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -915,18 +915,18 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
|
|
||||||
if (pInfo->isHistoryOp) {
|
if (pInfo->isHistoryOp) {
|
||||||
pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
|
pInfo->pAllUpdated = tSimpleHashInit(64, hashFn);
|
||||||
QUERY_CHECK_NULL(pInfo->pAllUpdated, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pAllUpdated, code, lino, _error, terrno);
|
||||||
} else {
|
} else {
|
||||||
pInfo->pAllUpdated = NULL;
|
pInfo->pAllUpdated = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||||
QUERY_CHECK_NULL(pInfo->pCheckpointRes, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pCheckpointRes, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->reCkBlock = false;
|
pInfo->reCkBlock = false;
|
||||||
pInfo->recvGetAll = false;
|
pInfo->recvGetAll = false;
|
||||||
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
|
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
|
||||||
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
|
||||||
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimayKey;
|
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimayKey;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
||||||
|
|
|
@ -1055,7 +1055,7 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
|
||||||
char* tagJson = NULL;
|
char* tagJson = NULL;
|
||||||
parseTagDatatoJson(tagData, &tagJson);
|
parseTagDatatoJson(tagData, &tagJson);
|
||||||
tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE);
|
tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE);
|
||||||
QUERY_CHECK_NULL(tagVarChar, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tagVarChar, code, lino, _end, terrno);
|
||||||
memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson));
|
memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson));
|
||||||
varDataSetLen(tagVarChar, strlen(tagJson));
|
varDataSetLen(tagVarChar, strlen(tagJson));
|
||||||
taosMemoryFree(tagJson);
|
taosMemoryFree(tagJson);
|
||||||
|
@ -1063,7 +1063,7 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
|
||||||
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
|
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
|
||||||
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
|
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
|
||||||
tagVarChar = taosMemoryCalloc(1, bufSize + 1);
|
tagVarChar = taosMemoryCalloc(1, bufSize + 1);
|
||||||
QUERY_CHECK_NULL(tagVarChar, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tagVarChar, code, lino, _end, terrno);
|
||||||
int32_t len = -1;
|
int32_t len = -1;
|
||||||
if (tagLen > 0)
|
if (tagLen > 0)
|
||||||
convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
|
convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
|
||||||
|
@ -1258,7 +1258,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
|
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
|
||||||
QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(p, capacity);
|
code = blockDataEnsureCapacity(p, capacity);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -1542,7 +1542,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
||||||
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
varDataSetLen(dbname, strlen(varDataVal(dbname)));
|
||||||
|
|
||||||
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
|
SSDataBlock* p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_TABLES);
|
||||||
QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
|
|
||||||
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -2362,7 +2362,7 @@ int32_t optSysIntersection(SArray* in, SArray* out) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
MergeIndex* mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
|
MergeIndex* mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
|
||||||
QUERY_CHECK_NULL(mi, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(mi, code, lino, _end, terrno);
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
SArray* t = taosArrayGetP(in, i);
|
SArray* t = taosArrayGetP(in, i);
|
||||||
mi[i].len = (int32_t)taosArrayGetSize(t);
|
mi[i].len = (int32_t)taosArrayGetSize(t);
|
||||||
|
@ -2591,7 +2591,7 @@ static int32_t doBlockInfoScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes
|
||||||
|
|
||||||
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
|
int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo);
|
||||||
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
|
||||||
QUERY_CHECK_NULL(p, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(p, code, lino, _end, terrno);
|
||||||
|
|
||||||
int32_t tempRes = tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
int32_t tempRes = tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo);
|
||||||
if (tempRes < 0) {
|
if (tempRes < 0) {
|
||||||
|
|
|
@ -296,19 +296,19 @@ static int32_t initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
|
||||||
SGroupKeys key = {0};
|
SGroupKeys key = {0};
|
||||||
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
|
SResSchema* pSchema = &pCol->pExpr->base.resSchema;
|
||||||
key.pData = taosMemoryMalloc(pSchema->bytes);
|
key.pData = taosMemoryMalloc(pSchema->bytes);
|
||||||
QUERY_CHECK_NULL(key.pData, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
|
||||||
key.isNull = true;
|
key.isNull = true;
|
||||||
key.bytes = pSchema->bytes;
|
key.bytes = pSchema->bytes;
|
||||||
key.type = pSchema->type;
|
key.type = pSchema->type;
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pFillInfo->next.pRowVal, &key);
|
void* tmp = taosArrayPush(pFillInfo->next.pRowVal, &key);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
key.pData = taosMemoryMalloc(pSchema->bytes);
|
key.pData = taosMemoryMalloc(pSchema->bytes);
|
||||||
QUERY_CHECK_NULL(key.pData, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
|
||||||
|
|
||||||
tmp = taosArrayPush(pFillInfo->prev.pRowVal, &key);
|
tmp = taosArrayPush(pFillInfo->prev.pRowVal, &key);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
@ -522,7 +522,7 @@ void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillC
|
||||||
}
|
}
|
||||||
|
|
||||||
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
|
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
|
||||||
QUERY_CHECK_NULL(pFillInfo, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pFillInfo, code, lino, _end, terrno);
|
||||||
|
|
||||||
pFillInfo->order = order;
|
pFillInfo->order = order;
|
||||||
pFillInfo->srcTsSlotId = primaryTsSlotId;
|
pFillInfo->srcTsSlotId = primaryTsSlotId;
|
||||||
|
@ -546,10 +546,10 @@ void taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillC
|
||||||
pFillInfo->interval = *pInterval;
|
pFillInfo->interval = *pInterval;
|
||||||
|
|
||||||
pFillInfo->next.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
|
pFillInfo->next.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
|
||||||
QUERY_CHECK_NULL(pFillInfo->next.pRowVal, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pFillInfo->next.pRowVal, code, lino, _end, terrno);
|
||||||
|
|
||||||
pFillInfo->prev.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
|
pFillInfo->prev.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
|
||||||
QUERY_CHECK_NULL(pFillInfo->prev.pRowVal, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pFillInfo->prev.pRowVal, code, lino, _end, terrno);
|
||||||
|
|
||||||
code = initBeforeAfterDataBuf(pFillInfo);
|
code = initBeforeAfterDataBuf(pFillInfo);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
|
@ -435,7 +435,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
}
|
}
|
||||||
|
|
||||||
current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
|
current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
|
||||||
QUERY_CHECK_NULL(current.val, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(current.val, code, lino, _end, terrno);
|
||||||
taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type);
|
taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type);
|
||||||
code = colDataSetVal(pDst, rows, (char*)current.val, false);
|
code = colDataSetVal(pDst, rows, (char*)current.val, false);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
@ -558,9 +558,9 @@ static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
|
||||||
key.type = pColInfo->info.type;
|
key.type = pColInfo->info.type;
|
||||||
key.isNull = false;
|
key.isNull = false;
|
||||||
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
|
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
|
||||||
QUERY_CHECK_NULL(key.pData, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
|
||||||
void* tmp = taosArrayPush(pInfo->pPrevRow, &key);
|
void* tmp = taosArrayPush(pInfo->pPrevRow, &key);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->isPrevRowSet = false;
|
pInfo->isPrevRowSet = false;
|
||||||
|
@ -593,10 +593,10 @@ static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
|
||||||
key.type = pColInfo->info.type;
|
key.type = pColInfo->info.type;
|
||||||
key.isNull = false;
|
key.isNull = false;
|
||||||
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
|
key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
|
||||||
QUERY_CHECK_NULL(key.pData, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
|
||||||
|
|
||||||
void* tmp = taosArrayPush(pInfo->pNextRow, &key);
|
void* tmp = taosArrayPush(pInfo->pNextRow, &key);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->isNextRowSet = false;
|
pInfo->isNextRowSet = false;
|
||||||
|
@ -628,16 +628,16 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
|
||||||
linearInfo.start.key = INT64_MIN;
|
linearInfo.start.key = INT64_MIN;
|
||||||
linearInfo.end.key = INT64_MIN;
|
linearInfo.end.key = INT64_MIN;
|
||||||
linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
|
linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
|
||||||
QUERY_CHECK_NULL(linearInfo.start.val, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(linearInfo.start.val, code, lino, _end, terrno);
|
||||||
|
|
||||||
linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
|
linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
|
||||||
QUERY_CHECK_NULL(linearInfo.end.val, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(linearInfo.end.val, code, lino, _end, terrno);
|
||||||
linearInfo.isStartSet = false;
|
linearInfo.isStartSet = false;
|
||||||
linearInfo.isEndSet = false;
|
linearInfo.isEndSet = false;
|
||||||
linearInfo.type = pColInfo->info.type;
|
linearInfo.type = pColInfo->info.type;
|
||||||
linearInfo.bytes = pColInfo->info.bytes;
|
linearInfo.bytes = pColInfo->info.bytes;
|
||||||
void* tmp = taosArrayPush(pInfo->pLinearInfo, &linearInfo);
|
void* tmp = taosArrayPush(pInfo->pLinearInfo, &linearInfo);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
|
|
@ -1210,10 +1210,10 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
||||||
|
|
||||||
if (needed) {
|
if (needed) {
|
||||||
pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
|
pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
|
||||||
QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pInterpCols, code, lino, _end, terrno);
|
||||||
|
|
||||||
pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
|
pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
|
||||||
QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(pInfo->pPrevValues, code, lino, _end, terrno);
|
||||||
|
|
||||||
{ // ts column
|
{ // ts column
|
||||||
SColumn c = {0};
|
SColumn c = {0};
|
||||||
|
@ -1222,17 +1222,17 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
||||||
c.type = TSDB_DATA_TYPE_TIMESTAMP;
|
c.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
c.bytes = sizeof(int64_t);
|
c.bytes = sizeof(int64_t);
|
||||||
tmp = taosArrayPush(pInfo->pInterpCols, &c);
|
tmp = taosArrayPush(pInfo->pInterpCols, &c);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
SGroupKeys key;
|
SGroupKeys key;
|
||||||
key.bytes = c.bytes;
|
key.bytes = c.bytes;
|
||||||
key.type = c.type;
|
key.type = c.type;
|
||||||
key.isNull = true; // to denote no value is assigned yet
|
key.isNull = true; // to denote no value is assigned yet
|
||||||
key.pData = taosMemoryCalloc(1, c.bytes);
|
key.pData = taosMemoryCalloc(1, c.bytes);
|
||||||
QUERY_CHECK_NULL(key.pData, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
|
||||||
|
|
||||||
tmp = taosArrayPush(pInfo->pPrevValues, &key);
|
tmp = taosArrayPush(pInfo->pPrevValues, &key);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1244,17 +1244,17 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
|
||||||
|
|
||||||
SColumn c = *pParam->pCol;
|
SColumn c = *pParam->pCol;
|
||||||
tmp = taosArrayPush(pInfo->pInterpCols, &c);
|
tmp = taosArrayPush(pInfo->pInterpCols, &c);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
SGroupKeys key = {0};
|
SGroupKeys key = {0};
|
||||||
key.bytes = c.bytes;
|
key.bytes = c.bytes;
|
||||||
key.type = c.type;
|
key.type = c.type;
|
||||||
key.isNull = false;
|
key.isNull = false;
|
||||||
key.pData = taosMemoryCalloc(1, c.bytes);
|
key.pData = taosMemoryCalloc(1, c.bytes);
|
||||||
QUERY_CHECK_NULL(key.pData, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
|
||||||
|
|
||||||
tmp = taosArrayPush(pInfo->pPrevValues, &key);
|
tmp = taosArrayPush(pInfo->pPrevValues, &key);
|
||||||
QUERY_CHECK_NULL(tmp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue