fix(query): set the correct buffer to accommodate the tag values.
This commit is contained in:
parent
1fd3ad502a
commit
c17a921a03
|
@ -1186,6 +1186,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||||
|
pColumn->hasNull = false;
|
||||||
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
|
||||||
pColumn->varmeta.length = 0;
|
pColumn->varmeta.length = 0;
|
||||||
if (pColumn->varmeta.offset != NULL) {
|
if (pColumn->varmeta.offset != NULL) {
|
||||||
|
|
|
@ -921,7 +921,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo);
|
||||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
||||||
SSDataBlock* pBlock, const char* idStr);
|
SSDataBlock* pBlock, int32_t rows, const char* idStr);
|
||||||
|
|
||||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||||
|
|
|
@ -152,25 +152,27 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
pInfo->indexOfBufferedRes = 0;
|
pInfo->indexOfBufferedRes = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
|
|
||||||
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
|
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
|
||||||
SColMatchItem* pMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
SColMatchItem* pMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
|
||||||
int32_t slotId = pMatchInfo->dstSlotId;
|
int32_t slotId = pMatchInfo->dstSlotId;
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
||||||
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId);
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
|
||||||
|
|
||||||
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
|
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
|
||||||
bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
|
bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
|
||||||
colDataAppend(pDst, 0, p, isNull);
|
colDataAppend(pDst, 0, p, isNull);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
|
pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
|
||||||
pInfo->pRes->info.rows = 1;
|
pRes->info.rows = 1;
|
||||||
|
|
||||||
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
if (pInfo->pseudoExprSup.numOfExprs > 0) {
|
||||||
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
SExprSupp* pSup = &pInfo->pseudoExprSup;
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
@ -178,10 +180,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid);
|
pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid);
|
||||||
|
|
||||||
pInfo->indexOfBufferedRes += 1;
|
pInfo->indexOfBufferedRes += 1;
|
||||||
return pInfo->pRes;
|
return pRes;
|
||||||
} else {
|
} else {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -221,7 +222,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
|
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
|
||||||
|
|
||||||
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
|
||||||
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
|
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
|
|
@ -344,11 +344,11 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) {
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
|
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -426,7 +426,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
|
|
||||||
// NOTE: here the tag value only load for one row
|
// NOTE: here the tag value only load for one row
|
||||||
ensureBlockCapacity(pBlock, 1);
|
ensureBlockCapacity(pBlock, 1);
|
||||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
|
||||||
pCost->skipBlocks += 1;
|
pCost->skipBlocks += 1;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
|
@ -437,7 +437,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
ensureBlockCapacity(pBlock, 1);
|
ensureBlockCapacity(pBlock, 1);
|
||||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
|
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
|
||||||
|
@ -488,7 +488,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
|
|
||||||
ensureBlockCapacity(pBlock, pBlock->info.rows);
|
ensureBlockCapacity(pBlock, pBlock->info.rows);
|
||||||
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
||||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
|
||||||
|
|
||||||
// restore the previous value
|
// restore the previous value
|
||||||
pCost->totalRows -= pBlock->info.rows;
|
pCost->totalRows -= pBlock->info.rows;
|
||||||
|
@ -528,27 +528,30 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
||||||
SSDataBlock* pBlock, const char* idStr) {
|
SSDataBlock* pBlock, int32_t rows, const char* idStr) {
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (numOfPseudoExpr == 0) {
|
if (numOfPseudoExpr == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// backup the rows
|
||||||
|
int32_t backupRows = pBlock->info.rows;
|
||||||
|
pBlock->info.rows = rows;
|
||||||
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||||
|
metaReaderReleaseLock(&mr);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
metaReaderReleaseLock(&mr);
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfPseudoExpr; ++j) {
|
for (int32_t j = 0; j < numOfPseudoExpr; ++j) {
|
||||||
SExprInfo* pExpr = &pPseudoExpr[j];
|
SExprInfo* pExpr = &pPseudoExpr[j];
|
||||||
|
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
||||||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
colInfoDataCleanup(pColInfoData, pBlock->info.rows);
|
colInfoDataCleanup(pColInfoData, pBlock->info.rows);
|
||||||
|
@ -587,6 +590,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
||||||
}
|
}
|
||||||
|
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
|
// restore the rows
|
||||||
|
pBlock->info.rows = backupRows;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -631,7 +637,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
|
|
||||||
SDataBlockInfo* pBInfo = &pBlock->info;
|
SDataBlockInfo* pBInfo = &pBlock->info;
|
||||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window);
|
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window);
|
||||||
|
|
||||||
|
@ -1167,7 +1172,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
||||||
pBlock->info.rows = rows;
|
pBlock->info.rows = rows;
|
||||||
|
|
||||||
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
||||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, rows);
|
||||||
|
|
||||||
pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid);
|
pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid);
|
||||||
}
|
}
|
||||||
|
@ -1638,7 +1643,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pInfo->numOfPseudoExpr > 0) {
|
if (pInfo->numOfPseudoExpr > 0) {
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
GET_TASKID(pTaskInfo));
|
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -1648,11 +1653,11 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
if (filter) {
|
if (filter) {
|
||||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
|
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||||
|
|
||||||
calBlockTbName(&pInfo->tbnameCalSup, pInfo->pRes);
|
calBlockTbName(&pInfo->tbnameCalSup, pInfo->pRes);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4368,7 +4373,7 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
|
||||||
pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
|
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -4485,7 +4490,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
|
||||||
pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
|
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -497,7 +497,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
|
static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElem = 0;
|
int32_t numOfElem = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Reference in New Issue