Merge pull request #13716 from taosdata/feature/TD-16435
feat(stream): stream max delay
This commit is contained in:
commit
3097bf2af8
|
@ -50,6 +50,7 @@ typedef enum EStreamType {
|
||||||
STREAM_INVERT,
|
STREAM_INVERT,
|
||||||
STREAM_REPROCESS,
|
STREAM_REPROCESS,
|
||||||
STREAM_INVALID,
|
STREAM_INVALID,
|
||||||
|
STREAM_GET_ALL,
|
||||||
} EStreamType;
|
} EStreamType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -1496,6 +1496,7 @@ typedef struct {
|
||||||
|
|
||||||
#define STREAM_TRIGGER_AT_ONCE 1
|
#define STREAM_TRIGGER_AT_ONCE 1
|
||||||
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
||||||
|
#define STREAM_TRIGGER_MAX_DELAY 3
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
|
|
|
@ -167,6 +167,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
pTask->dispatchType = TASK_DISPATCH__FIXED;
|
||||||
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
|
||||||
|
@ -178,7 +179,6 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
|
||||||
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
|
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
|
||||||
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
|
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -750,14 +750,15 @@ int64_t getReskey(void* data, int32_t index) {
|
||||||
return *(int64_t*)pos->key;
|
return *(int64_t*)pos->key;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
|
static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId,
|
||||||
|
SArray* pUpdated) {
|
||||||
int32_t size = taosArrayGetSize(pUpdated);
|
int32_t size = taosArrayGetSize(pUpdated);
|
||||||
int32_t index = binarySearch(pUpdated, size, result->win.skey, TSDB_ORDER_DESC, getReskey);
|
int32_t index = binarySearch(pUpdated, size, ts, TSDB_ORDER_DESC, getReskey);
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = 0;
|
index = 0;
|
||||||
} else {
|
} else {
|
||||||
TSKEY resTs = getReskey(pUpdated, index);
|
TSKEY resTs = getReskey(pUpdated, index);
|
||||||
if (resTs < result->win.skey) {
|
if (resTs < ts) {
|
||||||
index++;
|
index++;
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -769,14 +770,18 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
newPos->groupId = groupId;
|
newPos->groupId = groupId;
|
||||||
newPos->pos = (SResultRowPosition){.pageId = result->pageId, .offset = result->offset};
|
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
|
||||||
*(int64_t*)newPos->key = result->win.skey;
|
*(int64_t*)newPos->key = ts;
|
||||||
if (taosArrayInsert(pUpdated, index, &newPos) == NULL) {
|
if (taosArrayInsert(pUpdated, index, &newPos) == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
|
||||||
|
return saveResult(result->win.skey, result->pageId, result->offset, groupId, pUpdated);
|
||||||
|
}
|
||||||
|
|
||||||
static void removeResult(SArray* pUpdated, TSKEY key) {
|
static void removeResult(SArray* pUpdated, TSKEY key) {
|
||||||
int32_t size = taosArrayGetSize(pUpdated);
|
int32_t size = taosArrayGetSize(pUpdated);
|
||||||
int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey);
|
int32_t index = binarySearch(pUpdated, size, key, TSDB_ORDER_DESC, getReskey);
|
||||||
|
@ -819,7 +824,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
|
|
||||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||||
saveResult(pResult, tableGroupId, pUpdated);
|
saveResultRow(pResult, tableGroupId, pUpdated);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -869,7 +874,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
|
|
||||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||||
saveResult(pResult, tableGroupId, pUpdated);
|
saveResultRow(pResult, tableGroupId, pUpdated);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1243,6 +1248,23 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, SInterva
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
||||||
|
void* pIte = NULL;
|
||||||
|
size_t keyLen = 0;
|
||||||
|
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
||||||
|
void* key = taosHashGetKey(pIte, &keyLen);
|
||||||
|
uint64_t groupId = *(uint64_t*)key;
|
||||||
|
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
|
||||||
|
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
||||||
|
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
||||||
|
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
||||||
SArray* closeWins) {
|
SArray* closeWins) {
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
|
@ -1259,16 +1281,12 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
||||||
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
|
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
|
||||||
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
|
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
|
||||||
taosHashRemove(pHashMap, keyBuf, keyLen);
|
taosHashRemove(pHashMap, keyBuf, keyLen);
|
||||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
||||||
if (pos == NULL) {
|
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
int32_t code = saveResult(ts, pPos->pageId, pPos->offset, groupId, closeWins);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
pos->groupId = groupId;
|
|
||||||
pos->pos = *(SResultRowPosition*)pIte;
|
|
||||||
*(int64_t*)pos->key = ts;
|
|
||||||
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && !taosArrayPush(closeWins, &pos)) {
|
|
||||||
taosMemoryFree(pos);
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1296,8 +1314,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||||
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
|
@ -1316,19 +1332,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL);
|
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, 0, pOperator->numOfExprs, pBlock, NULL);
|
||||||
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
|
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
|
||||||
continue;
|
continue;
|
||||||
|
} else if (pBlock->info.type == STREAM_GET_ALL &&
|
||||||
|
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
|
||||||
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
|
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
|
||||||
}
|
}
|
||||||
|
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
|
||||||
|
|
||||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
|
|
||||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
|
|
||||||
taosArrayAddAll(pUpdated, pClosed);
|
|
||||||
|
|
||||||
taosArrayDestroy(pClosed);
|
|
||||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
|
@ -1898,7 +1913,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
||||||
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
|
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
|
||||||
TSDB_ORDER_ASC);
|
TSDB_ORDER_ASC);
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
|
||||||
saveResult(pResult, tableGroupId, pUpdated);
|
saveResultRow(pResult, tableGroupId, pUpdated);
|
||||||
}
|
}
|
||||||
// window start(end) key interpolation
|
// window start(end) key interpolation
|
||||||
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
|
||||||
|
@ -1993,7 +2008,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||||
SArray* pClosed = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2042,7 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
|
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
|
||||||
taosArrayDestroy(pUpWins);
|
taosArrayDestroy(pUpWins);
|
||||||
break;
|
break;
|
||||||
|
} else if (pBlock->info.type == STREAM_GET_ALL && isFinalInterval(pInfo) &&
|
||||||
|
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_MAX_DELAY) {
|
||||||
|
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isFinalInterval(pInfo)) {
|
if (isFinalInterval(pInfo)) {
|
||||||
int32_t chIndex = getChildIndex(pBlock);
|
int32_t chIndex = getChildIndex(pBlock);
|
||||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||||
|
@ -2064,13 +2083,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isFinalInterval(pInfo)) {
|
if (isFinalInterval(pInfo)) {
|
||||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed);
|
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated);
|
||||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset);
|
|
||||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
|
||||||
taosArrayAddAll(pUpdated, pClosed);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
taosArrayDestroy(pClosed);
|
|
||||||
|
|
||||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
|
|
Loading…
Reference in New Issue