fix malloc issue
This commit is contained in:
parent
37fc4f5674
commit
f0e4d2f085
|
@ -165,6 +165,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
int32_t capacity = 0;
|
||||
|
||||
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
|
||||
QUERY_CHECK_NULL(pInfo->pUidList, code, lino, _error, terrno);
|
||||
|
||||
// partition by tbname
|
||||
if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
|
||||
|
@ -203,6 +204,7 @@ int32_t createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandl
|
|||
p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs);
|
||||
p->pCtx =
|
||||
createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(p->pCtx, code, lino, _error, terrno);
|
||||
}
|
||||
|
||||
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
|
||||
|
|
|
@ -238,6 +238,9 @@ static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWi
|
|||
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
|
||||
if (*pResult == NULL) {
|
||||
SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize);
|
||||
if (!p) {
|
||||
return terrno;
|
||||
}
|
||||
pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset};
|
||||
*pResult = p;
|
||||
}
|
||||
|
|
|
@ -307,6 +307,9 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
|
|||
|
||||
int32_t len = strlen(id) + 1;
|
||||
pInfo->pTaskId = taosMemoryCalloc(1, len);
|
||||
if (!pInfo->pTaskId) {
|
||||
return terrno;
|
||||
}
|
||||
strncpy(pInfo->pTaskId, id, len);
|
||||
for (int32_t i = 0; i < numOfSources; ++i) {
|
||||
SSourceDataInfo dataInfo = {0};
|
||||
|
@ -389,7 +392,9 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
|
|||
|
||||
pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
|
||||
pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
|
||||
pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
|
||||
|
||||
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
|
||||
code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
|
||||
|
|
|
@ -144,6 +144,7 @@ int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap,
|
|||
|
||||
void* pData = NULL;
|
||||
pGroupResInfo->pRows = taosArrayInit(size, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pGroupResInfo->pRows, code, lino, _end, terrno);
|
||||
|
||||
size_t keyLen = 0;
|
||||
int32_t iter = 0;
|
||||
|
@ -353,9 +354,15 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
|||
} else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||
int32_t len = ((const STag*)p)->len;
|
||||
res->datum.p = taosMemoryCalloc(len + 1, 1);
|
||||
if (NULL == res->datum.p) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
memcpy(res->datum.p, p, len);
|
||||
} else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
|
||||
res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
|
||||
if (NULL == res->datum.p) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
|
||||
varDataSetLen(res->datum.p, tagVal.nData);
|
||||
} else {
|
||||
|
@ -378,6 +385,9 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
|||
|
||||
int32_t len = strlen(mr->me.name);
|
||||
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
|
||||
if (NULL == res->datum.p) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
memcpy(varDataVal(res->datum.p), mr->me.name, len);
|
||||
varDataSetLen(res->datum.p, len);
|
||||
nodesDestroyNode(*pNode);
|
||||
|
@ -856,6 +866,7 @@ static SArray* getTableNameList(const SNodeListNode* pList) {
|
|||
|
||||
// remove the duplicates
|
||||
SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*));
|
||||
QUERY_CHECK_NULL(pNewList, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(pNewList, taosArrayGet(pTbList, 0));
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
|
||||
|
@ -1739,6 +1750,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
pExp->base.numOfParams = numOfParam;
|
||||
|
||||
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||
|
@ -1760,6 +1772,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
pExp->base.numOfParams = 1;
|
||||
|
||||
SDataType* pType = &pOpNode->node.resType;
|
||||
|
@ -1771,6 +1784,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
SCaseWhenNode* pCaseNode = (SCaseWhenNode*)pNode;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
pExp->base.numOfParams = 1;
|
||||
|
||||
SDataType* pType = &pCaseNode->node.resType;
|
||||
|
@ -1781,9 +1795,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
||||
SLogicConditionNode* pCond = (SLogicConditionNode*)pNode;
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
if (!pExp->base.pParam) {
|
||||
code = terrno;
|
||||
}
|
||||
QUERY_CHECK_NULL(pExp->base.pParam, code, lino, _end, terrno);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pExp->base.numOfParams = 1;
|
||||
SDataType* pType = &pCond->node.resType;
|
||||
|
@ -1808,6 +1821,9 @@ int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
|
|||
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
|
||||
*numOfExprs = LIST_LENGTH(pNodeList);
|
||||
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
|
||||
if (!pExprs) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < (*numOfExprs); ++i) {
|
||||
SExprInfo* pExp = &pExprs[i];
|
||||
|
@ -2068,6 +2084,9 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
|||
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
|
||||
|
||||
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
|
||||
if (!pCond->colList) {
|
||||
return terrno;
|
||||
}
|
||||
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
|
||||
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -2448,6 +2467,9 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
|
|||
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
|
||||
|
||||
SArray* pList = taosArrayInit(4, sizeof(int32_t));
|
||||
if (!pList) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
|
||||
uint64_t gid = pInfo->groupId;
|
||||
|
|
|
@ -373,6 +373,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||
QUERY_CHECK_NULL(qa, code, lino, _end, terrno);
|
||||
int32_t numOfUids = taosArrayGetSize(tableIdList);
|
||||
if (numOfUids == 0) {
|
||||
(*ppArrayRes) = qa;
|
||||
|
|
|
@ -409,6 +409,9 @@ static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SF
|
|||
SColumnDataAgg* da = NULL;
|
||||
if (pInput->pColumnDataAgg[paramIndex] == NULL) {
|
||||
da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
|
||||
if (!da) {
|
||||
return terrno;
|
||||
}
|
||||
pInput->pColumnDataAgg[paramIndex] = da;
|
||||
if (da == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -397,6 +397,9 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
|||
pInfo->win.ekey = win.skey;
|
||||
}
|
||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
if (pInfo->p) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
|
||||
taosMemoryFree(pInfo->pFillInfo);
|
||||
|
|
|
@ -854,6 +854,9 @@ _end:
|
|||
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
|
||||
if (!offset) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
offset[0] = sizeof(int32_t) +
|
||||
sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
|
||||
|
@ -1193,6 +1196,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
|
|||
blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf),
|
||||
blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock)));
|
||||
pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
|
||||
QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno);
|
||||
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
|
@ -1431,6 +1436,7 @@ static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDat
|
|||
SPartitionDataInfo newParData = {0};
|
||||
newParData.groupId = calcGroupId(pInfo->partitionSup.keyBuf, keyLen);
|
||||
newParData.rowIds = taosArrayInit(64, sizeof(int32_t));
|
||||
QUERY_CHECK_NULL(newParData.rowIds, code, lino, _end, terrno);
|
||||
void* tmp = taosArrayPush(newParData.rowIds, &i);
|
||||
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||
|
||||
|
@ -1594,6 +1600,9 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
if (!pBlock) {
|
||||
return NULL;
|
||||
}
|
||||
pBlock->info.hasVarCol = false;
|
||||
pBlock->info.id.groupId = 0;
|
||||
pBlock->info.rows = 0;
|
||||
|
@ -1601,6 +1610,7 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
|
|||
pBlock->info.watermark = INT64_MIN;
|
||||
|
||||
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
|
||||
QUERY_CHECK_NULL(pBlock->pDataBlock, code, lino, _end, terrno);
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
|
||||
if (tbName->numOfExprs > 0) {
|
||||
|
|
|
@ -1280,6 +1280,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||
SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
|
||||
if (!pRecorder) {
|
||||
return terrno;
|
||||
}
|
||||
STableScanInfo* pTableScanInfo = pOptr->info;
|
||||
*pRecorder = pTableScanInfo->base.readRecorder;
|
||||
*pOptrExplain = pRecorder;
|
||||
|
@ -1341,6 +1344,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
|
|||
pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno);
|
||||
}
|
||||
|
||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||
|
@ -3007,6 +3011,9 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
|||
}
|
||||
|
||||
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||
if (!pUpInfo) {
|
||||
return;
|
||||
}
|
||||
int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
|
||||
|
@ -3466,6 +3473,7 @@ static int32_t extractTableIdList(const STableListInfo* pTableListInfo, SArray**
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
|
||||
QUERY_CHECK_NULL(tableIdList, code, lino, _end, terrno);
|
||||
|
||||
// Transfer the Array of STableKeyInfo into uid list.
|
||||
size_t size = tableListGetSize(pTableListInfo);
|
||||
|
@ -3861,6 +3869,8 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
|
|||
pInfo->primaryKeyIndex = -1;
|
||||
int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
|
||||
pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
|
||||
QUERY_CHECK_NULL(pColIds, code, lino, _error, terrno);
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||
|
||||
|
@ -5286,6 +5296,7 @@ int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppS
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SArray* pSortInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||
QUERY_CHECK_NULL(pSortInfo, code, lino, _end, terrno);
|
||||
SBlockOrderInfo biTs = {0};
|
||||
SBlockOrderInfo biPk = {0};
|
||||
|
||||
|
@ -5374,9 +5385,11 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
|
||||
QUERY_CHECK_NULL(param, code, lino, _end, terrno);
|
||||
param->pOperator = pOperator;
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
QUERY_CHECK_NULL(ps, code, lino, _end, terrno);
|
||||
ps->param = param;
|
||||
ps->onlyRef = false;
|
||||
code = tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
@ -5653,6 +5666,9 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
|
|||
ASSERT(pOptr != NULL);
|
||||
// TODO: merge these two info into one struct
|
||||
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
|
||||
if (!execInfo) {
|
||||
return terrno;
|
||||
}
|
||||
STableMergeScanInfo* pInfo = pOptr->info;
|
||||
execInfo->blockRecorder = pInfo->base.readRecorder;
|
||||
execInfo->sortExecInfo = pInfo->sortExecInfo;
|
||||
|
@ -5697,6 +5713,7 @@ int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SR
|
|||
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(pSup->pCtx, code, lino, _error, terrno);
|
||||
}
|
||||
|
||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||
|
|
|
@ -60,6 +60,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
|||
QRY_OPTR_CHECK(pOptrInfo);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -89,6 +90,7 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
|
|||
|
||||
pOperator->exprSupp.pCtx =
|
||||
createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -778,6 +780,7 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
|
|||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset,
|
||||
&pTaskInfo->storageAPI.functionStore);
|
||||
QUERY_CHECK_NULL(pOperator->exprSupp.pCtx, code, lino, _error, terrno);
|
||||
|
||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
|
|
@ -651,10 +651,12 @@ static int32_t doStreamCountAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pStUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -857,6 +859,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
pInfo->binfo.pRes = pResBlock;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno);
|
||||
pInfo->pDelIterator = NULL;
|
||||
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
|
@ -878,6 +881,7 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
pInfo->recvGetAll = false;
|
||||
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
|
||||
pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey;
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
|
||||
|
|
|
@ -619,10 +619,12 @@ static int32_t doStreamEventAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SEventWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pSeUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -738,6 +740,9 @@ void streamEventReleaseState(SOperatorInfo* pOperator) {
|
|||
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||
int32_t resSize = winSize + sizeof(TSKEY);
|
||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||
if (!pBuff) {
|
||||
return ;
|
||||
}
|
||||
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||
qDebug("===stream=== event window operator relase state. save result count:%d",
|
||||
|
@ -780,10 +785,12 @@ void streamEventReloadState(SOperatorInfo* pOperator) {
|
|||
if (!pInfo->pSeUpdated && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pSeDeleted && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
SEventWindowInfo curInfo = {0};
|
||||
|
@ -897,6 +904,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _error, terrno);
|
||||
pInfo->pDelIterator = NULL;
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
|
|
@ -604,6 +604,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
|
|||
SPoint cur = {0};
|
||||
cur.key = pFillInfo->current;
|
||||
cur.val = taosMemoryCalloc(1, pCell->bytes);
|
||||
QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);
|
||||
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
|
||||
code = colDataSetVal(pColData, index, (const char*)cur.val, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -683,15 +684,24 @@ _end:
|
|||
}
|
||||
}
|
||||
|
||||
void keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
|
||||
int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
|
||||
int32_t rowId, uint64_t groupId, int32_t rowSize) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
TSKEY ts = tsCol[rowId];
|
||||
pFillInfo->nextRowKey = ts;
|
||||
SResultRowData tmpNextRow = {.key = ts};
|
||||
tmpNextRow.pRowVal = taosMemoryCalloc(1, rowSize);
|
||||
QUERY_CHECK_NULL(tmpNextRow.pRowVal, code, lino, _end, terrno);
|
||||
transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
|
||||
keepResultInDiscBuf(pOperator, groupId, &tmpNextRow, rowSize);
|
||||
taosMemoryFreeClear(tmpNextRow.pRowVal);
|
||||
|
||||
_end:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
|
||||
|
@ -721,13 +731,15 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
|
|||
pInfo->srcRowIndex++;
|
||||
|
||||
if (pInfo->srcRowIndex == 0) {
|
||||
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pInfo->srcRowIndex++;
|
||||
}
|
||||
|
||||
while (pInfo->srcRowIndex < pBlock->info.rows) {
|
||||
TSKEY ts = tsCol[pInfo->srcRowIndex];
|
||||
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
|
||||
if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) {
|
||||
code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
|
||||
|
@ -1207,6 +1219,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pFillSup->pResMap = tSimpleHashInit(16, hashFn);
|
||||
QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
|
||||
pFillSup->hasDelete = false;
|
||||
|
||||
_end:
|
||||
|
|
|
@ -647,6 +647,7 @@ int32_t addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
SArray* childIds = taosArrayInit(8, sizeof(int32_t));
|
||||
QUERY_CHECK_NULL(childIds, code, lino, _end, terrno);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
void* tmp = taosArrayPush(childIds, &i);
|
||||
if (!tmp) {
|
||||
|
@ -1579,10 +1580,12 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
@ -1609,6 +1612,7 @@ static int32_t doStreamFinalIntervalAggNext(SOperatorInfo* pOperator, SSDataBloc
|
|||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
|
||||
QUERY_CHECK_NULL(delWins, code, lino, _end, terrno);
|
||||
SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL;
|
||||
code = doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -1891,6 +1895,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno);
|
||||
qInfo("open state %p", pInfo->pState);
|
||||
pAPI->stateStore.streamStateCopyBackend(pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||
//*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||
|
@ -1914,6 +1919,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
}
|
||||
|
||||
pInfo->pPullWins = taosArrayInit(8, sizeof(SPullWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pPullWins, code, lino, _error, terrno);
|
||||
pInfo->pullIndex = 0;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pPullDataMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
|
@ -1929,6 +1935,7 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
|
||||
pInfo->delIndex = 0;
|
||||
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
|
||||
QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
|
||||
pInfo->delKey.ts = INT64_MAX;
|
||||
pInfo->delKey.groupId = 0;
|
||||
pInfo->numOfDatapack = 0;
|
||||
|
@ -1953,7 +1960,9 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
|
||||
pInfo->clearState = false;
|
||||
pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey));
|
||||
QUERY_CHECK_NULL(pInfo->pMidPullDatas, code, lino, _error, terrno);
|
||||
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
|
||||
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
|
@ -2129,6 +2138,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
|
||||
initDummyFunction(pSup->pDummyCtx, pExpSup->pCtx, numOfOutput);
|
||||
pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
if (!pSup->pState) {
|
||||
return terrno;
|
||||
}
|
||||
*(pSup->pState) = *pState;
|
||||
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
||||
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
||||
|
@ -2138,6 +2150,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||
if (!pSup->pResultRows) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||
|
@ -3340,10 +3355,12 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pStUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -3356,6 +3373,7 @@ static int32_t doStreamSessionAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
|
|||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
QUERY_CHECK_NULL(pWins, code, lino, _end, terrno);
|
||||
// gap must be 0
|
||||
code = doDeleteTimeWindows(pAggSup, pBlock, pWins);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -3496,6 +3514,9 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
|||
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||
int32_t resSize = winSize + sizeof(TSKEY);
|
||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||
if (!pBuff) {
|
||||
return;
|
||||
}
|
||||
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
|
||||
|
@ -3623,6 +3644,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
|||
if (!pInfo->pStUpdated && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
SResultWindowInfo winInfo = {0};
|
||||
|
@ -3731,6 +3753,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
pInfo->order = TSDB_ORDER_ASC;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _error, terrno);
|
||||
pInfo->pDelIterator = NULL;
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
@ -3757,6 +3780,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
pInfo->recvGetAll = false;
|
||||
pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey;
|
||||
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||
|
@ -3886,10 +3910,12 @@ static int32_t doStreamSessionSemiAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pStUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pStUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -4008,6 +4034,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
|
||||
if (numOfChild > 0) {
|
||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
||||
QUERY_CHECK_NULL(pInfo->pChildren, code, lino, _error, terrno);
|
||||
for (int32_t i = 0; i < numOfChild; i++) {
|
||||
SOperatorInfo* pChildOp = NULL;
|
||||
code = createStreamFinalSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo, 0, pHandle, &pChildOp);
|
||||
|
@ -4598,10 +4625,12 @@ static int32_t doStreamStateAggNext(SOperatorInfo* pOperator, SSDataBlock** ppRe
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pSeUpdated) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
|
@ -4702,6 +4731,9 @@ void streamStateReleaseState(SOperatorInfo* pOperator) {
|
|||
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||
int32_t resSize = winSize + sizeof(TSKEY);
|
||||
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||
if (!pBuff) {
|
||||
return ;
|
||||
}
|
||||
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||
qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
|
||||
|
@ -4752,10 +4784,12 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
|||
if (!pInfo->pSeUpdated && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pSeDeleted && num > 0) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _end, terrno);
|
||||
}
|
||||
for (int32_t i = 0; i < num; i++) {
|
||||
SStateWindowInfo curInfo = {0};
|
||||
|
@ -4865,6 +4899,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pSeDeleted, code, lino, _error, terrno);
|
||||
pInfo->pDelIterator = NULL;
|
||||
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
|
@ -4891,6 +4926,7 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
pInfo->recvGetAll = false;
|
||||
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
|
||||
pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey;
|
||||
|
||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||
|
@ -4985,11 +5021,13 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p
|
|||
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
@ -5164,6 +5202,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
QUERY_CHECK_NULL(pInfo->pState, code, lino, _error, terrno);
|
||||
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
|
||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex);
|
||||
|
||||
|
@ -5181,6 +5220,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
pInfo->invertible = false;
|
||||
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
|
||||
QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
|
||||
pInfo->delIndex = 0;
|
||||
|
||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||
|
@ -5221,6 +5261,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
|
||||
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
|
||||
|
||||
// for stream
|
||||
|
@ -5453,10 +5494,12 @@ static int32_t doStreamMidIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock*
|
|||
|
||||
if (!pInfo->pUpdated) {
|
||||
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||
}
|
||||
if (!pInfo->pUpdatedMap) {
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
|
||||
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
|
|
@ -1772,8 +1772,10 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
|||
.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI};
|
||||
|
||||
SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex));
|
||||
QUERY_CHECK_NULL(idx, code, lino, _end, terrno);
|
||||
idx->init = 0;
|
||||
idx->uids = taosArrayInit(128, sizeof(int64_t));
|
||||
QUERY_CHECK_NULL(idx->uids, code, lino, _end, terrno);
|
||||
idx->lastIdx = 0;
|
||||
|
||||
pInfo->pIdx = idx; // set idx arg
|
||||
|
@ -1992,6 +1994,9 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
|
|||
|
||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
|
||||
char* buf1 = taosMemoryCalloc(1, contLen);
|
||||
if (!buf1) {
|
||||
return NULL;
|
||||
}
|
||||
int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
||||
if (tempRes < 0) {
|
||||
code = terrno;
|
||||
|
@ -2470,12 +2475,18 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
|
|||
bool hasIdx = false;
|
||||
bool hasRslt = true;
|
||||
SArray* mRslt = taosArrayInit(len, POINTER_BYTES);
|
||||
if (!mRslt) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
SListCell* cell = pList->pHead;
|
||||
for (int i = 0; i < len; i++) {
|
||||
if (cell == NULL) break;
|
||||
|
||||
SArray* aRslt = taosArrayInit(16, sizeof(int64_t));
|
||||
if (!aRslt) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
ret = optSysTabFilteImpl(arg, cell->pNode, aRslt);
|
||||
if (ret == 0) {
|
||||
|
|
|
@ -666,6 +666,9 @@ static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExp
|
|||
pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type;
|
||||
pInfo->pPrevGroupKey->isNull = false;
|
||||
pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes);
|
||||
if (!pInfo->pPrevGroupKey->pData) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1168,6 +1171,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
|
|||
|
||||
if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
|
||||
pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes);
|
||||
QUERY_CHECK_NULL(pInfo->prevKey.pks[0].pData, code, lino, _error, terrno);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -198,6 +198,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
|
|||
SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type,
|
||||
int32_t numOfCols) {
|
||||
SOperatorInfo* pOperator = static_cast<SOperatorInfo*>(taosMemoryCalloc(1, sizeof(SOperatorInfo)));
|
||||
ASSERT(!pOperator);
|
||||
pOperator->name = "dummyInputOpertor4Test";
|
||||
|
||||
if (numOfCols == 1) {
|
||||
|
@ -207,6 +208,7 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_
|
|||
}
|
||||
|
||||
SDummyInputInfo* pInfo = (SDummyInputInfo*)taosMemoryCalloc(1, sizeof(SDummyInputInfo));
|
||||
ASSERT(!pInfo);
|
||||
pInfo->totalPages = numOfBlocks;
|
||||
pInfo->startVal = startVal;
|
||||
pInfo->numOfRowsPerPage = rowsPerPage;
|
||||
|
|
Loading…
Reference in New Issue