enhance: tag scan code refactoring
This commit is contained in:
parent
646e52ae7f
commit
57d1957dee
|
@ -263,6 +263,8 @@ typedef struct STagScanInfo {
|
||||||
void* pCtbCursor;
|
void* pCtbCursor;
|
||||||
SNode* pTagCond;
|
SNode* pTagCond;
|
||||||
SNode* pTagIndexCond;
|
SNode* pTagIndexCond;
|
||||||
|
SArray* aUidTags; // SArray<STUidTagInfo>
|
||||||
|
SArray* aFilterIdxs; // SArray<int32_t>
|
||||||
SStorageAPI* pStorageAPI;
|
SStorageAPI* pStorageAPI;
|
||||||
} STagScanInfo;
|
} STagScanInfo;
|
||||||
|
|
||||||
|
|
|
@ -2813,7 +2813,7 @@ static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo
|
||||||
// } else { // name is not retrieved during filter
|
// } else { // name is not retrieved during filter
|
||||||
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
|
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
|
||||||
// }
|
// }
|
||||||
STR_TO_VARSTR(str, "zsl");
|
STR_TO_VARSTR(str, "ctbidx");
|
||||||
|
|
||||||
colDataSetVal(pColInfo, rowIndex, str, false);
|
colDataSetVal(pColInfo, rowIndex, str, false);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2841,66 +2841,32 @@ static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs,
|
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs, bool ignoreFilterIdx,
|
||||||
SStorageAPI* pAPI) {
|
SStorageAPI* pAPI) {
|
||||||
STagScanInfo* pInfo = pOperator->info;
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
|
if (!ignoreFilterIdx) {
|
||||||
size_t szTables = taosArrayGetSize(aFilterIdxs);
|
size_t szTables = taosArrayGetSize(aFilterIdxs);
|
||||||
for (int i = 0; i < szTables; ++i) {
|
for (int i = 0; i < szTables; ++i) {
|
||||||
int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i);
|
int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i);
|
||||||
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx);
|
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx);
|
||||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||||
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
|
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
return 0;
|
size_t szTables = taosArrayGetSize(aUidTags);
|
||||||
}
|
for (int i = 0; i < szTables; ++i) {
|
||||||
|
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i);
|
||||||
#if 0
|
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||||
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags,
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||||
SStorageAPI* pAPI) {
|
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
|
||||||
STagScanInfo* pInfo = pOperator->info;
|
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
|
||||||
|
|
||||||
int32_t nTbls = taosArrayGetSize(aUidTags);
|
|
||||||
for (int i = 0; i < nTbls; ++i) {
|
|
||||||
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i);
|
|
||||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
|
||||||
|
|
||||||
// refactor later
|
|
||||||
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
|
|
||||||
char str[512];
|
|
||||||
|
|
||||||
STR_TO_VARSTR(str, "zsl");
|
|
||||||
colDataSetVal(pDst, (i), str, false);
|
|
||||||
} else { // it is a tag value
|
|
||||||
STagVal val = {0};
|
|
||||||
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
|
|
||||||
const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pDst->info.type, &val);
|
|
||||||
|
|
||||||
char* data = NULL;
|
|
||||||
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
|
|
||||||
data = tTagValToData((const STagVal*)p, false);
|
|
||||||
} else {
|
|
||||||
data = (char*)p;
|
|
||||||
}
|
|
||||||
colDataSetVal(pDst, i, data,
|
|
||||||
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
|
|
||||||
|
|
||||||
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
|
||||||
data != NULL) {
|
|
||||||
taosMemoryFree(data);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
|
|
||||||
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
@ -2912,16 +2878,19 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
||||||
STagScanInfo* pInfo = pOperator->info;
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
int32_t count = 0;
|
|
||||||
|
|
||||||
if (pInfo->pCtbCursor == NULL) {
|
if (pInfo->pCtbCursor == NULL) {
|
||||||
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
|
SArray* aUidTags = pInfo->aUidTags;
|
||||||
SArray* aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
|
SArray* aFilterIdxs = pInfo->aFilterIdxs;
|
||||||
|
int32_t count = 0;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
|
||||||
|
taosArrayClear(aFilterIdxs);
|
||||||
|
|
||||||
int32_t numTables = 0;
|
int32_t numTables = 0;
|
||||||
while (numTables < pOperator->resultInfo.capacity) {
|
while (numTables < pOperator->resultInfo.capacity) {
|
||||||
SMCtbCursor* pCur = pInfo->pCtbCursor;
|
SMCtbCursor* pCur = pInfo->pCtbCursor;
|
||||||
|
@ -2939,34 +2908,29 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
||||||
if (numTables == 0) {
|
if (numTables == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
bool ignoreFilterIdx = true;
|
||||||
if (pInfo->pTagCond != NULL) {
|
if (pInfo->pTagCond != NULL) {
|
||||||
|
ignoreFilterIdx = false;
|
||||||
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI);
|
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI);
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0; i < numTables; ++i) {
|
ignoreFilterIdx = true;
|
||||||
taosArrayPush(aFilterIdxs, &i);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, pAPI);
|
tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, ignoreFilterIdx, pAPI);
|
||||||
count = taosArrayGetSize(aFilterIdxs);
|
|
||||||
|
count = ignoreFilterIdx ? taosArrayGetSize(aUidTags): taosArrayGetSize(aFilterIdxs);
|
||||||
|
|
||||||
if (count != 0) {
|
if (count != 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
|
|
||||||
taosArrayClear(aFilterIdxs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(aFilterIdxs);
|
|
||||||
taosArrayDestroyEx(aUidTags, tagScanFreeUidTag);
|
|
||||||
|
|
||||||
pRes->info.rows = count;
|
pRes->info.rows = count;
|
||||||
pOperator->resultInfo.totalRows += count;
|
pOperator->resultInfo.totalRows += count;
|
||||||
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -3027,6 +2991,10 @@ static void destroyTagScanOperatorInfo(void* param) {
|
||||||
if (pInfo->pCtbCursor != NULL) {
|
if (pInfo->pCtbCursor != NULL) {
|
||||||
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
|
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pInfo->aFilterIdxs);
|
||||||
|
taosArrayDestroyEx(pInfo->aUidTags, tagScanFreeUidTag);
|
||||||
|
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||||
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
|
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
|
||||||
|
@ -3072,7 +3040,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
__optr_fn_t tagScanNextFn = (pPhyNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScan;
|
if (pPhyNode->onlyMetaCtbIdx) {
|
||||||
|
pInfo->aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
|
||||||
|
pInfo->aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
|
||||||
|
}
|
||||||
|
__optr_fn_t tagScanNextFn = (pPhyNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue