Merge pull request #16453 from taosdata/szhou/fixbugs

fix: interval operator interpolation for different partition
This commit is contained in:
Shengliang Guan 2022-08-27 10:41:32 +08:00 committed by GitHub
commit 6aef861374
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 26 deletions

View File

@ -257,6 +257,7 @@ typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCo
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData);
#ifdef __cplusplus

View File

@ -1706,8 +1706,8 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) {
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock));
taosArrayPush(dataBlocks, pBlock);
SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*));
taosArrayPush(dataBlocks, &pBlock);
blockDebugShowDataBlocks(dataBlocks, flag);
taosArrayDestroy(dataBlocks);
}

View File

@ -33,11 +33,16 @@ typedef struct SPullWindowInfo {
uint64_t groupId;
} SPullWindowInfo;
typedef struct SOpenWindowInfo {
SResultRowPosition pos;
uint64_t groupId;
} SOpenWindowInfo;
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
///*
@ -598,14 +603,14 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
int32_t startPos = 0;
int32_t numOfOutput = pSup->numOfExprs;
uint64_t groupId = pBlock->info.groupId;
SResultRow* pResult = NULL;
while (1) {
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
SResultRowPosition* p1 = (SResultRowPosition*)pn->data;
SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data;
uint64_t groupId = pOpenWin->groupId;
SResultRowPosition* p1 = &pOpenWin->pos;
if (p->pageId == p1->pageId && p->offset == p1->offset) {
break;
}
@ -631,12 +636,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0);
int64_t prevTs = *(int64_t*)pTsKey->pData;
if (groupId == pBlock->info.groupId) {
doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey,
RESULT_ROW_END_INTERP, pSup);
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows,
numOfExprs);
@ -965,7 +973,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// prev time window not interpolation yet.
if (pInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
@ -1017,10 +1025,18 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder);
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
//TODO: add to open window? how to close the open windows after input blocks exhausted?
#if 0
if ((ascScan && ekey <= pBlock->info.window.ekey) ||
(!ascScan && ekey >= pBlock->info.window.skey)) {
// window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
} else if (pInfo->timeWindowInterpo) {
addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
}
#endif
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
numOfOutput);
@ -1040,20 +1056,23 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf
}
}
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult) {
SResultRowPosition pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) {
SOpenWindowInfo openWin = {0};
openWin.pos.pageId = pResult->pageId;
openWin.pos.offset = pResult->offset;
openWin.groupId = groupId;
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
if (pn == NULL) {
tdListAppend(pResultRowInfo->openWindow, &pos);
return pos;
tdListAppend(pResultRowInfo->openWindow, &openWin);
return openWin.pos;
}
SResultRowPosition* px = (SResultRowPosition*)pn->data;
if (px->pageId != pos.pageId || px->offset != pos.offset) {
tdListAppend(pResultRowInfo->openWindow, &pos);
SOpenWindowInfo * px = (SOpenWindowInfo *)pn->data;
if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) {
tdListAppend(pResultRowInfo->openWindow, &openWin);
}
return pos;
return openWin.pos;
}
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
@ -1884,7 +1903,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo);
if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}
@ -5157,7 +5176,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo);
if (iaInfo->timeWindowInterpo) {
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
}
initResultRowInfo(&iaInfo->binfo.resultRowInfo);
@ -5292,7 +5311,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
// prev time window not interpolation yet.
if (iaInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window
@ -5467,9 +5486,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initBasicInfo(&pIntervalInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo);
if (pIntervalInfo->timeWindowInterpo) {
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo));
if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) {
goto _error;
}

View File

@ -84,6 +84,7 @@ typedef struct SUdf {
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc;
TUdfAggMergeFunc aggMergeFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
@ -271,6 +272,15 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf);
freeUdfInterBuf(&call->interBuf);
freeUdfInterBuf(&call->interBuf2);
subRsp->resultBuf = outBuf;
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
@ -309,6 +319,10 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf);
break;
@ -560,7 +574,11 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
// TODO: merge
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc));
}
return 0;
}