set datablock table name

This commit is contained in:
jiajingbin 2023-11-23 19:52:32 +08:00 committed by 54liuyao
parent 7bfc64b8db
commit 9d9a88b3ed
1 changed files with 21 additions and 15 deletions

View File

@ -1525,6 +1525,17 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
return TSDB_CODE_SUCCESS;
}
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
blockDataCleanup(pInfo->pCreateTbRes);
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
pBlock->info.parTbName[0] = 0;
} else {
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
}
}
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
blockDataCleanup(pDestBlock);
int32_t rows = pSrcBlock->info.rows;
@ -1549,15 +1560,21 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
uint64_t srcUid = srcUidData[i];
uint64_t groupId = srcGp[i];
char* tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
if (groupId == 0) {
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
}
if (pInfo->tbnameCalSup.pExprInfo) {
void* parTbname = NULL;
pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
if (code != TSDB_CODE_SUCCESS) {
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver);
printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo));
calBlockTbName(pInfo, pPreRes);
memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName));
} else {
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
}
varDataSetLen(tbname, strlen(varDataVal(tbname)));
pInfo->stateStore.streamStateFreeVal(parTbname);
}
@ -1583,17 +1600,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return code;
}
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
blockDataCleanup(pInfo->pCreateTbRes);
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
pBlock->info.parTbName[0] = 0;
} else {
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
}
}
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);