Merge pull request #19904 from taosdata/fix/liaohj
fix(query): add limit/offset for sys table scan operator.
This commit is contained in:
commit
a93fc323c0
|
@ -65,7 +65,7 @@ typedef struct SSysTableScanInfo {
|
|||
SSDataBlock* pRes;
|
||||
int64_t numOfBlocks; // extract basic running information.
|
||||
SLoadRemoteDataInfo loadInfo;
|
||||
|
||||
SLimitInfo limitInfo;
|
||||
int32_t tbnameSlotId;
|
||||
} SSysTableScanInfo;
|
||||
|
||||
|
@ -133,7 +133,6 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac
|
|||
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
|
||||
static void destroySysScanOperator(void* param);
|
||||
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
|
||||
static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo);
|
||||
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse);
|
||||
|
||||
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
|
||||
|
@ -351,7 +350,7 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
|
|||
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode);
|
||||
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
|
||||
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
|
||||
const char* name, SSDataBlock* pBlock);
|
||||
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
|
||||
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
|
||||
|
@ -556,7 +555,7 @@ void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfR
|
|||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false);
|
||||
doFilterResult(pInfo->pRes, pFilterInfo);
|
||||
doFilter(pInfo->pRes, pFilterInfo, NULL);
|
||||
blockDataCleanup(dataBlock);
|
||||
}
|
||||
|
||||
|
@ -975,7 +974,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
|
||||
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
|
||||
blockDataCleanup(p);
|
||||
numOfRows = 0;
|
||||
|
@ -991,7 +990,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
|
|||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
|
||||
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
|
||||
blockDataCleanup(p);
|
||||
numOfRows = 0;
|
||||
|
@ -1152,7 +1151,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
|
||||
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
|
||||
blockDataCleanup(p);
|
||||
numOfRows = 0;
|
||||
|
@ -1168,7 +1167,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
|
|||
pInfo->pRes->info.rows = numOfRows;
|
||||
|
||||
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
|
||||
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
|
||||
blockDataCleanup(p);
|
||||
numOfRows = 0;
|
||||
|
@ -1199,7 +1198,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
|
|||
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
|
||||
if (pInfo->readHandle.mnd != NULL) {
|
||||
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
|
||||
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
|
@ -1324,6 +1323,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
getDBNameFromCondition(pInfo->pCondition, dbName);
|
||||
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
|
||||
pBlock = sysTableScanUserTables(pOperator);
|
||||
|
@ -1336,30 +1336,37 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
|||
pBlock = sysTableScanFromMNode(pOperator, pInfo, name, pTaskInfo);
|
||||
}
|
||||
|
||||
return sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
|
||||
}
|
||||
|
||||
static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo,
|
||||
const char* name, SSDataBlock* pBlock) {
|
||||
sysTableScanFillTbName(pOperator, pInfo, name, pBlock);
|
||||
if (pBlock != NULL) {
|
||||
if (pInfo->tbnameSlotId != -1) {
|
||||
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
|
||||
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
||||
memcpy(varDataVal(varTbName), name, strlen(name));
|
||||
varDataSetLen(varTbName, strlen(name));
|
||||
for (int i = 0; i < pBlock->info.rows; ++i) {
|
||||
colDataAppend(pColumnInfoData, i, varTbName, NULL);
|
||||
}
|
||||
doFilterResult(pBlock, pOperator->exprSupp.pFilterInfo);
|
||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pBlock, pTaskInfo);
|
||||
if (limitReached) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
}
|
||||
if (pBlock && pBlock->info.rows != 0) {
|
||||
return pBlock;
|
||||
|
||||
return pBlock->info.rows > 0? pBlock:NULL;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
|
||||
SSDataBlock* pBlock) {
|
||||
if (pBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pInfo->tbnameSlotId != -1) {
|
||||
SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId);
|
||||
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
||||
memcpy(varDataVal(varTbName), name, strlen(name));
|
||||
varDataSetLen(varTbName, strlen(name));
|
||||
|
||||
colDataAppendNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
|
||||
}
|
||||
|
||||
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
}
|
||||
|
||||
static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -1423,7 +1430,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
|
|||
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
|
||||
|
||||
// todo log the filter info
|
||||
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
taosMemoryFree(pRsp);
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
return pInfo->pRes;
|
||||
|
@ -1457,13 +1464,13 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
|||
pInfo->sysInfo = pScanPhyNode->sysInfo;
|
||||
pInfo->showRewrite = pScanPhyNode->showRewrite;
|
||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||
|
||||
pInfo->pCondition = pScanNode->node.pConditions;
|
||||
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initLimitInfo(pScanPhyNode->scan.node.pLimit, pScanPhyNode->scan.node.pSlimit, &pInfo->limitInfo);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
|
@ -1556,15 +1563,6 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo) {
|
||||
if (pFilterInfo == NULL) {
|
||||
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
|
||||
}
|
||||
|
||||
doFilter(pDataBlock, pFilterInfo, NULL);
|
||||
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
|
||||
}
|
||||
|
||||
static int32_t sysChkFilter__Comm(SNode* pNode) {
|
||||
// impl
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
|
|
|
@ -186,4 +186,10 @@ if $data11 != 4 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print ===============================================> TS-2613
|
||||
sql select * from information_schema.ins_databases limit 1 offset 1;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue