|
|
|
@ -851,23 +851,34 @@ static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t g
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) {
|
|
|
|
|
SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
|
|
|
|
if (newPos == NULL) {
|
|
|
|
|
return TSDB_CODE_OUT_OF_MEMORY;
|
|
|
|
|
}
|
|
|
|
|
newPos->groupId = groupId;
|
|
|
|
|
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
|
|
|
|
|
*(int64_t*)newPos->key = ts;
|
|
|
|
|
SWinRes key = {.ts = ts, .groupId = groupId};
|
|
|
|
|
if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
taosMemoryFree(newPos);
|
|
|
|
|
}
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
|
|
|
|
|
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
|
|
|
|
|
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void removeResult(SArray* pUpdated, SWinRes* pKey) {
|
|
|
|
|
int32_t size = taosArrayGetSize(pUpdated);
|
|
|
|
|
int32_t index = binarySearchCom(pUpdated, size, pKey, TSDB_ORDER_DESC, compareResKey);
|
|
|
|
|
if (index >= 0 && 0 == compareResKey(pKey, pUpdated, index)) {
|
|
|
|
|
taosArrayRemove(pUpdated, index);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void removeResults(SArray* pWins, SArray* pUpdated) {
|
|
|
|
|
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
|
|
|
|
|
int32_t size = taosArrayGetSize(pWins);
|
|
|
|
|
for (int32_t i = 0; i < size; i++) {
|
|
|
|
|
SWinRes* pW = taosArrayGet(pWins, i);
|
|
|
|
|
removeResult(pUpdated, pW);
|
|
|
|
|
taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -894,11 +905,14 @@ int32_t compareWinRes(void* pKey, void* data, int32_t index) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) {
|
|
|
|
|
int32_t upSize = taosArrayGetSize(pUpdated);
|
|
|
|
|
static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
|
|
|
|
if (!pUpdatedMap || taosHashGetSize(pUpdatedMap) == 0) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
int32_t delSize = taosArrayGetSize(pDelWins);
|
|
|
|
|
for (int32_t i = 0; i < upSize; i++) {
|
|
|
|
|
SResKeyPos* pResKey = taosArrayGetP(pUpdated, i);
|
|
|
|
|
void* pIte = NULL;
|
|
|
|
|
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
|
|
|
|
|
SResKeyPos* pResKey = (SResKeyPos*)pIte;
|
|
|
|
|
int32_t index = binarySearchCom(pDelWins, delSize, pResKey, TSDB_ORDER_DESC, compareWinRes);
|
|
|
|
|
if (index >= 0 && 0 == compareWinRes(pResKey, pDelWins, index)) {
|
|
|
|
|
taosArrayRemove(pDelWins, index);
|
|
|
|
@ -914,7 +928,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) {
|
|
|
|
|
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) { return isOverdue(pWin->ekey, pSup); }
|
|
|
|
|
|
|
|
|
|
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
|
|
|
|
|
int32_t scanFlag, SArray* pUpdated) {
|
|
|
|
|
int32_t scanFlag, SHashObj* pUpdatedMap) {
|
|
|
|
|
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
|
|
|
|
|
|
|
|
|
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
|
|
|
@ -940,7 +954,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
|
|
|
|
saveResultRow(pResult, tableGroupId, pUpdated);
|
|
|
|
|
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
|
|
|
|
|
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -997,7 +1011,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
|
|
|
|
saveResultRow(pResult, tableGroupId, pUpdated);
|
|
|
|
|
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
|
|
|
|
|
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1437,7 +1451,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
|
|
|
|
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SHashObj* resWins) {
|
|
|
|
|
void* pIte = NULL;
|
|
|
|
|
size_t keyLen = 0;
|
|
|
|
|
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
|
|
|
@ -1446,7 +1460,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
|
|
|
|
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
|
|
|
|
|
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
|
|
|
|
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
|
|
|
|
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
|
|
|
|
|
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
@ -1455,7 +1469,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
|
|
|
|
SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages,
|
|
|
|
|
SHashObj* pPullDataMap, SHashObj* closeWins, SArray* pRecyPages,
|
|
|
|
|
SDiskbasedBuf* pDiscBuf) {
|
|
|
|
|
qDebug("===stream===close interval window");
|
|
|
|
|
void* pIte = NULL;
|
|
|
|
@ -1487,7 +1501,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|
|
|
|
}
|
|
|
|
|
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
|
|
|
|
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
|
|
|
|
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
|
|
|
|
|
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
@ -1577,11 +1591,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
|
|
|
|
|
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
|
|
|
|
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
|
|
|
|
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
|
|
|
|
while (1) {
|
|
|
|
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
|
|
|
|
if (pBlock == NULL) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
// qInfo("===stream===%ld", pBlock->info.version);
|
|
|
|
|
printDataBlock(pBlock, "single interval recv");
|
|
|
|
|
|
|
|
|
|
if (pBlock->info.type == STREAM_CLEAR) {
|
|
|
|
@ -1594,7 +1611,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval);
|
|
|
|
|
continue;
|
|
|
|
|
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
|
|
|
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
|
|
|
|
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1617,17 +1634,24 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
|
|
|
|
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
|
|
|
|
|
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pOperator->status = OP_RES_TO_RETURN;
|
|
|
|
|
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated,
|
|
|
|
|
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
|
|
|
|
|
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
|
|
|
|
|
|
|
|
|
void* pIte = NULL;
|
|
|
|
|
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
|
|
|
|
|
taosArrayPush(pUpdated, pIte);
|
|
|
|
|
}
|
|
|
|
|
taosHashCleanup(pUpdatedMap);
|
|
|
|
|
taosArraySort(pUpdated, resultrowComparAsc);
|
|
|
|
|
|
|
|
|
|
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
|
|
|
|
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
|
|
|
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
|
|
|
|
removeDeleteResults(pUpdated, pInfo->pDelWins);
|
|
|
|
|
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
|
|
|
|
|
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
|
|
|
|
if (pInfo->pDelRes->info.rows > 0) {
|
|
|
|
|
return pInfo->pDelRes;
|
|
|
|
@ -2867,7 +2891,7 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t tableGroupId,
|
|
|
|
|
SArray* pUpdated) {
|
|
|
|
|
SHashObj* pUpdatedMap) {
|
|
|
|
|
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
|
|
|
|
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
|
|
|
|
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
|
|
|
@ -2949,8 +2973,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
|
|
|
|
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
|
|
|
|
|
NULL, TSDB_ORDER_ASC);
|
|
|
|
|
}
|
|
|
|
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
|
|
|
|
saveResultRow(pResult, tableGroupId, pUpdated);
|
|
|
|
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
|
|
|
|
|
saveWinResultRow(pResult, tableGroupId, pUpdatedMap);
|
|
|
|
|
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
|
|
|
|
|
}
|
|
|
|
|
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
|
|
|
@ -3056,6 +3080,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
|
|
|
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
|
|
|
|
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
|
|
|
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
|
|
|
|
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
|
|
|
|
|
TSKEY maxTs = INT64_MIN;
|
|
|
|
|
|
|
|
|
|
SExprSupp* pSup = &pOperator->exprSupp;
|
|
|
|
@ -3113,7 +3139,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
|
|
|
|
if (pBlock == NULL) {
|
|
|
|
|
clearSpecialDataBlock(pInfo->pUpdateRes);
|
|
|
|
|
removeDeleteResults(pUpdated, pInfo->pDelWins);
|
|
|
|
|
removeDeleteResults(pUpdatedMap, pInfo->pDelWins);
|
|
|
|
|
pOperator->status = OP_RES_TO_RETURN;
|
|
|
|
|
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
|
|
|
|
break;
|
|
|
|
@ -3140,7 +3166,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
taosArrayDestroy(pUpWins);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
removeResults(pUpWins, pUpdated);
|
|
|
|
|
removeResults(pUpWins, pUpdatedMap);
|
|
|
|
|
copyDataBlock(pInfo->pUpdateRes, pBlock);
|
|
|
|
|
// copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
|
|
|
|
|
pInfo->returnUpdate = true;
|
|
|
|
@ -3158,15 +3184,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdated);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
removeResults(pInfo->pDelWins, pUpdated);
|
|
|
|
|
removeResults(pInfo->pDelWins, pUpdatedMap);
|
|
|
|
|
break;
|
|
|
|
|
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
|
|
|
|
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
|
|
|
|
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
|
|
|
|
|
continue;
|
|
|
|
|
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
|
|
|
|
|
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
|
|
|
|
|
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
|
|
|
|
|
removeResults(pUpWins, pUpdated);
|
|
|
|
|
removeResults(pUpWins, pUpdatedMap);
|
|
|
|
|
taosArrayDestroy(pUpWins);
|
|
|
|
|
if (taosArrayGetSize(pUpdated) > 0) {
|
|
|
|
|
break;
|
|
|
|
@ -3182,7 +3208,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
|
|
|
|
}
|
|
|
|
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
|
|
|
|
|
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated);
|
|
|
|
|
doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap);
|
|
|
|
|
if (IS_FINAL_OP(pInfo)) {
|
|
|
|
|
int32_t chIndex = getChildIndex(pBlock);
|
|
|
|
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
|
|
|
@ -3207,12 +3233,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|
|
|
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
|
|
|
|
if (IS_FINAL_OP(pInfo)) {
|
|
|
|
|
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
|
|
|
|
|
pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
|
|
|
|
pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
|
|
|
|
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
|
|
|
|
|
} else {
|
|
|
|
|
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void* pIte = NULL;
|
|
|
|
|
while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
|
|
|
|
|
taosArrayPush(pUpdated, pIte);
|
|
|
|
|
}
|
|
|
|
|
taosHashCleanup(pUpdatedMap);
|
|
|
|
|
taosArraySort(pUpdated, resultrowComparAsc);
|
|
|
|
|
|
|
|
|
|
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
|
|
|
|
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
|
|
|
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
|
|
|
|