Merge pull request #15644 from taosdata/feature/TD-17811
feat(stream):optimize stream update scan
This commit is contained in:
commit
a01ebbbf45
|
@ -34,11 +34,16 @@ typedef struct SUpdateInfo {
|
||||||
TSKEY minTS;
|
TSKEY minTS;
|
||||||
SScalableBf* pCloseWinSBF;
|
SScalableBf* pCloseWinSBF;
|
||||||
SHashObj* pMap;
|
SHashObj* pMap;
|
||||||
|
STimeWindow scanWindow;
|
||||||
|
uint64_t scanGroupId;
|
||||||
|
uint64_t maxVersion;
|
||||||
} SUpdateInfo;
|
} SUpdateInfo;
|
||||||
|
|
||||||
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
|
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
|
||||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
||||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||||
|
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
|
||||||
|
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
|
||||||
void updateInfoDestroy(SUpdateInfo *pInfo);
|
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||||
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
||||||
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
||||||
|
|
|
@ -1231,9 +1231,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
||||||
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
||||||
}
|
}
|
||||||
|
|
||||||
dst->info.rows = src->info.rows;
|
dst->info = src->info;
|
||||||
dst->info.window = src->info.window;
|
|
||||||
dst->info.type = src->info.type;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1708,9 +1706,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
||||||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||||
int32_t rows = pDataBlock->info.rows;
|
int32_t rows = pDataBlock->info.rows;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d\n", flag,
|
len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d|version:%" PRIu64 "\n", flag,
|
||||||
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
||||||
pDataBlock->info.uid, pDataBlock->info.rows);
|
pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version);
|
||||||
if (len >= size - 1) return dumpBuf;
|
if (len >= size - 1) return dumpBuf;
|
||||||
|
|
||||||
for (int32_t j = 0; j < rows; j++) {
|
for (int32_t j = 0; j < rows; j++) {
|
||||||
|
|
|
@ -137,6 +137,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTa
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
void *tsdbGetIdx(SMeta *pMeta);
|
||||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||||
|
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||||
|
|
||||||
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
|
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
|
||||||
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
||||||
|
|
|
@ -2502,6 +2502,10 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
|
||||||
return metaGetIvtIdx(pMeta);
|
return metaGetIvtIdx(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t getReaderMaxVersion(STsdbReader *pReader) {
|
||||||
|
return pReader->verRange.maxVer;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get all suids since suid
|
* @brief Get all suids since suid
|
||||||
*
|
*
|
||||||
|
|
|
@ -437,6 +437,7 @@ typedef struct SessionWindowSupporter {
|
||||||
SStreamAggSupporter* pStreamAggSup;
|
SStreamAggSupporter* pStreamAggSup;
|
||||||
int64_t gap;
|
int64_t gap;
|
||||||
uint8_t parentType;
|
uint8_t parentType;
|
||||||
|
SAggSupporter* pIntervalAggSup;
|
||||||
} SessionWindowSupporter;
|
} SessionWindowSupporter;
|
||||||
|
|
||||||
typedef struct STimeWindowSupp {
|
typedef struct STimeWindowSupp {
|
||||||
|
@ -1009,6 +1010,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
|
||||||
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
|
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
|
||||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||||
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
||||||
|
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
|
||||||
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
|
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
|
||||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||||
|
|
||||||
|
|
|
@ -1131,7 +1131,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
||||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
||||||
// must check update info first.
|
// must check update info first.
|
||||||
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
|
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
|
||||||
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup))) && out) {
|
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) &&
|
||||||
|
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
|
||||||
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
|
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1337,6 +1338,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
|
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
if (pSDB) {
|
||||||
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
|
uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
|
||||||
|
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version);
|
||||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||||
checkUpdateData(pInfo, true, pSDB, false);
|
checkUpdateData(pInfo, true, pSDB, false);
|
||||||
return pSDB;
|
return pSDB;
|
||||||
|
@ -1390,6 +1394,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
setBlockIntoRes(pInfo, &block);
|
setBlockIntoRes(pInfo, &block);
|
||||||
|
|
||||||
|
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) {
|
||||||
|
printDataBlock(pInfo->pRes, "stream scan ignore");
|
||||||
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pBlockInfo->rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1406,6 +1416,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
// record the scan action.
|
// record the scan action.
|
||||||
pInfo->numOfExec++;
|
pInfo->numOfExec++;
|
||||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||||
|
printDataBlock(pInfo->pRes, "stream scan");
|
||||||
|
|
||||||
if (pBlockInfo->rows == 0) {
|
if (pBlockInfo->rows == 0) {
|
||||||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
||||||
|
|
|
@ -1456,6 +1456,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
||||||
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
||||||
SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages,
|
SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages,
|
||||||
SDiskbasedBuf* pDiscBuf) {
|
SDiskbasedBuf* pDiscBuf) {
|
||||||
|
qDebug("===stream===close interval window");
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
||||||
|
@ -1772,10 +1773,11 @@ SSDataBlock* createDeleteBlock() {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type) {
|
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type, SAggSupporter* pSup) {
|
||||||
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||||
SStreamScanInfo* pScanInfo = downstream->info;
|
SStreamScanInfo* pScanInfo = downstream->info;
|
||||||
pScanInfo->sessionSup.parentType = type;
|
pScanInfo->sessionSup.parentType = type;
|
||||||
|
pScanInfo->sessionSup.pIntervalAggSup = pSup;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
@ -1851,7 +1853,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
|
|
||||||
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
|
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
|
||||||
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL);
|
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -3111,7 +3113,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||||
initIntervalDownStream(downstream, pPhyNode->type);
|
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup);
|
||||||
}
|
}
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -125,6 +125,9 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
|
||||||
pInfo->pCloseWinSBF = NULL;
|
pInfo->pCloseWinSBF = NULL;
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
|
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
|
||||||
|
pInfo->maxVersion = 0;
|
||||||
|
pInfo->scanGroupId = 0;
|
||||||
|
pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
return pInfo;
|
return pInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,15 +188,36 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ts < pInfo->minTS) {
|
if (ts < pInfo->minTS) {
|
||||||
|
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
|
||||||
return true;
|
return true;
|
||||||
} else if (res == TSDB_CODE_SUCCESS) {
|
} else if (res == TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
qDebug("===stream===bucket:%d, tableId:%" PRIu64 ", maxTs:" PRIu64 ", maxMapTs:" PRIu64 ", ts:%" PRIu64, index, tableId, maxTs, *pMapMaxTs, ts);
|
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
|
||||||
// check from tsdb api
|
// check from tsdb api
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
|
||||||
|
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||||
|
pInfo->scanWindow = *pWin;
|
||||||
|
pInfo->scanGroupId = groupId;
|
||||||
|
pInfo->maxVersion = version;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
|
||||||
|
if (!pInfo) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||||
|
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey &&
|
||||||
|
pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) {
|
||||||
|
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void updateInfoDestroy(SUpdateInfo *pInfo) {
|
void updateInfoDestroy(SUpdateInfo *pInfo) {
|
||||||
if (pInfo == NULL) {
|
if (pInfo == NULL) {
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue