fix:[TD-32727] fix bug when select interp with two more group key.

This commit is contained in:
Jing Sima 2024-10-29 13:27:09 +08:00 committed by 54liuyao
parent e7b4fbcf90
commit c87b054a02
1 changed files with 44 additions and 18 deletions

View File

@ -42,7 +42,7 @@ typedef struct STimeSliceOperatorInfo {
SRowKey prevKey; SRowKey prevKey;
bool prevTsSet; bool prevTsSet;
uint64_t groupId; uint64_t groupId;
SGroupKeys* pPrevGroupKey; SArray* pPrevGroupKeys;
SSDataBlock* pNextGroupRes; SSDataBlock* pNextGroupRes;
SSDataBlock* pRemainRes; // save block unfinished processing SSDataBlock* pRemainRes; // save block unfinished processing
int32_t remainIndex; // the remaining index in the block to be processed int32_t remainIndex; // the remaining index in the block to be processed
@ -288,6 +288,7 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
// output the result // output the result
int32_t fillColIndex = 0; int32_t fillColIndex = 0;
int32_t groupKeyIndex = 0;
bool hasInterp = true; bool hasInterp = true;
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
@ -320,7 +321,9 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} else if (!isSelectGroupConstValueFunc(pExprInfo)) { } else if (!isSelectGroupConstValueFunc(pExprInfo)) {
// use stored group key // use stored group key
SGroupKeys* pkey = pSliceInfo->pPrevGroupKey; SGroupKeys *pkey = taosArrayGet(pSliceInfo->pPrevGroupKeys, groupKeyIndex);
QUERY_CHECK_NULL(pkey, code, lino, _end, terrno);
groupKeyIndex++;
if (pkey->isNull == false) { if (pkey->isNull == false) {
code = colDataSetVal(pDst, rows, pkey->pData, false); code = colDataSetVal(pDst, rows, pkey->pData, false);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
@ -645,13 +648,20 @@ _end:
return code; return code;
} }
static void destroyGroupKey(void* pKey) {
SGroupKeys* key = (SGroupKeys*)pKey;
if (key->pData != NULL) {
taosMemoryFreeClear(key->pData);
}
}
static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) { static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
if (pInfo->pPrevGroupKey != NULL) { if (pInfo->pPrevGroupKeys != NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pInfo->pPrevGroupKey = taosMemoryCalloc(1, sizeof(SGroupKeys)); pInfo->pPrevGroupKeys = taosArrayInit(pExprSup->numOfExprs, sizeof(SGroupKeys));
if (pInfo->pPrevGroupKey == NULL) { if (pInfo->pPrevGroupKeys == NULL) {
return terrno; return terrno;
} }
@ -659,11 +669,17 @@ static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExp
SExprInfo* pExprInfo = &pExprSup->pExprInfo[i]; SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
if (isGroupKeyFunc(pExprInfo)) { if (isGroupKeyFunc(pExprInfo)) {
pInfo->pPrevGroupKey->bytes = pExprInfo->base.resSchema.bytes; SGroupKeys key = {.bytes = pExprInfo->base.resSchema.bytes,
pInfo->pPrevGroupKey->type = pExprInfo->base.resSchema.type; .type = pExprInfo->base.resSchema.type,
pInfo->pPrevGroupKey->isNull = false; .isNull = false,
pInfo->pPrevGroupKey->pData = taosMemoryCalloc(1, pInfo->pPrevGroupKey->bytes); .pData = taosMemoryCalloc(1, pExprInfo->base.resSchema.bytes)};
if (!pInfo->pPrevGroupKey->pData) { if (!key.pData) {
taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
return terrno;
}
if (NULL == taosArrayPush(pInfo->pPrevGroupKeys, &key)) {
taosMemoryFree(key.pData);
taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
return terrno; return terrno;
} }
} }
@ -910,7 +926,7 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
SInterval* pInterval = &pSliceInfo->interval; SInterval* pInterval = &pSliceInfo->interval;
if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR || if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR ||
pSliceInfo->pPrevGroupKey == NULL) { pSliceInfo->pPrevGroupKeys == NULL) {
return; return;
} }
@ -921,12 +937,18 @@ static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperato
} }
} }
static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataBlock* pSrcBlock) { static int32_t copyPrevGroupKey(SExprSupp* pExprSup, SArray * pGroupKeys, SSDataBlock* pSrcBlock) {
int32_t groupKeyIdx = 0;
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
if (isGroupKeyFunc(pExprInfo)) { if (isGroupKeyFunc(pExprInfo)) {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SGroupKeys *pGroupKey = taosArrayGet(pGroupKeys, groupKeyIdx);
if (pGroupKey == NULL) {
return terrno;
}
groupKeyIdx++;
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, 0)) { if (colDataIsNull_s(pSrc, 0)) {
@ -942,9 +964,9 @@ static void copyPrevGroupKey(SExprSupp* pExprSup, SGroupKeys* pGroupKey, SSDataB
} }
pGroupKey->isNull = false; pGroupKey->isNull = false;
break;
} }
} }
return TSDB_CODE_SUCCESS;
} }
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
@ -986,7 +1008,11 @@ static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull); doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock); code = copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKeys, pBlock);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
} }
static int32_t doTimesliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { static int32_t doTimesliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
@ -1160,7 +1186,7 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
pInfo->prevTsSet = false; pInfo->prevTsSet = false;
pInfo->prevKey.ts = INT64_MIN; pInfo->prevKey.ts = INT64_MIN;
pInfo->groupId = 0; pInfo->groupId = 0;
pInfo->pPrevGroupKey = NULL; pInfo->pPrevGroupKeys = NULL;
pInfo->pNextGroupRes = NULL; pInfo->pNextGroupRes = NULL;
pInfo->pRemainRes = NULL; pInfo->pRemainRes = NULL;
pInfo->remainIndex = 0; pInfo->remainIndex = 0;
@ -1233,9 +1259,9 @@ void destroyTimeSliceOperatorInfo(void* param) {
} }
taosArrayDestroy(pInfo->pLinearInfo); taosArrayDestroy(pInfo->pLinearInfo);
if (pInfo->pPrevGroupKey) { if (pInfo->pPrevGroupKeys) {
taosMemoryFree(pInfo->pPrevGroupKey->pData); taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
taosMemoryFree(pInfo->pPrevGroupKey); pInfo->pPrevGroupKeys = NULL;
} }
if (pInfo->hasPk && IS_VAR_DATA_TYPE(pInfo->pkCol.type)) { if (pInfo->hasPk && IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
taosMemoryFreeClear(pInfo->prevKey.pks[0].pData); taosMemoryFreeClear(pInfo->prevKey.pks[0].pData);