enh(query): add wal version to agg result datablock

TD-17552
This commit is contained in:
Ganlin Zhao 2022-07-22 17:39:59 +08:00
parent db1d8fcd0e
commit 68854bdea5
4 changed files with 10 additions and 0 deletions

View File

@ -175,6 +175,7 @@ typedef struct SExecTaskInfo {
int64_t owner; // if it is in execution
int32_t code;
int64_t version; // used for stream to record wal version
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
STableListInfo tableqinfoList; // this is a table list

View File

@ -1602,6 +1602,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
SSDataBlock* pBlock = pbInfo->pRes;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
// set output datablock version
pBlock->info.version = pTaskInfo->version;
blockDataCleanup(pBlock);
if (!hasDataInGroupInfo(pGroupResInfo)) {
return;

View File

@ -1176,6 +1176,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.rows = pBlock->info.rows;
pInfo->pRes->info.uid = pBlock->info.uid;
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.version = pBlock->info.version;
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupIdPre) {

View File

@ -1562,6 +1562,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
continue;
}
if (pBlock->info.type == STREAM_NORMAL) {
//set input version
pTaskInfo->version = pBlock->info.version;
}
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);