enhance: code refactor and operator full implement slimit
This commit is contained in:
parent
758427b3c4
commit
93ab81437f
|
@ -2512,9 +2512,11 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTaskInfo, STagScanInfo* pInfo,
|
||||
const SExprInfo* pExprInfo, const SSDataBlock* pRes, int32_t size, const char* str,
|
||||
int32_t* count, SMetaReader* mr) {
|
||||
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STagScanInfo* pInfo = pOperator->info;
|
||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||
|
||||
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
|
||||
int32_t code = metaGetTableEntryByUid(mr, item->uid);
|
||||
tDecoderClear(&(*mr).coder);
|
||||
|
@ -2525,13 +2527,14 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTa
|
|||
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
char str[512];
|
||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||
|
||||
// refactor later
|
||||
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
|
||||
STR_TO_VARSTR(str, (*mr).me.name);
|
||||
colDataSetVal(pDst, (*count), str, false);
|
||||
colDataSetVal(pDst, (count), str, false);
|
||||
} else { // it is a tag value
|
||||
STagVal val = {0};
|
||||
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
|
||||
|
@ -2543,7 +2546,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTa
|
|||
} else {
|
||||
data = (char*)p;
|
||||
}
|
||||
colDataSetVal(pDst, (*count), data,
|
||||
colDataSetVal(pDst, (count), data,
|
||||
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
|
||||
|
||||
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
||||
|
@ -2552,11 +2555,6 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTa
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
(*count) += 1;
|
||||
if (++pInfo->curPos >= size) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||
|
@ -2583,9 +2581,20 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
|||
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
|
||||
|
||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
||||
doTagScanOneTable(pOperator, pTaskInfo, pInfo, pExprInfo, pRes, size, str, &count, &mr);
|
||||
doTagScanOneTable(pOperator, pRes, count, &mr);
|
||||
++count;
|
||||
if (++pInfo->curPos >= size) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
// each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
|
||||
if (pInfo->pSlimit != NULL) {
|
||||
if (pInfo->curPos < pInfo->pSlimit->offset) {
|
||||
continue;
|
||||
}
|
||||
pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
|
||||
if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
|
||||
setOperatorCompleted(pOperator);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2442,6 +2442,8 @@ static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) {
|
|||
if (NULL != pNode) {
|
||||
//TODO: only set the slimit now. push down slimit later
|
||||
pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
|
||||
((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset;
|
||||
((SLimitNode*)pTableScanNode->pSlimit)->offset = 0;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue