fix(query): set correct tag value during tag filter and do some internal refactor.
This commit is contained in:
parent
e46657ada2
commit
72287a3b5f
|
@ -367,11 +367,11 @@ typedef struct SSortExecInfo {
|
|||
int32_t readBytes; // read io bytes
|
||||
} SSortExecInfo;
|
||||
|
||||
typedef struct SFilterTableInfo {
|
||||
typedef struct STUidTagInfo {
|
||||
char* name;
|
||||
uint64_t uid;
|
||||
void* pTagVal;
|
||||
} SFilterTableInfo;
|
||||
} STUidTagInfo;
|
||||
|
||||
// stream special block column
|
||||
|
||||
|
|
|
@ -1546,7 +1546,10 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
|||
}
|
||||
|
||||
void colDataDestroy(SColumnInfoData* pColData) {
|
||||
if (!pColData) return;
|
||||
if (!pColData) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
|
||||
taosMemoryFreeClear(pColData->varmeta.offset);
|
||||
} else {
|
||||
|
|
|
@ -104,7 +104,7 @@ void metaReaderClear(SMetaReader *pReader);
|
|||
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||
int32_t metaGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
|
||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name);
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
|
||||
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList);
|
||||
int32_t metaReadNext(SMetaReader *pReader);
|
||||
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||
|
|
|
@ -1378,7 +1378,7 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList) {
|
|||
int32_t isLock = false;
|
||||
int32_t sz = uidList ? taosArrayGetSize(uidList) : 0;
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SFilterTableInfo *p = taosArrayGet(uidList, i);
|
||||
STUidTagInfo *p = taosArrayGet(uidList, i);
|
||||
|
||||
if (i % LIMIT == 0) {
|
||||
if (isLock) metaULock(pMeta);
|
||||
|
@ -1404,18 +1404,18 @@ int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
|
||||
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *pUidTagInfo) {
|
||||
SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid, 1);
|
||||
|
||||
// If len > 0 means there already have uids, and we only want the
|
||||
// tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept
|
||||
// in the hash map, that may require a lot of memory
|
||||
SHashObj *pSepecifiedUidMap = NULL;
|
||||
size_t len = taosArrayGetSize(uidList);
|
||||
if (len > 0) {
|
||||
pSepecifiedUidMap = taosHashInit(len / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
for (int i = 0; i < len; i++) {
|
||||
int64_t *uid = taosArrayGet(uidList, i);
|
||||
size_t numOfElems = taosArrayGetSize(pUidTagInfo);
|
||||
if (numOfElems > 0) {
|
||||
pSepecifiedUidMap = taosHashInit(numOfElems / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
for (int i = 0; i < numOfElems; i++) {
|
||||
int64_t *uid = taosArrayGet(pUidTagInfo, i);
|
||||
taosHashPut(pSepecifiedUidMap, uid, sizeof(int64_t), 0, 0);
|
||||
}
|
||||
}
|
||||
|
@ -1426,13 +1426,15 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj
|
|||
break;
|
||||
}
|
||||
|
||||
if (len > 0 && taosHashGet(pSepecifiedUidMap, &uid, sizeof(int64_t)) == NULL) {
|
||||
if (numOfElems > 0 && taosHashGet(pSepecifiedUidMap, &uid, sizeof(int64_t)) == NULL) {
|
||||
continue;
|
||||
} else if (len == 0) {
|
||||
taosArrayPush(uidList, &uid);
|
||||
}
|
||||
} else if (numOfElems == 0) {
|
||||
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
|
||||
info.pTagVal = taosMemoryMalloc(pCur->vLen);
|
||||
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
|
||||
|
||||
taosHashPut(tags, &uid, sizeof(uint64_t), pCur->pVal, pCur->vLen);
|
||||
taosArrayPush(pUidTagInfo, &info);
|
||||
}
|
||||
}
|
||||
|
||||
taosHashCleanup(pSepecifiedUidMap);
|
||||
|
|
|
@ -44,10 +44,11 @@ typedef struct tagFilterAssist {
|
|||
} tagFilterAssist;
|
||||
|
||||
static int32_t removeInvalidUid(SArray* uids, SHashObj* tags);
|
||||
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* pRes, SNode* pTagCond, SHashObj* tags);
|
||||
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* pRes, SNode* pTagCond);
|
||||
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* pExistedUidList, SNode* pTagCond);
|
||||
static int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond,
|
||||
SNode* pTagIndexCond, STableListInfo* pListInfo);
|
||||
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList);
|
||||
|
||||
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
|
||||
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
|
||||
|
@ -393,7 +394,7 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray* uidList, SNode* pTagCond) {
|
||||
static void getColInfoResult(void* metaHandle, int64_t suid, SArray* pUidList, SNode* pTagCond) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pBlockList = NULL;
|
||||
SSDataBlock* pResBlock = NULL;
|
||||
|
@ -401,7 +402,6 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
SScalarParam output = {0};
|
||||
|
||||
tagFilterAssist ctx = {0};
|
||||
|
||||
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||
if (ctx.colHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -419,10 +419,10 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||
|
||||
// int64_t stt = taosGetTimestampUs();
|
||||
SArray* pRes = taosArrayInit(10, sizeof(SFilterTableInfo));
|
||||
int32_t filter = optimizeTbnameInCond(metaHandle, suid, pRes, pTagCond, tags);
|
||||
SArray* pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
|
||||
int32_t filter = optimizeTbnameInCond(metaHandle, suid, pUidTagList, pTagCond);
|
||||
if (filter == 0) { // tbname in filter is activated, do nothing and return
|
||||
int32_t numOfRows = taosArrayGetSize(pRes);
|
||||
int32_t numOfRows = taosArrayGetSize(pUidTagList);
|
||||
code = createResultData(&type, numOfRows, &output);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
|
@ -430,20 +430,26 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
goto end;
|
||||
}
|
||||
|
||||
bool* b = (bool*)output.columnData->pData;
|
||||
taosArrayEnsureCap(uidList, numOfRows);
|
||||
|
||||
taosArrayEnsureCap(pUidList, numOfRows);
|
||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||
b[i] = true;
|
||||
SFilterTableInfo* pInfo = taosArrayGet(pRes, i);
|
||||
taosArrayPush(uidList, &pInfo->uid);
|
||||
STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
|
||||
taosArrayPush(pUidList, &pInfo->uid);
|
||||
}
|
||||
|
||||
terrno = 0;
|
||||
goto end;
|
||||
} else {
|
||||
// here we retrieve all tags from the vnode table-meta store
|
||||
code = metaGetTableTags(metaHandle, suid, uidList, tags);
|
||||
int32_t numOfExisted = taosArrayGetSize(pUidList);
|
||||
if (numOfExisted) {
|
||||
for(int32_t i = 0; i < numOfExisted; ++i) {
|
||||
uint64_t* uid = taosArrayGet(pUidList, i);
|
||||
STUidTagInfo info = {.uid = *uid};
|
||||
taosArrayPush(pUidTagList, &info);
|
||||
}
|
||||
}
|
||||
|
||||
code = metaGetTableTags(metaHandle, suid, pUidTagList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), suid);
|
||||
terrno = code;
|
||||
|
@ -451,10 +457,6 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
}
|
||||
}
|
||||
|
||||
if (suid != 0) {
|
||||
// removeInvalidUid(uidList, tags);
|
||||
}
|
||||
|
||||
pResBlock = createDataBlock();
|
||||
if (pResBlock == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -467,12 +469,12 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
blockDataAppendColInfo(pResBlock, &colInfo);
|
||||
}
|
||||
|
||||
int32_t size = taosArrayGetSize(pRes);
|
||||
if (size == 0) {
|
||||
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
||||
if (numOfTables == 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = blockDataEnsureCapacity(pResBlock, size);
|
||||
code = blockDataEnsureCapacity(pResBlock, numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto end;
|
||||
|
@ -480,8 +482,8 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
|
||||
int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
|
||||
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SFilterTableInfo* p1 = taosArrayGet(pRes, i);
|
||||
for (int32_t i = 0; i < numOfTables; i++) {
|
||||
STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
|
||||
|
||||
for (int32_t j = 0; j < numOfCols; j++) {
|
||||
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
|
||||
|
@ -496,10 +498,14 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
} else {
|
||||
STagVal tagVal = {0};
|
||||
tagVal.cid = pColInfo->info.colId;
|
||||
if (p1->pTagVal == NULL) {
|
||||
colDataAppendNULL(pColInfo, i);
|
||||
}
|
||||
|
||||
const char* p = metaGetTableTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
|
||||
|
||||
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
|
||||
colDataAppend(pColInfo, i, p, true);
|
||||
colDataAppendNULL(pColInfo, i);
|
||||
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
colDataAppend(pColInfo, i, p, false);
|
||||
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||
|
@ -524,22 +530,39 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
|
|||
}
|
||||
}
|
||||
|
||||
pResBlock->info.rows = size;
|
||||
pResBlock->info.rows = numOfTables;
|
||||
|
||||
// int64_t st1 = taosGetTimestampUs();
|
||||
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
|
||||
|
||||
pBlockList = taosArrayInit(2, POINTER_BYTES);
|
||||
taosArrayPush(pBlockList, &pResBlock);
|
||||
|
||||
code = createResultData(&type, numOfTables, &output);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = scalarCalculate(pTagCond, pBlockList, &output);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to calculate scalar, reason:%s", tstrerror(code));
|
||||
terrno = code;
|
||||
goto end;
|
||||
}
|
||||
// int64_t st2 = taosGetTimestampUs();
|
||||
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
|
||||
|
||||
taosArrayClear(pUidList);
|
||||
|
||||
bool* pResult = (bool*)output.columnData->pData;
|
||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
|
||||
qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResult[i]);
|
||||
|
||||
if (pResult[i]) {
|
||||
taosArrayPush(pUidList, &uid);
|
||||
}
|
||||
|
||||
i += 1;
|
||||
}
|
||||
|
||||
end:
|
||||
taosHashCleanup(tags);
|
||||
|
@ -547,7 +570,10 @@ end:
|
|||
taosArrayDestroy(ctx.cInfoList);
|
||||
blockDataDestroy(pResBlock);
|
||||
taosArrayDestroy(pBlockList);
|
||||
return output.columnData;
|
||||
|
||||
colDataDestroy(output.columnData);
|
||||
taosMemoryFreeClear(output.columnData);
|
||||
// return output.columnData;
|
||||
}
|
||||
|
||||
static void releaseColInfoData(void* pCol) {
|
||||
|
@ -604,77 +630,28 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
|||
blockDataAppendColInfo(pResBlock, &colInfo);
|
||||
}
|
||||
|
||||
SArray* pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
|
||||
|
||||
uidList = taosArrayInit(rows, sizeof(uint64_t));
|
||||
for (int32_t i = 0; i < rows; ++i) {
|
||||
STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
|
||||
taosArrayPush(uidList, &pkeyInfo->uid);
|
||||
STUidTagInfo info = {.uid = pkeyInfo->uid};
|
||||
taosArrayPush(pUidTagList, &info);
|
||||
}
|
||||
|
||||
// int64_t stt = taosGetTimestampUs();
|
||||
tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
code = metaGetTableTags(metaHandle, pTableListInfo->suid, uidList, tags);
|
||||
code = metaGetTableTags(metaHandle, pTableListInfo->suid, pUidTagList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
// int64_t stt1 = taosGetTimestampUs();
|
||||
// qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt);
|
||||
|
||||
code = blockDataEnsureCapacity(pResBlock, rows);
|
||||
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
||||
pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, pUidTagList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
// int64_t st = taosGetTimestampUs();
|
||||
for (int32_t i = 0; i < rows; i++) {
|
||||
int64_t* uid = taosArrayGet(uidList, i);
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++) {
|
||||
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
|
||||
|
||||
if (pColInfo->info.colId == -1) { // tbname
|
||||
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
metaGetTableNameByUid(metaHandle, *uid, str);
|
||||
colDataAppend(pColInfo, i, str, false);
|
||||
#if TAG_FILTER_DEBUG
|
||||
qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
|
||||
#endif
|
||||
} else {
|
||||
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
|
||||
ASSERT(tag);
|
||||
|
||||
STagVal tagVal = {0};
|
||||
tagVal.cid = pColInfo->info.colId;
|
||||
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
|
||||
|
||||
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
|
||||
colDataAppend(pColInfo, i, p, true);
|
||||
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
colDataAppend(pColInfo, i, p, false);
|
||||
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||
char* tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
|
||||
varDataSetLen(tmp, tagVal.nData);
|
||||
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
||||
colDataAppend(pColInfo, i, tmp, false);
|
||||
#if TAG_FILTER_DEBUG
|
||||
qDebug("tagfilter varch:%s", tmp + 2);
|
||||
#endif
|
||||
taosMemoryFree(tmp);
|
||||
} else {
|
||||
colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false);
|
||||
#if TAG_FILTER_DEBUG
|
||||
if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
|
||||
qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
|
||||
} else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pResBlock->info.rows = rows;
|
||||
|
||||
// int64_t st1 = taosGetTimestampUs();
|
||||
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
|
||||
|
||||
|
@ -857,8 +834,8 @@ static int tableUidCompare(const void* a, const void* b) {
|
|||
}
|
||||
|
||||
static int32_t filterTableInfoCompare(const void* a, const void* b) {
|
||||
SFilterTableInfo* p1 = (SFilterTableInfo*) a;
|
||||
SFilterTableInfo* p2 = (SFilterTableInfo*) b;
|
||||
STUidTagInfo* p1 = (STUidTagInfo*) a;
|
||||
STUidTagInfo* p2 = (STUidTagInfo*) b;
|
||||
|
||||
if (p1->uid == p2->uid) {
|
||||
return 0;
|
||||
|
@ -867,7 +844,7 @@ static int32_t filterTableInfoCompare(const void* a, const void* b) {
|
|||
return p1->uid < p2->uid? -1:1;
|
||||
}
|
||||
|
||||
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* pRes, SNode* cond, SHashObj* tags) {
|
||||
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* pRes, SNode* cond) {
|
||||
int32_t ret = -1;
|
||||
int32_t ntype = nodeType(cond);
|
||||
|
||||
|
@ -903,12 +880,13 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* pRes
|
|||
|
||||
if (hasTbnameCond) {
|
||||
ret = metaGetTableTagsByUids(metaHandle, suid, pRes);
|
||||
removeInvalidUid(pRes, tags);
|
||||
// removeInvalidUid(pRes, tags);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/*
|
||||
* handle invalid uid
|
||||
*/
|
||||
|
@ -918,10 +896,10 @@ static int32_t removeInvalidUid(SArray* uids, SHashObj* tags) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SArray* validUid = taosArrayInit(size, sizeof(SFilterTableInfo));
|
||||
SArray* validUid = taosArrayInit(size, sizeof(STUidTagInfo));
|
||||
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SFilterTableInfo* p = taosArrayGet(uids, i);
|
||||
STUidTagInfo* p = taosArrayGet(uids, i);
|
||||
if (taosHashGet(tags, &p->uid, sizeof(int64_t)) != NULL) {
|
||||
taosArrayPush(validUid, p);
|
||||
}
|
||||
|
@ -932,6 +910,8 @@ static int32_t removeInvalidUid(SArray* uids, SHashObj* tags) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
// only return uid that does not contained in pExistedUidList
|
||||
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* pExistedUidList, SNode* pTagCond) {
|
||||
if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) {
|
||||
|
@ -961,7 +941,7 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* pExistedUidLis
|
|||
if (numOfExisted > 0) {
|
||||
uHash = taosHashInit(numOfExisted / 0.7, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
for (int i = 0; i < numOfExisted; i++) {
|
||||
SFilterTableInfo* pTInfo = taosArrayGet(pExistedUidList, i);
|
||||
STUidTagInfo* pTInfo = taosArrayGet(pExistedUidList, i);
|
||||
taosHashPut(uHash, &pTInfo->uid, sizeof(uint64_t), &i, sizeof(i));
|
||||
}
|
||||
}
|
||||
|
@ -974,7 +954,7 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* pExistedUidLis
|
|||
ETableType tbType = TSDB_TABLE_MAX;
|
||||
if (metaGetTableTypeByName(metaHandle, name, &tbType) == 0 && tbType == TSDB_CHILD_TABLE) {
|
||||
if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
|
||||
SFilterTableInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
|
||||
STUidTagInfo s = {.uid = uid, .name = name, .pTagVal = NULL};
|
||||
taosArrayPush(pExistedUidList, &s);
|
||||
}
|
||||
} else {
|
||||
|
@ -1012,45 +992,198 @@ static void genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
|
|||
taosMemoryFree(payload);
|
||||
}
|
||||
|
||||
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pRes, SNode* pTagCond, void* metaHandle) {
|
||||
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList) {
|
||||
SSDataBlock* pResBlock = createDataBlock();
|
||||
if (pResBlock == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pColList); ++i) {
|
||||
SColumnInfoData colInfo = {0};
|
||||
colInfo.info = *(SColumnInfo*)taosArrayGet(pColList, i);
|
||||
blockDataAppendColInfo(pResBlock, &colInfo);
|
||||
}
|
||||
|
||||
int32_t code = blockDataEnsureCapacity(pResBlock, numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pResBlock->info.rows = numOfTables;
|
||||
|
||||
int32_t numOfCols = taosArrayGetSize(pResBlock->pDataBlock);
|
||||
|
||||
for (int32_t i = 0; i < numOfTables; i++) {
|
||||
STUidTagInfo* p1 = taosArrayGet(pUidTagList, i);
|
||||
|
||||
for (int32_t j = 0; j < numOfCols; j++) {
|
||||
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
|
||||
|
||||
if (pColInfo->info.colId == -1) { // tbname
|
||||
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_TO_VARSTR(str, p1->name);
|
||||
colDataAppend(pColInfo, i, str, false);
|
||||
#if TAG_FILTER_DEBUG
|
||||
qDebug("tagfilter uid:%ld, tbname:%s", *uid, str + 2);
|
||||
#endif
|
||||
} else {
|
||||
STagVal tagVal = {0};
|
||||
tagVal.cid = pColInfo->info.colId;
|
||||
if (p1->pTagVal == NULL) {
|
||||
colDataAppendNULL(pColInfo, i);
|
||||
}
|
||||
|
||||
const char* p = metaGetTableTagVal(p1->pTagVal, pColInfo->info.type, &tagVal);
|
||||
|
||||
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
|
||||
colDataAppendNULL(pColInfo, i);
|
||||
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
|
||||
colDataAppend(pColInfo, i, p, false);
|
||||
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||
char* tmp = alloca(tagVal.nData + VARSTR_HEADER_SIZE + 1);
|
||||
varDataSetLen(tmp, tagVal.nData);
|
||||
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
||||
colDataAppend(pColInfo, i, tmp, false);
|
||||
#if TAG_FILTER_DEBUG
|
||||
qDebug("tagfilter varch:%s", tmp + 2);
|
||||
#endif
|
||||
} else {
|
||||
colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false);
|
||||
#if TAG_FILTER_DEBUG
|
||||
if (pColInfo->info.type == TSDB_DATA_TYPE_INT) {
|
||||
qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
|
||||
} else if (pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
qDebug("tagfilter double:%f", *(double*)(&tagVal.i64));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
static void doSetQualifiedUid(SArray* pUidList, const SArray* pUidTagList, bool* pResultList) {
|
||||
taosArrayClear(pUidList);
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
||||
uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid;
|
||||
qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]);
|
||||
|
||||
if (pResultList[i]) {
|
||||
taosArrayPush(pUidList, &uid);
|
||||
}
|
||||
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void copyExistedUids(SArray* pUidTagList, const SArray* pUidList) {
|
||||
int32_t numOfExisted = taosArrayGetSize(pUidList);
|
||||
if (numOfExisted) {
|
||||
for(int32_t i = 0; i < numOfExisted; ++i) {
|
||||
uint64_t* uid = taosArrayGet(pUidList, i);
|
||||
STUidTagInfo info = {.uid = *uid};
|
||||
taosArrayPush(pUidTagList, &info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* metaHandle) {
|
||||
if (pTagCond == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
terrno = TDB_CODE_SUCCESS;
|
||||
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, pRes, pTagCond);
|
||||
if (terrno != TDB_CODE_SUCCESS) {
|
||||
colDataDestroy(pColInfoData);
|
||||
taosMemoryFreeClear(pColInfoData);
|
||||
taosArrayDestroy(pRes);
|
||||
qError("failed to getColInfoResult, code: %s", tstrerror(terrno));
|
||||
return terrno;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SArray* pBlockList = NULL;
|
||||
SSDataBlock* pResBlock = NULL;
|
||||
SScalarParam output = {0};
|
||||
|
||||
tagFilterAssist ctx = {0};
|
||||
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||
if (ctx.colHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
|
||||
int32_t i = 0;
|
||||
int32_t len = taosArrayGetSize(pRes);
|
||||
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
||||
if (ctx.cInfoList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (pColInfoData != NULL) {
|
||||
bool* pResult = (bool*)pColInfoData->pData;
|
||||
SArray* p = taosArrayInit(taosArrayGetSize(pRes), sizeof(uint64_t));
|
||||
nodesRewriteExprPostOrder(&pTagCond, getColumn, (void*)&ctx);
|
||||
|
||||
while (i < len && pColInfoData) {
|
||||
int64_t* uid = taosArrayGet(pRes, i);
|
||||
qDebug("tagfilter get uid:%" PRId64 ", res:%d", *uid, pResult[i]);
|
||||
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||
|
||||
if (pResult[i]) {
|
||||
taosArrayPush(p, uid);
|
||||
}
|
||||
|
||||
i += 1;
|
||||
// int64_t stt = taosGetTimestampUs();
|
||||
SArray* pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
|
||||
int32_t filter = optimizeTbnameInCond(metaHandle, pListInfo->suid, pUidTagList, pTagCond);
|
||||
if (filter == 0) { // tbname in filter is activated, do nothing and return
|
||||
int32_t numOfRows = taosArrayGetSize(pUidTagList);
|
||||
taosArrayEnsureCap(pUidList, numOfRows);
|
||||
for(int32_t i = 0; i < numOfRows; ++i) {
|
||||
STUidTagInfo* pInfo = taosArrayGet(pUidTagList, i);
|
||||
taosArrayPush(pUidList, &pInfo->uid);
|
||||
}
|
||||
|
||||
taosArraySwap(pRes, p);
|
||||
taosArrayDestroy(p);
|
||||
terrno = 0;
|
||||
goto end;
|
||||
} else {
|
||||
// here we retrieve all tags from the vnode table-meta store
|
||||
copyExistedUids(pUidTagList, pUidList);
|
||||
code = metaGetTableTags(metaHandle, pListInfo->suid, pUidTagList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->suid);
|
||||
terrno = code;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
colDataDestroy(pColInfoData);
|
||||
taosMemoryFreeClear(pColInfoData);
|
||||
int32_t numOfTables = taosArrayGetSize(pUidTagList);
|
||||
if (numOfTables == 0) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, pUidTagList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
// int64_t st1 = taosGetTimestampUs();
|
||||
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
|
||||
pBlockList = taosArrayInit(2, POINTER_BYTES);
|
||||
taosArrayPush(pBlockList, &pResBlock);
|
||||
|
||||
code = createResultData(&type, numOfTables, &output);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = scalarCalculate(pTagCond, pBlockList, &output);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to calculate scalar, reason:%s", tstrerror(code));
|
||||
terrno = code;
|
||||
goto end;
|
||||
}
|
||||
|
||||
doSetQualifiedUid(pUidList, pUidTagList, (bool*) output.columnData->pData);
|
||||
|
||||
end:
|
||||
taosHashCleanup(ctx.colHash);
|
||||
taosArrayDestroy(ctx.cInfoList);
|
||||
blockDataDestroy(pResBlock);
|
||||
taosArrayDestroy(pBlockList);
|
||||
|
||||
colDataDestroy(output.columnData);
|
||||
taosMemoryFreeClear(output.columnData);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1062,36 +1195,37 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
|
||||
uint64_t tableUid = pScanNode->uid;
|
||||
pListInfo->suid = pScanNode->suid;
|
||||
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
||||
|
||||
SArray* pRes = taosArrayInit(8, sizeof(uint64_t));
|
||||
|
||||
if (pScanNode->tableType != TSDB_SUPER_TABLE) {
|
||||
if (metaIsTableExist(metaHandle, tableUid)) {
|
||||
taosArrayPush(res, &tableUid);
|
||||
taosArrayPush(pRes, &tableUid);
|
||||
}
|
||||
|
||||
code = doFilterByTagCond(pListInfo, res, pTagCond, metaHandle);
|
||||
code = doFilterByTagCond(pListInfo, pRes, pTagCond, metaHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
|
||||
T_MD5_CTX context = {0};
|
||||
|
||||
if (tsTagFilterCache) {
|
||||
// try to retrieve the result from meta cache
|
||||
genTagFilterDigest(pTagCond, &context);
|
||||
SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
|
||||
|
||||
bool acquired = false;
|
||||
metaGetCachedTableUidList(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), res, &acquired);
|
||||
metaGetCachedTableUidList(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pUidList, &acquired);
|
||||
if (acquired) {
|
||||
qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(res));
|
||||
qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
||||
if (!pTagCond) { // no tag filter condition exists, let's fetch all tables of this super table
|
||||
ASSERT(pTagIndexCond == NULL);
|
||||
vnodeGetCtbIdList(pVnode, pScanNode->suid, res);
|
||||
vnodeGetCtbIdList(pVnode, pScanNode->suid, pRes);
|
||||
} else {
|
||||
// failed to find the result in the cache, let try to calculate the results
|
||||
if (pTagIndexCond) {
|
||||
|
@ -1099,7 +1233,7 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
|
||||
|
||||
SIdxFltStatus status = SFLT_NOT_INDEX;
|
||||
code = doFilterTag(pTagIndexCond, &metaArg, res, &status);
|
||||
code = doFilterTag(pTagIndexCond, &metaArg, pRes, &status);
|
||||
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
|
||||
// qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
|
||||
code = TDB_CODE_SUCCESS;
|
||||
|
@ -1107,43 +1241,44 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
}
|
||||
}
|
||||
|
||||
code = doFilterByTagCond(pListInfo, res, pTagCond, metaHandle);
|
||||
code = doFilterByTagCond(pListInfo, pRes, pTagCond, metaHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// let's add the filter results into meta-cache
|
||||
numOfTables = taosArrayGetSize(res);
|
||||
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
|
||||
char* pPayload = taosMemoryMalloc(size);
|
||||
*(int32_t*)pPayload = numOfTables;
|
||||
|
||||
if (numOfTables > 0) {
|
||||
memcpy(pPayload + sizeof(int32_t), taosArrayGet(res, 0), numOfTables * sizeof(uint64_t));
|
||||
}
|
||||
numOfTables = taosArrayGetSize(pRes);
|
||||
|
||||
if (tsTagFilterCache) {
|
||||
metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
|
||||
}
|
||||
size_t size = numOfTables * sizeof(uint64_t) + sizeof(int32_t);
|
||||
char* pPayload = taosMemoryMalloc(size);
|
||||
|
||||
taosMemoryFree(pPayload);
|
||||
// todo convert to uid list
|
||||
if (numOfTables > 0) {
|
||||
*(int32_t*)pPayload = numOfTables;
|
||||
memcpy(pPayload + sizeof(int32_t), taosArrayGet(pRes, 0), numOfTables * sizeof(uint64_t));
|
||||
}
|
||||
|
||||
metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
|
||||
taosMemoryFree(pPayload);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
numOfTables = taosArrayGetSize(res);
|
||||
numOfTables = taosArrayGetSize(pRes);
|
||||
for (int i = 0; i < numOfTables; i++) {
|
||||
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
|
||||
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(pRes, i), .groupId = 0};
|
||||
|
||||
void* p = taosArrayPush(pListInfo->pTableList, &info);
|
||||
if (p == NULL) {
|
||||
taosArrayDestroy(res);
|
||||
taosArrayDestroy(pRes);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
qTrace("tagfilter get uid:%" PRIu64 "", info.uid);
|
||||
}
|
||||
|
||||
taosArrayDestroy(res);
|
||||
taosArrayDestroy(pRes);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue