fix(stream): skip check update for scalar

This commit is contained in:
Liu Jicong 2022-11-16 18:48:08 +08:00
parent ab57d069d4
commit e2005cfa86
2 changed files with 5 additions and 3 deletions

View File

@ -996,7 +996,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
mDebug("mnd show subscrptions: topic %s, consumer %" PRId64 "cgroup %s vgid %d", varDataVal(topic),
mDebug("mnd show subscriptions: topic %s, consumer %" PRId64 " cgroup %s vgid %d", varDataVal(topic),
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
// offset
@ -1044,7 +1044,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, NULL, true);
mDebug("mnd show subscrptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup),
mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup),
pVgEp->vgId);
// offset

View File

@ -1980,7 +1980,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
if (pBlock != NULL) {
calBlockTbName(pInfo, pBlock);
if (pInfo->pUpdateInfo) {
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
}
qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
return pBlock;
}