feat(stream): session delete

This commit is contained in:
54liuyao 2022-07-06 20:18:50 +08:00
parent a6545b600c
commit 888f79d3f4
4 changed files with 349 additions and 77 deletions

View File

@ -319,8 +319,9 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER,
STREAM_SCAN_FROM_DATAREADER, // todo(liuyao) delete it
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode;
typedef struct SCatchSupporter {
@ -612,6 +613,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; // delete result
bool returnDelete;
SSDataBlock* pUpdateRes; // update window
SHashObj* pStDeleted;
void* pDelIterator;
@ -889,6 +891,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);

View File

@ -792,13 +792,20 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
}
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
}
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
}
static bool isIntervalWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
}
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupId) {
@ -836,6 +843,49 @@ static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t
}
}
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo->cond.twindows[0] = *pWin;
pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
}
static bool prepareRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
if ((*pRowIndex) == pBlock->info.rows) {
return false;
}
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex)++;
for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
if (win.skey == startData[*pRowIndex]) {
win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
continue;
}
if (win.skey == endData[*pRowIndex]) {
win.skey = TMIN(win.skey, startData[*pRowIndex]);
continue;
}
ASSERT( (win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
( isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0) ) );
break;
}
resetTableScanInfo(pInfo->pSnapshotReadOp->info, &win);
return true;
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = {
.skey = INT64_MIN,
@ -854,6 +904,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win;
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else {
win =
@ -878,15 +929,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
if (!needRead) {
return false;
}
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0;
// tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
resetTableScanInfo(pInfo->pSnapshotReadOp->info, &win);
return true;
}
@ -903,6 +946,26 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
dest->info.rows++;
}
static SSDataBlock* doRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pSnapshotReadOp);
if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
// scan next window data
pResult = doTableScan(pInfo->pSnapshotReadOp);
}
if (!pResult) {
blockDataCleanup(pSDB);
*pRowIndex = 0;
return NULL;
}
if (pResult->info.groupId == pInfo->groupId) {
return pResult;
}
}
}
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
SSDataBlock* pResult = NULL;
@ -935,7 +998,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
*/
}
static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
if (pDelBlock->info.rows == 0) {
return;
}
@ -950,7 +1013,7 @@ static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBl
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows &&
i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
@ -969,6 +1032,40 @@ static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBl
}
}
static void generateScanRange(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
if (pBlock->info.rows == 0) {
return;
}
blockDataCleanup(pUpdateRes);
blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows);
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestStartCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
int32_t dummy = 0;
for (int32_t i = 0 ; i < pBlock->info.rows; i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
//gap must be 0.
SResultWindowInfo* pStartWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
if (!pStartWin) {
// window has been closed.
continue;
}
SResultWindowInfo* pEndWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
ASSERT(pEndWin);
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
pUpdateRes->info.rows++;
}
}
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
blockDataCleanup(pUpdateBlock);
int32_t size = taosArrayGetSize(pInfo->tsArray);
@ -1001,7 +1098,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
}
if (size == 0) {
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock);
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock);
}
}
@ -1061,11 +1158,17 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
break;
case STREAM_DELETE_DATA: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
pInfo->updateResIndex = 0;
if (isIntervalWindow(pInfo)) {
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else {
generateScanRange(pInfo, pBlock, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
}
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes;
}
@ -1080,8 +1183,10 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
return pInfo->pRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
if (!isStateWindow(pInfo)) {
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
}
return pInfo->pUpdateRes;
@ -1106,11 +1211,19 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return pInfo->pUpdateRes;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE) {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
pSDB->info.type = STREAM_NORMAL;
checkUpdateData(pInfo, true, pSDB, false);
return pSDB;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
ASSERT(pInfo->pUpdateRes->info.rows == 0);
blockDataCleanup(pInfo->pUpdateRes);
// return empty data blcok
return pInfo->pUpdateRes;
}

View File

@ -1323,13 +1323,13 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
}
}
static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex,
static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval,
int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) {
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsCols = (TSKEY*)pTsCol->pData;
uint64_t* pGpDatas = NULL;
if (pBlock->info.type == STREAM_RETRIEVE) {
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, 2);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
pGpDatas = (uint64_t*)pGpCol->pData;
}
int32_t step = 0;
@ -1492,7 +1492,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
printDataBlock(pBlock, "single interval recv");
if (pBlock->info.type == STREAM_CLEAR) {
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0,
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval,
pOperator->exprSupp.numOfExprs, pBlock, NULL);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue;
@ -1710,7 +1710,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
pInfo->delIndex = 0;
// pInfo->pDelRes = createDeleteBlock(); todo(liuyao) for delete
// pInfo->pDelRes = createPullDataBlock(); todo(liuyao) for delete
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete
@ -2571,13 +2571,13 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
for (; (*pIndex) < size; (*pIndex)++) {
SPullWindowInfo* pWin = taosArrayGet(array, (*pIndex));
SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pStartTs, pBlock->info.rows, (const char*)&pWin->window.skey, false);
SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, 1);
SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
colDataAppend(pEndTs, pBlock->info.rows, (const char*)&pWin->window.ekey, false);
SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, 2);
SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
colDataAppend(pGroupId, pBlock->info.rows, (const char*)&pWin->groupId, false);
pBlock->info.rows++;
}
@ -2589,9 +2589,9 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
}
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, 2);
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
@ -2680,7 +2680,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs,
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs,
pBlock, pUpWins);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
@ -2688,7 +2688,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info;
SExprSupp* pChildSup = &pChildOp->exprSupp;
doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval, pChildInfo->primaryTsIndex,
doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval,
pChildSup->numOfExprs, pBlock, NULL);
rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId,
pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, NULL);
@ -2719,7 +2719,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
removeResults(pUpWins, pUpdated);
taosArrayDestroy(pUpWins);
if (taosArrayGetSize(pUpdated) > 0) {
@ -2901,7 +2901,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock();
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
// pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete
// pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete
pInfo->delIndex = 0;
@ -3046,13 +3046,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete
// pInfo->pDelRes = createPullDataBlock();
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete
pInfo->pChildren = NULL;
pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pInfo->returnDelete = false;
pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
@ -3087,15 +3088,23 @@ int64_t getSessionWindowEndkey(void* data, int32_t index) {
SResultWindowInfo* pWin = taosArrayGet(pWinInfos, index);
return pWin->win.ekey;
}
static bool isInWindow(SResultWindowInfo* pWin, TSKEY ts, int64_t gap) {
int64_t sGap = ts - pWin->win.skey;
int64_t eGap = pWin->win.ekey - ts;
if ((sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0)) {
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
int64_t sGap = ts - pWin->skey + gap;
int64_t eGap = pWin->ekey - ts + gap;
// if ((sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0)) {
// return true;
// }
if (sGap >= 0 && eGap >= 0) {
return true;
}
return false;
}
bool isInWindow(SResultWindowInfo* pWinInfo, TSKEY ts, int64_t gap) {
return isInTimeWindow(&pWinInfo->win, ts, gap);
}
static SResultWindowInfo* insertNewSessionWindow(SArray* pWinInfos, TSKEY ts, int32_t index) {
SResultWindowInfo win = {.pos.offset = -1, .pos.pageId = -1, .win.skey = ts, .win.ekey = ts, .isOutput = false};
return taosArrayInsert(pWinInfos, index, &win);
@ -3118,6 +3127,41 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
return pWinInfos;
}
// don't add new window
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex) {
SArray* pWinInfos = getWinInfos(pAggSup, groupId);
pAggSup->pCurWins = pWinInfos;
int32_t size = taosArrayGetSize(pWinInfos);
if (size == 0) {
return NULL;
}
// find the first position which is smaller than the key
int32_t index = binarySearch(pWinInfos, size, startTs, TSDB_ORDER_DESC, getSessionWindowEndkey);
SResultWindowInfo* pWin = NULL;
if (index >= 0) {
pWin = taosArrayGet(pWinInfos, index);
if (isInWindow(pWin, startTs, gap)) {
*pIndex = index;
return pWin;
}
}
if (index + 1 < size) {
pWin = taosArrayGet(pWinInfos, index + 1);
if (isInWindow(pWin, startTs, gap)) {
*pIndex = index + 1;
return pWin;
} else if (endTs != INT64_MIN && isInWindow(pWin, endTs, gap)) {
*pIndex = index;
return pWin;
}
}
return NULL;
}
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
int64_t gap, int32_t* pIndex) {
SArray* pWinInfos = getWinInfos(pAggSup, groupId);
@ -3358,6 +3402,34 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
}
void deleteWindow(SArray* pWinInfos, int32_t index) {
ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos));
taosArrayRemove(pWinInfos, index);
}
static void doDeleteSessionWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endDatas = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
int32_t winIndex = 0;
while(1) {
SResultWindowInfo* pCurWin =
getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], gap, &winIndex);
if (!pCurWin) {
break;
}
deleteWindow(pAggSup->pCurWins, winIndex);
if (result) {
taosArrayPush(result, pCurWin);
}
}
}
}
static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, SSDataBlock* pBlock, int32_t tsIndex,
int32_t numOfOutput, int64_t gap, SArray* result) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
@ -3366,13 +3438,14 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap));
if (pCurWin->pos.pageId == -1) {
getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
if (!pCurWin || pCurWin->pos.pageId == -1) {
// window has been closed.
step = 1;
continue;
}
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap));
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
if (result) {
taosArrayPush(result, pCurWin);
@ -3407,7 +3480,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
blockDataEnsureCapacity(pBlock, size);
size_t keyLen = 0;
while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pColInfoData, pBlock->info.rows, *Ite, false);
for (int32_t i = 1; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
@ -3495,7 +3568,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup,
pSeWin->isOutput = true;
}
if (delete) {
taosArrayRemove(pWins, i);
deleteWindow(pWins, i);
i--;
size = taosArrayGetSize(pWins);
}
@ -3535,6 +3608,14 @@ int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_
return TSDB_CODE_SUCCESS;
}
static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) {
int32_t size = taosArrayGetSize(pResWins);
for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i);
taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &pWinInfo->win.skey, sizeof(TSKEY));
}
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
@ -3570,17 +3651,32 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs,
pInfo->gap, pWins);
0, pWins);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, 0, pChildOp->exprSupp.numOfExprs,
pChildInfo->gap, NULL);
0, NULL);
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
}
taosArrayDestroy(pWins);
continue;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
// gap must be 0
doDeleteSessionWindows(&pInfo->streamAggSup, pBlock, 0, pWins);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
// gap must be 0
doDeleteSessionWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL);
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
}
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
taosArrayDestroy(pWins);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForSession);
continue;
@ -3664,26 +3760,29 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "Semi Session");
return pBInfo->pRes;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
pInfo->returnDelete = true;
printDataBlock(pInfo->pDelRes, "Semi Session");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) {
// semi interval operator clear disk buffer
clearStreamSessionOperator(pInfo);
return NULL;
}
if (pInfo->pUpdateRes->info.rows > 0) {
// process the rest of the data
pOperator->status = OP_OPENED;
printDataBlock(pInfo->pUpdateRes, "Semi Session");
return pInfo->pUpdateRes;
}
printDataBlock(pBInfo->pRes, "Semi Session");
return pBInfo->pRes;
// semi interval operator clear disk buffer
clearStreamSessionOperator(pInfo);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -3699,11 +3798,17 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, pInfo->gap, pWins);
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, 0, pWins);
removeSessionResults(pStUpdated, pWins);
taosArrayDestroy(pWins);
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
break;
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
// gap must be 0
doDeleteSessionWindows(&pInfo->streamAggSup, pBlock, 0, NULL);
copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
break;
} else if (pBlock->info.type == STREAM_GET_ALL) {
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForSession);
continue;
@ -3728,24 +3833,29 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, "Semi Session");
return pBInfo->pRes;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
pInfo->returnDelete = true;
printDataBlock(pInfo->pDelRes, "Semi Session");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) {
return NULL;
}
if (pInfo->pUpdateRes->info.rows > 0) {
// process the rest of the data
pOperator->status = OP_OPENED;
printDataBlock(pInfo->pUpdateRes, "Semi Session");
return pInfo->pUpdateRes;
}
printDataBlock(pBInfo->pRes, "Semi Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
pOperator->status = OP_EXEC_DONE;
return NULL;
}
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
@ -3971,11 +4081,6 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, S
return rows - start;
}
void deleteWindow(SArray* pWinInfos, int32_t index) {
ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos));
taosArrayRemove(pWinInfos, index);
}
static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int32_t tsIndex, SColumn* pCol,
int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
@ -4179,7 +4284,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
pInfo->pDelIterator = NULL;
// pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete
// pInfo->pDelRes = createPullDataBlock(); // todo(liuyao) for delete
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete
pInfo->pChildren = NULL;

View File

@ -449,4 +449,53 @@ if $data26 != 14 then
return -1
endi
sql create database test1 vgroups 1
sql show databases
print $data00 $data01 $data02
sql use test1
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams2 trigger at_once into streamt1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
sql insert into t1 values(1648791212000,2,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,4,1.0,2);
$loop_count = 0
loop5:
sleep 300
sql select * from streamt1 order by c desc;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop5
endi
if $data01 != 1 then
print =====data01=$data01
goto loop5
endi
if $data05 != 4 then
print =====data05=$data05
goto loop5
endi
if $data11 != 1 then
print =====data11=$data11
goto loop5
endi
if $data15 != 3 then
print =====data15=$data15
goto loop5
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT