fix: some minor modifications
This commit is contained in:
parent
20f5e2af5b
commit
7c39bc9890
|
@ -263,6 +263,7 @@ typedef struct STagScanInfo {
|
||||||
void* pCtbCursor;
|
void* pCtbCursor;
|
||||||
SNode* pTagCond;
|
SNode* pTagCond;
|
||||||
SNode* pTagIndexCond;
|
SNode* pTagIndexCond;
|
||||||
|
SStorageAPI* pStorageAPI;
|
||||||
} STagScanInfo;
|
} STagScanInfo;
|
||||||
|
|
||||||
typedef enum EStreamScanMode {
|
typedef enum EStreamScanMode {
|
||||||
|
|
|
@ -2767,9 +2767,10 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aUidTagIdxs, void* pVnode, SStorageAPI* pAPI) {
|
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t numOfTables = taosArrayGetSize(aUidTags);
|
int32_t numOfTables = taosArrayGetSize(aUidTags);
|
||||||
|
|
||||||
STagScanFilterContext ctx = {0};
|
STagScanFilterContext ctx = {0};
|
||||||
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||||
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
||||||
|
@ -2777,48 +2778,42 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aU
|
||||||
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx);
|
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
|
SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
|
||||||
if (pResBlock == NULL) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
|
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
|
||||||
taosArrayPush(pBlockList, &pResBlock);
|
taosArrayPush(pBlockList, &pResBlock);
|
||||||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||||
|
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
code = tagScanCreateResultData(&type, numOfTables, &output);
|
tagScanCreateResultData(&type, numOfTables, &output);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
|
|
||||||
}
|
scalarCalculate(pTagCond, pBlockList, &output);
|
||||||
|
|
||||||
code = scalarCalculate(pTagCond, pBlockList, &output);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
}
|
|
||||||
|
|
||||||
bool* result = (bool*)output.columnData->pData;
|
bool* result = (bool*)output.columnData->pData;
|
||||||
for (int32_t i = 0 ; i < numOfTables; ++i) {
|
for (int32_t i = 0 ; i < numOfTables; ++i) {
|
||||||
if (result[i]) {
|
if (result[i]) {
|
||||||
taosArrayPush(aUidTagIdxs, &i);
|
taosArrayPush(aFilterIdxs, &i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(ctx.colHash);
|
|
||||||
taosArrayDestroy(ctx.cInfoList);
|
|
||||||
blockDataDestroy(pResBlock);
|
|
||||||
taosArrayDestroy(pBlockList);
|
|
||||||
colDataDestroy(output.columnData);
|
colDataDestroy(output.columnData);
|
||||||
taosMemoryFreeClear(output.columnData);
|
taosMemoryFreeClear(output.columnData);
|
||||||
|
|
||||||
|
blockDataDestroy(pResBlock);
|
||||||
|
taosArrayDestroy(pBlockList);
|
||||||
|
|
||||||
|
taosHashCleanup(ctx.colHash);
|
||||||
|
taosArrayDestroy(ctx.cInfoList);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
|
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
|
||||||
if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname
|
if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname
|
||||||
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(str, "zsl");
|
|
||||||
// if (pUidTagInfo->name != NULL) {
|
// if (pUidTagInfo->name != NULL) {
|
||||||
// STR_TO_VARSTR(str, pUidTagInfo->name);
|
// STR_TO_VARSTR(str, pUidTagInfo->name);
|
||||||
// } else { // name is not retrieved during filter
|
// } else { // name is not retrieved during filter
|
||||||
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
|
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
|
||||||
// }
|
// }
|
||||||
|
STR_TO_VARSTR(str, "zsl");
|
||||||
|
|
||||||
colDataSetVal(pColInfo, rowIndex, str, false);
|
colDataSetVal(pColInfo, rowIndex, str, false);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2846,13 +2841,15 @@ static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aUidTagIdxs,
|
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs,
|
||||||
SStorageAPI* pAPI) {
|
SStorageAPI* pAPI) {
|
||||||
STagScanInfo* pInfo = pOperator->info;
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(aUidTagIdxs); ++i) {
|
size_t szTables = taosArrayGetSize(aFilterIdxs);
|
||||||
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, *(int32_t*)taosArrayGet(aUidTagIdxs, i));
|
for (int i = 0; i < szTables; ++i) {
|
||||||
|
int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i);
|
||||||
|
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx);
|
||||||
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||||
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
|
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
|
||||||
|
@ -2920,8 +2917,10 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pCtbCursor == NULL) {
|
if (pInfo->pCtbCursor == NULL) {
|
||||||
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
|
SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
|
||||||
SArray* aUidTagIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
|
SArray* aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
while (count < pOperator->resultInfo.capacity) {
|
while (count < pOperator->resultInfo.capacity) {
|
||||||
SMCtbCursor* pCur = pInfo->pCtbCursor;
|
SMCtbCursor* pCur = pInfo->pCtbCursor;
|
||||||
|
@ -2936,20 +2935,26 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numTables = taosArrayGetSize(aUidTags);
|
int32_t numTables = taosArrayGetSize(aUidTags);
|
||||||
if (numTables != 0 && pInfo->pTagCond != NULL) {
|
if (numTables == 0) {
|
||||||
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aUidTagIdxs, pAPI);
|
|
||||||
}
|
|
||||||
tagScanFillResultBlock(pOperator, pRes, aUidTags, aUidTagIdxs, pAPI);
|
|
||||||
count = taosArrayGetSize(aUidTagIdxs);
|
|
||||||
|
|
||||||
if (taosArrayGetSize(aUidTagIdxs) != 0) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
|
|
||||||
taosArrayClear(aUidTagIdxs);
|
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aFilterIdxs, pAPI);
|
||||||
|
|
||||||
|
tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, pAPI);
|
||||||
|
count = taosArrayGetSize(aFilterIdxs);
|
||||||
|
|
||||||
|
if (count != 0) {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
taosArrayDestroy(aUidTagIdxs);
|
|
||||||
|
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
|
||||||
|
taosArrayClear(aFilterIdxs);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(aFilterIdxs);
|
||||||
taosArrayDestroyEx(aUidTags, tagScanFreeUidTag);
|
taosArrayDestroyEx(aUidTags, tagScanFreeUidTag);
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows += count;
|
pOperator->resultInfo.totalRows += count;
|
||||||
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -3012,6 +3017,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
static void destroyTagScanOperatorInfo(void* param) {
|
static void destroyTagScanOperatorInfo(void* param) {
|
||||||
STagScanInfo* pInfo = (STagScanInfo*)param;
|
STagScanInfo* pInfo = (STagScanInfo*)param;
|
||||||
|
if (pInfo->pCtbCursor != NULL) {
|
||||||
|
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
|
||||||
|
}
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||||
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
|
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
|
||||||
|
@ -3043,6 +3051,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
pInfo->pTagCond = pTagCond;
|
pInfo->pTagCond = pTagCond;
|
||||||
pInfo->pTagIndexCond = pTagIndexCond;
|
pInfo->pTagIndexCond = pTagIndexCond;
|
||||||
pInfo->pTableListInfo = pTableListInfo;
|
pInfo->pTableListInfo = pTableListInfo;
|
||||||
|
pInfo->pStorageAPI = &pTaskInfo->storageAPI;
|
||||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||||
pInfo->readHandle = *pReadHandle;
|
pInfo->readHandle = *pReadHandle;
|
||||||
pInfo->curPos = 0;
|
pInfo->curPos = 0;
|
||||||
|
|
Loading…
Reference in New Issue