multi agg

This commit is contained in:
liuyao 2023-08-17 09:04:05 +08:00
parent 519c57cc55
commit 380e7406d4
3 changed files with 205 additions and 0 deletions

View File

@ -257,6 +257,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,

View File

@ -2189,6 +2189,8 @@ char* getStreamOpName(uint16_t opType) {
return "interval final"; return "interval final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return "interval semi"; return "interval semi";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
return "interval mid";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
return "stream fill"; return "stream fill";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:

View File

@ -3193,3 +3193,205 @@ _error:
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1;
SRowBuffPos* pResPos = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 1;
uint64_t groupId = pSDataBlock->info.id.groupId;
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* tsCol = (int64_t*)pColDataInfo->pData;
int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCol);
STimeWindow nextWin = getFinalTimeWindow(ts, &pInfo->interval);
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed) ||
!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCol, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->statestore);
pResult = (SResultRow*)pResPos->pRowBuff;
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
SWinKey key = {
.ts = pResult->win.skey,
.groupId = groupId,
};
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResult(&key, pResPos, pUpdatedMap);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
key.ts = nextWin.skey;
if (pInfo->delKey.ts > key.ts) {
pInfo->delKey = key;
}
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
",maxKey %" PRId64,
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
blockDataUpdateTsWindow(pSDataBlock, 0);
// timestamp of the data is incorrect
if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64,
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
}
}
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCol, prevEndPos);
if (startPos < 0) {
break;
}
}
}
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SExprSupp* pSup = &pOperator->exprSupp;
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
setOperatorCompleted(pOperator);
clearFunctionContext(&pOperator->exprSupp);
clearStreamIntervalOperator(pInfo);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType));
return NULL;
} else {
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
}
if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4096, POINTER_BYTES);
}
if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = tSimpleHashInit(4096, hashFn);
}
while (1) {
if (isTaskKilled(pTaskInfo)) {
if (pInfo->pUpdated != NULL) {
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
}
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
break;
}
pInfo->numOfDatapack++;
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
removeResults(delWins, pInfo->pUpdatedMap);
taosArrayAddAll(pInfo->pDelWins, delWins);
taosArrayDestroy(delWins);
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_CLEAR) {
pInfo->pDelRes->info.type = STREAM_CLEAR;
} else {
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
}
return pInfo->pDelRes;
}
break;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
} else {
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamMidIntervalAggImpl(pOperator, pBlock, pInfo->pUpdatedMap);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
}
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
taosArrayPush(pInfo->pUpdated, pIte);
}
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
taosArraySort(pInfo->pUpdated, winPosCmprImpl);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
return buildIntervalResult(pOperator);
}