Merge pull request #15308 from taosdata/feat/TD-17552
enh(query): add wal version to agg result datablock
This commit is contained in:
commit
39b768153f
|
@ -176,6 +176,7 @@ typedef struct SExecTaskInfo {
|
|||
int64_t owner; // if it is in execution
|
||||
int32_t code;
|
||||
|
||||
int64_t version; // used for stream to record wal version
|
||||
SStreamTaskInfo streamInfo;
|
||||
SSchemaInfo schemaInfo;
|
||||
STableListInfo tableqinfoList; // this is a table list
|
||||
|
|
|
@ -1602,6 +1602,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
SSDataBlock* pBlock = pbInfo->pRes;
|
||||
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||
|
||||
// set output datablock version
|
||||
pBlock->info.version = pTaskInfo->version;
|
||||
|
||||
blockDataCleanup(pBlock);
|
||||
if (!hasDataInGroupInfo(pGroupResInfo)) {
|
||||
return;
|
||||
|
|
|
@ -1084,6 +1084,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
pInfo->pRes->info.rows = pBlock->info.rows;
|
||||
pInfo->pRes->info.uid = pBlock->info.uid;
|
||||
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||
pInfo->pRes->info.version = pBlock->info.version;
|
||||
|
||||
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||
if (groupIdPre) {
|
||||
|
|
|
@ -1560,6 +1560,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_NORMAL) {
|
||||
//set input version
|
||||
pTaskInfo->version = pBlock->info.version;
|
||||
}
|
||||
|
||||
if (pInfo->scalarSupp.pExprInfo != NULL) {
|
||||
SExprSupp* pExprSup = &pInfo->scalarSupp;
|
||||
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||
|
|
Loading…
Reference in New Issue