refactor(tmq): rewrite tq read function

This commit is contained in:
Liu Jicong 2022-04-21 14:30:37 +08:00
parent a98bf9d1b5
commit 3dd3ad1e05
3 changed files with 129 additions and 138 deletions

View File

@ -109,8 +109,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows);
// need to reposition

View File

@ -82,16 +82,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false;
}
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
// currently only rows are used
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
return 0;
}
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, int16_t* pGroupId, int32_t* pNumOfRows) {
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t sversion = 0;

View File

@ -30,13 +30,12 @@
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "vnode.h"
#include "ttypes.h"
#include "vnode.h"
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
SWITCH_ORDER(pCtx[i].order);
@ -189,13 +188,13 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
taosMemoryFreeClear(pBlock->pBlockAgg);
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
@ -249,8 +248,8 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
doFilter(pTableScanInfo->pFilterNode, pBlock);
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
}
return TSDB_CODE_SUCCESS;
@ -376,9 +375,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return p;
}
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime,
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition,
SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0);
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
@ -558,18 +558,18 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->readerHandle)) {
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
terrno = pTaskInfo->code;
pOperator->status = OP_EXEC_DONE;
SArray* pCols = NULL;
uint64_t groupId;
int32_t numOfRows;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows);
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
pTaskInfo->code = code;
return NULL;
}
if (pBlockInfo->rows == 0) {
break;
}
SArray* pCols = tqRetrieveDataBlock(pInfo->readerHandle);
pInfo->pRes->info.groupId = groupId;
pInfo->pRes->info.rows = numOfRows;
int32_t numOfCols = pInfo->pRes->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
@ -618,7 +618,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
}
}
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {