optimize group key storage
This commit is contained in:
parent
3b10ac29ce
commit
4693c22970
|
@ -39,7 +39,7 @@ typedef struct STimeSliceOperatorInfo {
|
||||||
int64_t prevTs;
|
int64_t prevTs;
|
||||||
bool prevTsSet;
|
bool prevTsSet;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SSDataBlock* pPrevGroupRes;
|
SGroupKeys* pPrevGroupKey;
|
||||||
SSDataBlock* pNextGroupRes;
|
SSDataBlock* pNextGroupRes;
|
||||||
} STimeSliceOperatorInfo;
|
} STimeSliceOperatorInfo;
|
||||||
|
|
||||||
|
@ -227,6 +227,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
continue;
|
continue;
|
||||||
} else if (!isInterpFunc(pExprInfo)) {
|
} else if (!isInterpFunc(pExprInfo)) {
|
||||||
if (isGroupKeyFunc(pExprInfo)) {
|
if (isGroupKeyFunc(pExprInfo)) {
|
||||||
|
if (pSrcBlock != NULL) {
|
||||||
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
|
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
|
||||||
|
|
||||||
|
@ -237,6 +238,15 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
|
|
||||||
char* v = colDataGetData(pSrc, index);
|
char* v = colDataGetData(pSrc, index);
|
||||||
colDataSetVal(pDst, pResBlock->info.rows, v, false);
|
colDataSetVal(pDst, pResBlock->info.rows, v, false);
|
||||||
|
} else {
|
||||||
|
// use stored group key
|
||||||
|
SGroupKeys* pkey = pSliceInfo->pPrevGroupKey;
|
||||||
|
if (pkey->isNull == false) {
|
||||||
|
colDataSetVal(pDst, rows, pkey->pData, false);
|
||||||
|
} else {
|
||||||
|
colDataSetNULL(pDst, rows);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -464,7 +474,31 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
|
||||||
|
if (pInfo->pPrevGroupKey != NULL) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys));
|
||||||
|
if (pInfo->pPrevGroupKey == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
|
||||||
|
SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
|
||||||
|
|
||||||
|
if (isGroupKeyFunc(pExprInfo)) {
|
||||||
|
pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes;
|
||||||
|
pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type;
|
||||||
|
pInfo->pPrevGroupKey->isNull = false;
|
||||||
|
pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
code = initPrevRowsKeeper(pInfo, pBlock);
|
code = initPrevRowsKeeper(pInfo, pBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -481,6 +515,12 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = initGroupKeyKeeper(pInfo, pExprSup);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -627,22 +667,42 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, SSDataBlock* pSrcBlock,
|
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
|
||||||
int32_t index) {
|
|
||||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||||
SInterval* pInterval = &pSliceInfo->interval;
|
SInterval* pInterval = &pSliceInfo->interval;
|
||||||
|
|
||||||
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
|
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
|
||||||
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
|
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
|
||||||
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pSrcBlock, index, false);
|
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false);
|
||||||
pSliceInfo->current =
|
pSliceInfo->current =
|
||||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void copyPrevGroupDataBlock(SSDataBlock* pDstBlock, SSDataBlock* pSrcBlock) {
|
static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) {
|
||||||
blockDataCleanup(pDstBlock);
|
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
|
||||||
assignOneDataBlock(pDstBlock, pSrcBlock);
|
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
|
||||||
|
|
||||||
|
if (isGroupKeyFunc(pExprInfo)) {
|
||||||
|
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pSrc, 0)) {
|
||||||
|
pGroupKey->isNull = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* v = colDataGetData(pSrc, 0);
|
||||||
|
if (IS_VAR_DATA_TYPE(pGroupKey->type)) {
|
||||||
|
memcpy(pGroupKey->pData, v, varDataTLen(v));
|
||||||
|
} else {
|
||||||
|
memcpy(pGroupKey->pData, v, pGroupKey->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pGroupKey->isNull = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
|
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
|
||||||
|
@ -672,7 +732,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
if (pSliceInfo->pNextGroupRes != NULL) {
|
if (pSliceInfo->pNextGroupRes != NULL) {
|
||||||
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
|
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
|
||||||
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
|
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
|
||||||
copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pSliceInfo->pNextGroupRes);
|
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes);
|
||||||
pSliceInfo->pNextGroupRes = NULL;
|
pSliceInfo->pNextGroupRes = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -698,7 +758,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
|
int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -706,13 +766,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
||||||
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
|
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
|
||||||
copyPrevGroupDataBlock(pSliceInfo->pPrevGroupRes, pBlock);
|
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if need to interpolate after last datablock
|
// check if need to interpolate after last datablock
|
||||||
// except for fill(next), fill(linear)
|
// except for fill(next), fill(linear)
|
||||||
genInterpAfterDataBlock(pSliceInfo, pOperator, pSliceInfo->pPrevGroupRes,
|
genInterpAfterDataBlock(pSliceInfo, pOperator, 0);
|
||||||
pSliceInfo->pPrevGroupRes->info.rows - 1);
|
|
||||||
|
|
||||||
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -776,7 +835,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
pInfo->prevTsSet = false;
|
pInfo->prevTsSet = false;
|
||||||
pInfo->prevTs = 0;
|
pInfo->prevTs = 0;
|
||||||
pInfo->groupId = 0;
|
pInfo->groupId = 0;
|
||||||
pInfo->pPrevGroupRes = createDataBlock();
|
pInfo->pPrevGroupKey = NULL;
|
||||||
pInfo->pNextGroupRes = NULL;
|
pInfo->pNextGroupRes = NULL;
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
|
@ -806,7 +865,6 @@ void destroyTimeSliceOperatorInfo(void* param) {
|
||||||
STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;
|
STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;
|
||||||
|
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
pInfo->pPrevGroupRes = blockDataDestroy(pInfo->pPrevGroupRes);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
|
||||||
SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
|
SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
|
||||||
|
@ -826,6 +884,10 @@ void destroyTimeSliceOperatorInfo(void* param) {
|
||||||
taosMemoryFree(pKey->end.val);
|
taosMemoryFree(pKey->end.val);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pInfo->pLinearInfo);
|
taosArrayDestroy(pInfo->pLinearInfo);
|
||||||
|
|
||||||
|
taosMemoryFree(pInfo->pPrevGroupKey->pData);
|
||||||
|
taosMemoryFree(pInfo->pPrevGroupKey);
|
||||||
|
|
||||||
cleanupExprSupp(&pInfo->scalarSup);
|
cleanupExprSupp(&pInfo->scalarSup);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
|
for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
|
||||||
|
|
Loading…
Reference in New Issue