feat(stream):stream interval delete data
This commit is contained in:
parent
5b8441f413
commit
b7bf2bf808
|
@ -51,6 +51,13 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
|
|||
|
||||
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
|
||||
|
||||
#define START_TS_COLUMN_INDEX 0
|
||||
#define END_TS_COLUMN_INDEX 1
|
||||
#define UID_COLUMN_INDEX 2
|
||||
#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX
|
||||
#define DELETE_GROUPID_COLUMN_INDEX 2
|
||||
|
||||
|
||||
enum {
|
||||
// when this task starts to execute, this status will set
|
||||
TASK_NOT_COMPLETED = 0x1u,
|
||||
|
@ -364,6 +371,8 @@ typedef struct SStreamBlockScanInfo {
|
|||
int32_t scanWinIndex; // for state operator
|
||||
int32_t pullDataResIndex;
|
||||
SSDataBlock* pPullDataRes; // pull data SSDataBlock
|
||||
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
|
||||
int32_t deleteDataIndex;
|
||||
} SStreamBlockScanInfo;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
|
@ -429,6 +438,10 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
bool invertible;
|
||||
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
|
||||
bool ignoreExpiredData;
|
||||
SArray* pRecycledPages;
|
||||
SArray* pDelWins; // SWinRes
|
||||
int32_t delIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
} SIntervalAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamFinalIntervalOperatorInfo {
|
||||
|
@ -451,6 +464,10 @@ typedef struct SStreamFinalIntervalOperatorInfo {
|
|||
int32_t pullIndex;
|
||||
SSDataBlock* pPullDataRes;
|
||||
bool ignoreExpiredData;
|
||||
SArray* pRecycledPages;
|
||||
SArray* pDelWins; // SWinRes
|
||||
int32_t delIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
} SStreamFinalIntervalOperatorInfo;
|
||||
|
||||
typedef struct SAggOperatorInfo {
|
||||
|
|
|
@ -807,18 +807,38 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
|
|||
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
||||
}
|
||||
|
||||
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
|
||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
|
||||
if (groupId) {
|
||||
return *groupId;
|
||||
}
|
||||
return 0;
|
||||
/* Todo(liuyao) for partition by column
|
||||
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
|
||||
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
|
||||
uint64_t resId = 0;
|
||||
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
|
||||
if (groupId) {
|
||||
return *groupId;
|
||||
} else if (len != 0) {
|
||||
resId = calcGroupId(pTableScanInfo->keyBuf, len);
|
||||
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
|
||||
}
|
||||
return resId;
|
||||
*/
|
||||
}
|
||||
|
||||
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
|
||||
ASSERT(rowIndex < pBlock->info.rows);
|
||||
switch (pBlock->info.type)
|
||||
{
|
||||
case STREAM_DELETE_DATA:
|
||||
case STREAM_RETRIEVE: {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
|
||||
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
|
||||
pInfo->groupId = groupCol[rowIndex];
|
||||
}
|
||||
break;
|
||||
case STREAM_DELETE_DATA:
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -840,14 +860,14 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
|
|||
int64_t gap = pInfo->sessionSup.gap;
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin =
|
||||
getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
|
||||
getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
|
||||
win = pCurWin->win;
|
||||
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
|
||||
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
|
||||
} else {
|
||||
win =
|
||||
getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
|
||||
setGroupId(pInfo, pSDB, 2, *pRowIndex);
|
||||
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
|
||||
getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
|
||||
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
||||
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
}
|
||||
needRead = true;
|
||||
|
@ -891,27 +911,6 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
|
|||
dest->info.rows++;
|
||||
}
|
||||
|
||||
static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t rowId) {
|
||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||
if (groupId) {
|
||||
return *groupId;
|
||||
}
|
||||
return 0;
|
||||
/* Todo(liuyao) for partition by column
|
||||
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
|
||||
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
|
||||
uint64_t resId = 0;
|
||||
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
|
||||
if (groupId) {
|
||||
return *groupId;
|
||||
} else if (len != 0) {
|
||||
resId = calcGroupId(pTableScanInfo->keyBuf, len);
|
||||
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
|
||||
}
|
||||
return resId;
|
||||
*/
|
||||
}
|
||||
|
||||
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||
while (1) {
|
||||
SSDataBlock* pResult = NULL;
|
||||
|
@ -935,7 +934,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
|
|||
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
|
||||
blockDataCleanup(pResult);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
|
||||
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock->info.uid);
|
||||
if (id == pInfo->groupId) {
|
||||
copyOneRow(pResult, pBlock, i);
|
||||
}
|
||||
|
@ -944,6 +943,40 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
|
|||
*/
|
||||
}
|
||||
|
||||
static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
|
||||
if (pDelBlock->info.rows == 0) {
|
||||
return;
|
||||
}
|
||||
blockDataCleanup(pUpdateRes);
|
||||
blockDataEnsureCapacity(pUpdateRes, 64);
|
||||
ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3);
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
|
||||
|
||||
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX);
|
||||
for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows &&
|
||||
i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) {
|
||||
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
|
||||
for (TSKEY startTs = startData[i]; startTs <= endData[i]; ) {
|
||||
colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false);
|
||||
colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false);
|
||||
pUpdateRes->info.rows++;
|
||||
startTs = taosTimeAdd(startTs, pInfo->interval.interval, pInfo->interval.intervalUnit, pInfo->interval.precision);
|
||||
}
|
||||
pInfo->deleteDataIndex++;
|
||||
}
|
||||
|
||||
if (pInfo->deleteDataIndex > 0 && pInfo->deleteDataIndex == pDelBlock->info.rows) {
|
||||
blockDataCleanup(pDelBlock);
|
||||
pInfo->deleteDataIndex = 0;
|
||||
}
|
||||
}
|
||||
|
||||
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
|
||||
blockDataCleanup(pUpdateBlock);
|
||||
int32_t size = taosArrayGetSize(pInfo->tsArray);
|
||||
|
@ -953,11 +986,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
|
|||
blockDataEnsureCapacity(pUpdateBlock, size);
|
||||
|
||||
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
|
||||
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
|
||||
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
|
||||
int32_t i = 0;
|
||||
for (; i < size; i++) {
|
||||
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
|
||||
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
|
||||
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
|
||||
if (pInfo->groupId != id) {
|
||||
break;
|
||||
}
|
||||
|
@ -974,28 +1007,32 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
|
|||
if (size > 0 && pInfo->tsArrayIndex == size) {
|
||||
taosArrayClear(pInfo->tsArray);
|
||||
}
|
||||
|
||||
if (size == 0) {
|
||||
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock);
|
||||
}
|
||||
}
|
||||
|
||||
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
|
||||
SSDataBlock* pUpdateBlock) {
|
||||
static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
|
||||
bool out) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
|
||||
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
|
||||
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId])) {
|
||||
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId]) && out) {
|
||||
taosArrayPush(pInfo->tsArray, &rowId);
|
||||
}
|
||||
}
|
||||
if (!pUpdateBlock) {
|
||||
taosArrayClear(pInfo->tsArray);
|
||||
return;
|
||||
}
|
||||
setUpdateData(pInfo, pBlock, pUpdateBlock);
|
||||
// Todo(liuyao) get from tsdb
|
||||
// SSDataBlock* p = createOneDataBlock(pBlock, true);
|
||||
// p->info.type = STREAM_INVERT;
|
||||
// taosArrayClear(pInfo->tsArray);
|
||||
// return p;
|
||||
|
||||
static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t uidColIndex) {
|
||||
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex);
|
||||
uint64_t* uidCol = (uint64_t*)pColDataInfo->pData;
|
||||
ASSERT(pBlock->info.rows > 0);
|
||||
for (int32_t i = 0 ; i < pBlock->info.rows; i++) {
|
||||
uidCol[i] = getGroupId(pOperator, uidCol[i]);
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||
|
@ -1020,13 +1057,29 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
int32_t current = pInfo->validBlockIndex++;
|
||||
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
if (pBlock->info.type == STREAM_RETRIEVE) {
|
||||
switch (pBlock->info.type) {
|
||||
case STREAM_RETRIEVE:{
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
|
||||
copyDataBlock(pInfo->pPullDataRes, pBlock);
|
||||
pInfo->pullDataResIndex = 0;
|
||||
prepareDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
|
||||
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
|
||||
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
|
||||
}
|
||||
break;
|
||||
case STREAM_DELETE_DATA: {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
|
||||
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
|
||||
pInfo->updateResIndex = 0;
|
||||
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
|
||||
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return pBlock;
|
||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
|
@ -1043,39 +1096,33 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
|
||||
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
|
||||
if (pSDB != NULL) {
|
||||
getUpdateDataBlock(pInfo, true, pSDB, NULL);
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
pSDB->info.type = STREAM_PULL_DATA;
|
||||
return pSDB;
|
||||
}
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
} else {
|
||||
if (isStateWindow(pInfo)) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
|
||||
if (!prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
}
|
||||
}
|
||||
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
|
||||
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
|
||||
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
if (pSDB == NULL) {
|
||||
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
|
||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||
if (!isStateWindow(pInfo)) {
|
||||
// Todo(liuyao) mybe can delete this.
|
||||
bool test = prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
ASSERT(test == false);
|
||||
}
|
||||
return pInfo->pUpdateRes;
|
||||
} else {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
}
|
||||
} else {
|
||||
if (pSDB) {
|
||||
pSDB->info.type = STREAM_NORMAL;
|
||||
getUpdateDataBlock(pInfo, true, pSDB, NULL);
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
return pSDB;
|
||||
}
|
||||
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
|
||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
} else if (isStateWindow(pInfo)) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
|
||||
if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
|
||||
ASSERT(pInfo->pUpdateRes->info.rows == 0);
|
||||
// return empty data blcok
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
}
|
||||
|
||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
|
@ -1169,7 +1216,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
pOperator->status = OP_EXEC_DONE;
|
||||
} else if (pInfo->pUpdateInfo) {
|
||||
pInfo->tsArrayIndex = 0;
|
||||
getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
|
||||
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
||||
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
|
||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||
if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
|
||||
pInfo->updateResIndex = 0;
|
||||
|
@ -1180,9 +1228,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||
|
||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
|
||||
// check reader last status
|
||||
// if not match, reset status
|
||||
|
@ -1295,6 +1341,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->groupId = 0;
|
||||
pInfo->pPullDataRes = createPullDataBlock();
|
||||
pInfo->pStreamScanOp = pOperator;
|
||||
pInfo->deleteDataIndex = 0;
|
||||
pInfo->pDeleteDataRes = createPullDataBlock();
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
|
|
|
@ -808,11 +808,31 @@ static void removeResult(SArray* pUpdated, TSKEY key) {
|
|||
static void removeResults(SArray* pWins, SArray* pUpdated) {
|
||||
int32_t size = taosArrayGetSize(pWins);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
STimeWindow* pW = taosArrayGet(pWins, i);
|
||||
removeResult(pUpdated, pW->skey);
|
||||
SWinRes* pW = taosArrayGet(pWins, i);
|
||||
removeResult(pUpdated, pW->ts);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t getWinReskey(void* data, int32_t index) {
|
||||
SArray* res = (SArray*)data;
|
||||
SWinRes* pos = taosArrayGet(res, index);
|
||||
return pos->ts;
|
||||
}
|
||||
|
||||
static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) {
|
||||
int32_t upSize = taosArrayGetSize(pUpdated);
|
||||
int32_t delSize = taosArrayGetSize(pDelWins);
|
||||
for (int32_t i = 0; i < upSize; i++) {
|
||||
SResKeyPos* pResKey = taosArrayGetP(pUpdated, i);
|
||||
int64_t key = *(int64_t*)pResKey->key;
|
||||
int32_t index = binarySearch(pDelWins, delSize, key, TSDB_ORDER_DESC, getWinReskey);
|
||||
if (index >= 0 && key == getWinReskey(pDelWins, index)) {
|
||||
taosArrayRemove(pDelWins, index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) {
|
||||
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0);
|
||||
return pSup->maxTs != INT64_MIN && ts < pSup->maxTs - pSup->waterMark;
|
||||
|
@ -1264,6 +1284,38 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t
|
|||
return true;
|
||||
}
|
||||
|
||||
bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) {
|
||||
size_t bytes = sizeof(TSKEY);
|
||||
SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
if (!p1) {
|
||||
// window has been closed
|
||||
return false;
|
||||
}
|
||||
SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId);
|
||||
// dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage);
|
||||
taosHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
return true;
|
||||
}
|
||||
|
||||
void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pUpWins, SInterval* pInterval) {
|
||||
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* tsStarts = (TSKEY*)pStartCol->pData;
|
||||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, NULL);
|
||||
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
|
||||
if (pUpWins) {
|
||||
SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]};
|
||||
taosArrayPush(pUpWins, &winRes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex,
|
||||
int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) {
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||
|
@ -1279,13 +1331,11 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
|
|||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL);
|
||||
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
uint64_t groupId = pBlock->info.groupId;
|
||||
if (pGpDatas) {
|
||||
groupId = pGpDatas[i];
|
||||
}
|
||||
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), groupId, numOfOutput);
|
||||
uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId;
|
||||
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput);
|
||||
if (pUpWins && res) {
|
||||
taosArrayPush(pUpWins, &win);
|
||||
SWinRes winRes = {.ts = win.skey, .groupId = winGpId};
|
||||
taosArrayPush(pUpWins, &winRes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1307,8 +1357,9 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
||||
SHashObj* pPullDataMap, SArray* closeWins) {
|
||||
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
||||
SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins,
|
||||
SArray* pRecyPages, SDiskbasedBuf* pDiscBuf) {
|
||||
void* pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
||||
|
@ -1342,6 +1393,11 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
ASSERT(pRecyPages != NULL);
|
||||
taosArrayPush(pRecyPages, &pPos->pageId);
|
||||
} else {
|
||||
SFilePage* bufPage = getBufPage(pDiscBuf, pPos->pageId);
|
||||
// dBufSetBufPageRecycled(pDiscBuf, bufPage);
|
||||
}
|
||||
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
|
||||
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
|
||||
|
@ -1358,7 +1414,38 @@ static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) {
|
|||
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
||||
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
|
||||
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
|
||||
closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL);
|
||||
closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup,
|
||||
&pChInfo->interval, NULL, NULL, NULL, pChInfo->aggSup.pResultBuf);
|
||||
}
|
||||
}
|
||||
|
||||
static void freeAllPages(SArray* pageIds, SDiskbasedBuf* pDiskBuf) {
|
||||
int32_t size = taosArrayGetSize(pageIds);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
int32_t pageId = *(int32_t*)taosArrayGet(pageIds, i);
|
||||
SFilePage* bufPage = getBufPage(pDiskBuf, pageId);
|
||||
// dBufSetBufPageRecycled(pDiskBuf, bufPage);
|
||||
}
|
||||
taosArrayClear(pageIds);
|
||||
}
|
||||
|
||||
static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlock) {
|
||||
blockDataCleanup(pBlock);
|
||||
int32_t size = taosArrayGetSize(pWins);
|
||||
if (*index == size) {
|
||||
*index = 0;
|
||||
taosArrayClear(pWins);
|
||||
return;
|
||||
}
|
||||
blockDataEnsureCapacity(pBlock, size - *index);
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, DELETE_GROUPID_COLUMN_INDEX);
|
||||
for (int32_t i = *index; i < size; i++) {
|
||||
SWinRes* pWin = taosArrayGet(pWins, i);
|
||||
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false);
|
||||
colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&pWin->groupId, false);
|
||||
pBlock->info.rows++;
|
||||
(*index)++;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1374,27 +1461,37 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex ,pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||
}
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
printDataBlock(pBlock, "single interval recv");
|
||||
|
||||
if (pBlock->info.type == STREAM_CLEAR) {
|
||||
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock,
|
||||
NULL);
|
||||
doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0,
|
||||
pOperator->exprSupp.numOfExprs, pBlock, NULL);
|
||||
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
|
||||
continue;
|
||||
} if (pBlock->info.type == STREAM_DELETE_DATA) {
|
||||
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
|
||||
continue;
|
||||
|
@ -1416,14 +1513,19 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated);
|
||||
}
|
||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
|
||||
&pInfo->interval, NULL, pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||
|
||||
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
removeDeleteResults(pUpdated, pInfo->pDelWins);
|
||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows > 0) {
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
printDataBlock(pInfo->binfo.pRes, "single interval");
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
}
|
||||
|
@ -1438,6 +1540,7 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
taosArrayDestroy(pInfo->pRecycledPages);
|
||||
}
|
||||
|
||||
void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
|
@ -1448,12 +1551,13 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosHashCleanup(pInfo->pPullDataMap);
|
||||
taosArrayDestroy(pInfo->pPullWins);
|
||||
blockDataDestroy(pInfo->pPullDataRes);
|
||||
taosArrayDestroy(pInfo->pRecycledPages);
|
||||
|
||||
if (pInfo->pChildren) {
|
||||
int32_t size = taosArrayGetSize(pInfo->pChildren);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i);
|
||||
destroyIntervalOperatorInfo(pChildOp->info, numOfOutput);
|
||||
destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput);
|
||||
taosMemoryFreeClear(pChildOp->info);
|
||||
taosMemoryFreeClear(pChildOp);
|
||||
}
|
||||
|
@ -1520,6 +1624,28 @@ void increaseTs(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
SSDataBlock* createDeleteBlock() {
|
||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
pBlock->info.hasVarCol = false;
|
||||
pBlock->info.groupId = 0;
|
||||
pBlock->info.rows = 0;
|
||||
pBlock->info.type = STREAM_DELETE_RESULT;
|
||||
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(uint64_t);
|
||||
|
||||
pBlock->pDataBlock = taosArrayInit(2, sizeof(SColumnInfoData));
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
infoData.info.bytes = sizeof(TSKEY);
|
||||
// window start ts
|
||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||
|
||||
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
|
||||
infoData.info.bytes = sizeof(uint64_t);
|
||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode,
|
||||
|
@ -1573,6 +1699,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
goto _error;
|
||||
}
|
||||
}
|
||||
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
|
||||
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
|
||||
pInfo->delIndex = 0;
|
||||
// pInfo->pDelRes = createDeleteBlock(); todo(liuyao) for delete
|
||||
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
|
||||
|
@ -2219,28 +2351,44 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
|
|||
}
|
||||
}
|
||||
|
||||
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray,
|
||||
int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||
bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) {
|
||||
int32_t bytes = sizeof(TSKEY);
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
return p1 != NULL;
|
||||
}
|
||||
|
||||
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup,
|
||||
SArray* pWinArray, int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SArray* pUpdated) {
|
||||
int32_t size = taosArrayGetSize(pWinArray);
|
||||
if (!pInfo->pChildren) {
|
||||
return;
|
||||
}
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
STimeWindow* pParentWin = taosArrayGet(pWinArray, i);
|
||||
SWinRes* pWinRes = taosArrayGet(pWinArray, i);
|
||||
SResultRow* pCurResult = NULL;
|
||||
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pSup->pCtx, numOfOutput,
|
||||
STimeWindow ParentWin = {.skey = pWinRes->ts, .ekey = pWinRes->ts+1};
|
||||
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &ParentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
|
||||
pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
|
||||
bool find = true;
|
||||
for (int32_t j = 0; j < numOfChildren; j++) {
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j);
|
||||
SIntervalAggOperatorInfo* pChInfo = pChildOp->info;
|
||||
SExprSupp* pChildSup = &pChildOp->exprSupp;
|
||||
|
||||
if (!hasIntervalWindow(&pChInfo->aggSup, pWinRes->ts, pWinRes->groupId)) {
|
||||
continue;
|
||||
}
|
||||
find = true;
|
||||
SResultRow* pChResult = NULL;
|
||||
setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, 0, pChildSup->pCtx,
|
||||
setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, &ParentWin, true, &pChResult, pWinRes->groupId, pChildSup->pCtx,
|
||||
pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup, pTaskInfo);
|
||||
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo);
|
||||
}
|
||||
if (find && pUpdated) {
|
||||
saveResultRow(pCurResult, pWinRes->groupId, pUpdated);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2472,6 +2620,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
if (!IS_FINAL_OP(pInfo)) {
|
||||
// semi interval operator clear disk buffer
|
||||
clearStreamIntervalOperator(pInfo);
|
||||
} else {
|
||||
freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2497,12 +2647,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||
return pInfo->pPullDataRes;
|
||||
}
|
||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
clearSpecialDataBlock(pInfo->pUpdateRes);
|
||||
removeDeleteResults(pUpdated, pInfo->pDelWins);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
qInfo("Stream Final Interval return data");
|
||||
break;
|
||||
|
@ -2514,7 +2671,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pBlock->info.type == STREAM_INVALID) {
|
||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||
} else if (pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
|
||||
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
|
||||
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs,
|
||||
pBlock, pUpWins);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
|
@ -2525,8 +2682,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval, pChildInfo->primaryTsIndex,
|
||||
pChildSup->numOfExprs, pBlock, NULL);
|
||||
rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs,
|
||||
pOperator->pTaskInfo);
|
||||
rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId,
|
||||
pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, NULL);
|
||||
taosArrayDestroy(pUpWins);
|
||||
continue;
|
||||
}
|
||||
|
@ -2535,11 +2692,25 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pInfo->returnUpdate = true;
|
||||
taosArrayDestroy(pUpWins);
|
||||
break;
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
int32_t childIndex = getChildIndex(pBlock);
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||
SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info;
|
||||
SExprSupp* pChildSup = &pChildOp->exprSupp;
|
||||
doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval);
|
||||
rebuildIntervalWindow(pInfo, pSup, pInfo->pDelWins, pInfo->binfo.pRes->info.groupId,
|
||||
pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdated);
|
||||
continue;
|
||||
}
|
||||
removeResults(pInfo->pDelWins, pUpdated);
|
||||
break;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) {
|
||||
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
|
||||
SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow));
|
||||
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
|
||||
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
|
||||
removeResults(pUpWins, pUpdated);
|
||||
taosArrayDestroy(pUpWins);
|
||||
|
@ -2563,6 +2734,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
if (!pChildOp) {
|
||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info;
|
||||
pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||
}
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
|
||||
|
@ -2578,8 +2751,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
if (IS_FINAL_OP(pInfo)) {
|
||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
|
||||
pUpdated);
|
||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup,
|
||||
&pInfo->interval, pInfo->pPullDataMap, pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
|
||||
}
|
||||
|
||||
|
@ -2607,6 +2780,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||
return pInfo->pPullDataRes;
|
||||
}
|
||||
|
||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
// ASSERT(false);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2680,6 +2860,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
for (int32_t i = 0; i < numOfChild; i++) {
|
||||
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
|
||||
if (pChildOp) {
|
||||
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
||||
pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||
continue;
|
||||
}
|
||||
|
@ -2711,6 +2893,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
|
||||
pInfo->pPullDataRes = createPullDataBlock();
|
||||
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
|
||||
// pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete
|
||||
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete
|
||||
pInfo->delIndex = 0;
|
||||
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
pOperator->blocking = true;
|
||||
|
@ -2851,9 +3038,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
pInfo->pDelIterator = NULL;
|
||||
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||
blockDataEnsureCapacity(pInfo->pDelRes, 64);
|
||||
// pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete
|
||||
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete
|
||||
pInfo->pChildren = NULL;
|
||||
pInfo->isFinal = false;
|
||||
pInfo->pPhyNode = pPhyNode;
|
||||
|
@ -3205,6 +3392,11 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) {
|
|||
|
||||
void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
|
||||
blockDataCleanup(pBlock);
|
||||
int32_t size = taosHashGetSize(pStDeleted);
|
||||
if (size == 0) {
|
||||
return;
|
||||
}
|
||||
blockDataEnsureCapacity(pBlock, size);
|
||||
size_t keyLen = 0;
|
||||
while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
|
@ -3979,9 +4171,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||
pInfo->pDelIterator = NULL;
|
||||
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||
blockDataEnsureCapacity(pInfo->pDelRes, 64);
|
||||
// pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete
|
||||
pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete
|
||||
pInfo->pChildren = NULL;
|
||||
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
||||
|
||||
|
|
|
@ -637,6 +637,7 @@ void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
|
|||
SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn);
|
||||
taosMemoryFreeClear(ppi->pData);
|
||||
taosMemoryFreeClear(pNode);
|
||||
ppi->pn = NULL;
|
||||
|
||||
tdListAppend(pBuf->freePgList, &ppi);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue