metaReader/decoder: clear to release tDecoderMalloc
This commit is contained in:
parent
0b45f95211
commit
2a38443640
|
@ -576,14 +576,15 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
int32_t numOfRows = 0;
|
||||
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
|
||||
SqlFunctionCtx* pfCtx = &pCtx[k];
|
||||
SInputColumnInfoData* pInputData = &pfCtx->input;
|
||||
|
||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||
if (pResult->info.rows > 0 && !createNewColModel) {
|
||||
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows);
|
||||
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
|
||||
pInputData->numOfRows);
|
||||
} else {
|
||||
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
|
||||
}
|
||||
|
@ -641,11 +642,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
} else if (fmIsAggFunc(pfCtx->functionId)) {
|
||||
// _group_key function for "partition by tbname" + csum(col_name) query
|
||||
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||
|
||||
// todo handle the json tag
|
||||
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||
for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
||||
for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
|
||||
bool isNull = colDataIsNull_s(pInput, f);
|
||||
if (isNull) {
|
||||
colDataAppendNULL(pOutput, pResult->info.rows + f);
|
||||
|
@ -3819,7 +3820,8 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) {
|
||||
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
int32_t order = 0;
|
||||
int32_t scanFlag = 0;
|
||||
|
||||
|
@ -3874,9 +3876,9 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while(1) {
|
||||
while (1) {
|
||||
// here we need to handle the existsed group results
|
||||
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
|
||||
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
|
||||
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
|
||||
SqlFunctionCtx* pCtx = &pSup->pCtx[k];
|
||||
|
||||
|
@ -3974,15 +3976,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
|
||||
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||
pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||
|
@ -4047,8 +4049,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* pColMatchColInfo =
|
||||
extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
|
||||
pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
|
||||
|
@ -4056,18 +4058,18 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||
pInfo->pColMatchColInfo = pColMatchColInfo;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||
pInfo->pColMatchColInfo = pColMatchColInfo;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = num;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
||||
|
@ -4117,6 +4119,8 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
|
|||
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
|
||||
} else if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||
tDecoderClear(&mr.coder);
|
||||
|
||||
tb_uid_t suid = mr.me.ctbEntry.suid;
|
||||
metaGetTableEntryByUid(&mr, suid);
|
||||
pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
|
||||
|
|
Loading…
Reference in New Issue