Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last
This commit is contained in:
commit
c7d492dae1
|
@ -1343,12 +1343,14 @@ SSDataBlock* createDataBlock() {
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
|
||||||
if (pBlock->pDataBlock == NULL) {
|
if (pBlock->pDataBlock == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(pBlock);
|
taosMemoryFree(pBlock);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
|
@ -1423,6 +1425,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void colDataDestroy(SColumnInfoData* pColData) {
|
void colDataDestroy(SColumnInfoData* pColData) {
|
||||||
|
if(!pColData) return;
|
||||||
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
|
||||||
taosMemoryFreeClear(pColData->varmeta.offset);
|
taosMemoryFreeClear(pColData->varmeta.offset);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -91,8 +91,9 @@ typedef struct SMetaEntry SMetaEntry;
|
||||||
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||||
void metaReaderClear(SMetaReader *pReader);
|
void metaReaderClear(SMetaReader *pReader);
|
||||||
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||||
|
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags);
|
||||||
int32_t metaReadNext(SMetaReader *pReader);
|
int32_t metaReadNext(SMetaReader *pReader);
|
||||||
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *tagVal);
|
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
|
||||||
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
|
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
|
||||||
|
|
||||||
typedef struct SMetaFltParam {
|
typedef struct SMetaFltParam {
|
||||||
|
|
|
@ -87,7 +87,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// open pCtbIdx
|
// open pCtbIdx
|
||||||
ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
|
ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("vgId:%d, failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
|
metaError("vgId:%d, failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -53,6 +53,80 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) {
|
||||||
|
//
|
||||||
|
// SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader));
|
||||||
|
// SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey));
|
||||||
|
// SMeta *pMeta = meta;
|
||||||
|
// int64_t version;
|
||||||
|
// SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
//
|
||||||
|
// int64_t stt1 = taosGetTimestampUs();
|
||||||
|
// for(int i = 0; i < taosArrayGetSize(uidList); i++) {
|
||||||
|
// void* ppVal = NULL;
|
||||||
|
// int vlen = 0;
|
||||||
|
// uint64_t * uid = taosArrayGet(uidList, i);
|
||||||
|
// // query uid.idx
|
||||||
|
// if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) {
|
||||||
|
// continue;
|
||||||
|
// }
|
||||||
|
// version = *(int64_t *)ppVal;
|
||||||
|
//
|
||||||
|
// STbDbKey tbDbKey = {.version = version, .uid = *uid};
|
||||||
|
// taosArrayPush(uidVersion, &tbDbKey);
|
||||||
|
// taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t));
|
||||||
|
// }
|
||||||
|
// int64_t stt2 = taosGetTimestampUs();
|
||||||
|
// qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1);
|
||||||
|
//
|
||||||
|
// TBC *pCur = NULL;
|
||||||
|
// tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
|
||||||
|
// tdbTbcMoveToFirst(pCur);
|
||||||
|
// void *pKey = NULL;
|
||||||
|
// int kLen = 0;
|
||||||
|
//
|
||||||
|
// while(1){
|
||||||
|
// SMetaReader pReader = {0};
|
||||||
|
// int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf);
|
||||||
|
// if (ret < 0) break;
|
||||||
|
// STbDbKey *tmp = (STbDbKey*)pKey;
|
||||||
|
// int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t));
|
||||||
|
// if(ver == NULL || *ver != tmp->version) continue;
|
||||||
|
// taosArrayPush(readerList, &pReader);
|
||||||
|
// }
|
||||||
|
// tdbTbcClose(pCur);
|
||||||
|
//
|
||||||
|
// taosArrayClear(readerList);
|
||||||
|
// int64_t stt3 = taosGetTimestampUs();
|
||||||
|
// qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2);
|
||||||
|
// for(int i = 0; i < taosArrayGetSize(uidVersion); i++) {
|
||||||
|
// SMetaReader pReader = {0};
|
||||||
|
//
|
||||||
|
// STbDbKey *tbDbKey = taosArrayGet(uidVersion, i);
|
||||||
|
// // query table.db
|
||||||
|
// if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) {
|
||||||
|
// continue;
|
||||||
|
// }
|
||||||
|
// taosArrayPush(readerList, &pReader);
|
||||||
|
// }
|
||||||
|
// int64_t stt4 = taosGetTimestampUs();
|
||||||
|
// qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3);
|
||||||
|
//
|
||||||
|
// for(int i = 0; i < taosArrayGetSize(readerList); i++){
|
||||||
|
// SMetaReader* pReader = taosArrayGet(readerList, i);
|
||||||
|
// metaReaderInit(pReader, meta, 0);
|
||||||
|
// // decode the entry
|
||||||
|
// tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
|
||||||
|
//
|
||||||
|
// if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
|
||||||
|
// }
|
||||||
|
// metaReaderClear(pReader);
|
||||||
|
// }
|
||||||
|
// int64_t stt5 = taosGetTimestampUs();
|
||||||
|
// qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4);
|
||||||
|
// return 0;
|
||||||
|
// }
|
||||||
|
|
||||||
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
SMeta *pMeta = pReader->pMeta;
|
SMeta *pMeta = pReader->pMeta;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
|
@ -794,9 +868,8 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) {
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
|
const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
|
||||||
ASSERT(pEntry->type == TSDB_CHILD_TABLE);
|
STag *tag = (STag *)pTag;
|
||||||
STag *tag = (STag *)pEntry->ctbEntry.pTags;
|
|
||||||
if (type == TSDB_DATA_TYPE_JSON) {
|
if (type == TSDB_DATA_TYPE_JSON) {
|
||||||
return tag;
|
return tag;
|
||||||
}
|
}
|
||||||
|
@ -898,6 +971,9 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (p->suid != pKey->suid) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
first = false;
|
first = false;
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
|
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
|
||||||
|
@ -934,6 +1010,38 @@ END:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
|
||||||
|
SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid);
|
||||||
|
|
||||||
|
SHashObj *uHash = NULL;
|
||||||
|
size_t len = taosArrayGetSize(uidList); // len > 0 means there already have uids
|
||||||
|
if (len > 0) {
|
||||||
|
uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
for (int i = 0; i < len; i++) {
|
||||||
|
int64_t *uid = taosArrayGet(uidList, i);
|
||||||
|
taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (1) {
|
||||||
|
tb_uid_t id = metaCtbCursorNext(pCur);
|
||||||
|
if (id == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len > 0 && taosHashGet(uHash, &id, sizeof(int64_t)) == NULL) {
|
||||||
|
continue;
|
||||||
|
} else if (len == 0) {
|
||||||
|
taosArrayPush(uidList, &id);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashPut(tags, &id, sizeof(int64_t), pCur->pVal, pCur->vLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashCleanup(uHash);
|
||||||
|
metaCloseCtbCursor(pCur);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
|
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
void *pData = NULL;
|
void *pData = NULL;
|
||||||
|
|
|
@ -883,6 +883,9 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
||||||
metaUpdateTagIdx(pMeta, &ctbEntry);
|
metaUpdateTagIdx(pMeta, &ctbEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
|
||||||
|
tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags, ((STag*)(ctbEntry.ctbEntry.pTags))->len, &pMeta->txn);
|
||||||
|
|
||||||
tDecoderClear(&dc1);
|
tDecoderClear(&dc1);
|
||||||
tDecoderClear(&dc2);
|
tDecoderClear(&dc2);
|
||||||
if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
|
if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
|
||||||
|
@ -1087,7 +1090,8 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
|
|
||||||
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
|
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
|
||||||
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
|
|
||||||
|
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), pME->ctbEntry.pTags, ((STag*)(pME->ctbEntry.pTags))->len, &pMeta->txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
|
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
|
||||||
|
|
|
@ -221,7 +221,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
||||||
|
|
||||||
STagVal tagVal = {0};
|
STagVal tagVal = {0};
|
||||||
tagVal.cid = pSColumnNode->colId;
|
tagVal.cid = pSColumnNode->colId;
|
||||||
const char* p = metaGetTableTagVal(&mr->me, pSColumnNode->node.resType.type, &tagVal);
|
const char* p = metaGetTableTagVal(mr->me.ctbEntry.pTags, pSColumnNode->node.resType.type, &tagVal);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
res->node.resType.type = TSDB_DATA_TYPE_NULL;
|
res->node.resType.type = TSDB_DATA_TYPE_NULL;
|
||||||
} else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
} else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
@ -298,6 +298,209 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct tagFilterAssist{
|
||||||
|
SHashObj *colHash;
|
||||||
|
int32_t index;
|
||||||
|
SArray *cInfoList;
|
||||||
|
}tagFilterAssist;
|
||||||
|
|
||||||
|
static EDealRes getColumn(SNode** pNode, void* pContext) {
|
||||||
|
SColumnNode* pSColumnNode = NULL;
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
|
||||||
|
pSColumnNode = *(SColumnNode**)pNode;
|
||||||
|
}else if(QUERY_NODE_FUNCTION == nodeType((*pNode))){
|
||||||
|
SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
|
||||||
|
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
|
||||||
|
pSColumnNode = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||||
|
if (NULL == pSColumnNode) {
|
||||||
|
return DEAL_RES_ERROR;
|
||||||
|
}
|
||||||
|
pSColumnNode->colId = -1;
|
||||||
|
pSColumnNode->colType = COLUMN_TYPE_TBNAME;
|
||||||
|
pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
|
pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
|
||||||
|
nodesDestroyNode(*pNode);
|
||||||
|
*pNode = (SNode*)pSColumnNode;
|
||||||
|
}else{
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
tagFilterAssist *pData = (tagFilterAssist *)pContext;
|
||||||
|
void *data = taosHashGet(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
|
||||||
|
if(!data){
|
||||||
|
taosHashPut(pData->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
|
||||||
|
pSColumnNode->slotId = pData->index++;
|
||||||
|
SColumnInfo cInfo = {.colId = pSColumnNode->colId, .type = pSColumnNode->node.resType.type, .bytes = pSColumnNode->node.resType.bytes};
|
||||||
|
#if TAG_FILTER_DEBUG
|
||||||
|
qDebug("tagfilter build column info, slotId:%d, colId:%d, type:%d", pSColumnNode->slotId, cInfo.colId, cInfo.type);
|
||||||
|
#endif
|
||||||
|
taosArrayPush(pData->cInfoList, &cInfo);
|
||||||
|
}else{
|
||||||
|
SColumnNode* col = *(SColumnNode**)data;
|
||||||
|
pSColumnNode->slotId = col->slotId;
|
||||||
|
}
|
||||||
|
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
|
||||||
|
SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
||||||
|
if (pColumnData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pColumnData->info.type = pType->type;
|
||||||
|
pColumnData->info.bytes = pType->bytes;
|
||||||
|
pColumnData->info.scale = pType->scale;
|
||||||
|
pColumnData->info.precision = pType->precision;
|
||||||
|
|
||||||
|
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(pColumnData);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pParam->columnData = pColumnData;
|
||||||
|
pParam->colAlloced = true;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray* uidList, SNode* pTagCond){
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SArray* pBlockList = NULL;
|
||||||
|
SSDataBlock* pResBlock = NULL;
|
||||||
|
SHashObj * tags = NULL;
|
||||||
|
SScalarParam output = {0};
|
||||||
|
|
||||||
|
tagFilterAssist ctx = {0};
|
||||||
|
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||||
|
if(ctx.colHash == NULL){
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
ctx.index = 0;
|
||||||
|
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
||||||
|
if(ctx.cInfoList == NULL){
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
nodesRewriteExprPostOrder(&pTagCond, getColumn, (void *)&ctx);
|
||||||
|
|
||||||
|
pResBlock = createDataBlock();
|
||||||
|
if (pResBlock == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(ctx.cInfoList); ++i) {
|
||||||
|
SColumnInfoData colInfo = {{0}, 0};
|
||||||
|
colInfo.info = *(SColumnInfo*)taosArrayGet(ctx.cInfoList, i);
|
||||||
|
blockDataAppendColInfo(pResBlock, &colInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
// int64_t stt = taosGetTimestampUs();
|
||||||
|
tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
code = metaGetTableTags(metaHandle, suid, uidList, tags);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t rows = taosArrayGetSize(uidList);
|
||||||
|
if(rows == 0){
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
// int64_t stt1 = taosGetTimestampUs();
|
||||||
|
// qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt);
|
||||||
|
|
||||||
|
code = blockDataEnsureCapacity(pResBlock, rows);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
// int64_t st = taosGetTimestampUs();
|
||||||
|
for (int32_t i = 0; i < rows; i++) {
|
||||||
|
int64_t* uid = taosArrayGet(uidList, i);
|
||||||
|
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
|
||||||
|
ASSERT(tag);
|
||||||
|
for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){
|
||||||
|
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
|
||||||
|
|
||||||
|
if(pColInfo->info.colId == -1){ // tbname
|
||||||
|
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
metaGetTableNameByUid(metaHandle, *uid, str);
|
||||||
|
colDataAppend(pColInfo, i, str, false);
|
||||||
|
#if TAG_FILTER_DEBUG
|
||||||
|
qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2);
|
||||||
|
#endif
|
||||||
|
}else{
|
||||||
|
STagVal tagVal = {0};
|
||||||
|
tagVal.cid = pColInfo->info.colId;
|
||||||
|
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
|
||||||
|
|
||||||
|
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)){
|
||||||
|
colDataAppend(pColInfo, i, p, true);
|
||||||
|
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colDataAppend(pColInfo, i, p, false);
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||||
|
char *tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
|
||||||
|
varDataSetLen(tmp, tagVal.nData);
|
||||||
|
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
||||||
|
colDataAppend(pColInfo, i, tmp, false);
|
||||||
|
#if TAG_FILTER_DEBUG
|
||||||
|
qDebug("tagfilter varch:%s", tmp+2);
|
||||||
|
#endif
|
||||||
|
taosMemoryFree(tmp);
|
||||||
|
} else {
|
||||||
|
colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false);
|
||||||
|
#if TAG_FILTER_DEBUG
|
||||||
|
if(pColInfo->info.type == TSDB_DATA_TYPE_INT){
|
||||||
|
qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
|
||||||
|
}else if(pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE){
|
||||||
|
qDebug("tagfilter double:%f", *(double *)(&tagVal.i64));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pResBlock->info.rows = rows;
|
||||||
|
|
||||||
|
// int64_t st1 = taosGetTimestampUs();
|
||||||
|
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
|
||||||
|
|
||||||
|
pBlockList = taosArrayInit(2, POINTER_BYTES);
|
||||||
|
taosArrayPush(pBlockList, &pResBlock);
|
||||||
|
|
||||||
|
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||||
|
code = createResultData(&type, rows, &output);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = scalarCalculate(pTagCond, pBlockList, &output);
|
||||||
|
if(code != TSDB_CODE_SUCCESS){
|
||||||
|
terrno = code;
|
||||||
|
}
|
||||||
|
// int64_t st2 = taosGetTimestampUs();
|
||||||
|
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
|
||||||
|
|
||||||
|
end:
|
||||||
|
taosHashCleanup(tags);
|
||||||
|
taosHashCleanup(ctx.colHash);
|
||||||
|
taosArrayDestroy(ctx.cInfoList);
|
||||||
|
blockDataDestroy(pResBlock);
|
||||||
|
taosArrayDestroy(pBlockList);
|
||||||
|
return output.columnData;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||||
STableListInfo* pListInfo) {
|
STableListInfo* pListInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -308,62 +511,67 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t tableUid = pScanNode->uid;
|
uint64_t tableUid = pScanNode->uid;
|
||||||
|
|
||||||
pListInfo->suid = pScanNode->suid;
|
pListInfo->suid = pScanNode->suid;
|
||||||
|
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
||||||
|
|
||||||
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
|
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
|
||||||
if (pTagIndexCond) {
|
if (pTagIndexCond) {
|
||||||
SIndexMetaArg metaArg = {
|
SIndexMetaArg metaArg = {
|
||||||
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
|
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
|
||||||
|
|
||||||
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
// int64_t stt = taosGetTimestampUs();
|
||||||
SIdxFltStatus status = SFLT_NOT_INDEX;
|
SIdxFltStatus status = SFLT_NOT_INDEX;
|
||||||
code = doFilterTag(pTagIndexCond, &metaArg, res, &status);
|
code = doFilterTag(pTagIndexCond, &metaArg, res, &status);
|
||||||
if (code != 0 || status == SFLT_NOT_INDEX) {
|
if (code != 0 || status == SFLT_NOT_INDEX) {
|
||||||
qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
|
qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
|
||||||
// code = TSDB_CODE_INDEX_REBUILDING;
|
code = TDB_CODE_SUCCESS;
|
||||||
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
|
}
|
||||||
} else {
|
|
||||||
qDebug("success to get tableIds, size:%d, suid:%" PRIu64, (int)taosArrayGetSize(res), tableUid);
|
// int64_t stt1 = taosGetTimestampUs();
|
||||||
|
// qDebug("generate table list, cost:%ld us", stt1-stt);
|
||||||
|
}else if(!pTagCond){
|
||||||
|
vnodeGetCtbIdList(pVnode, pScanNode->suid, res);
|
||||||
|
}
|
||||||
|
} else { // Create one table group.
|
||||||
|
taosArrayPush(res, &tableUid);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTagCond) {
|
||||||
|
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond);
|
||||||
|
if(terrno != TDB_CODE_SUCCESS){
|
||||||
|
colDataDestroy(pColInfoData);
|
||||||
|
taosMemoryFreeClear(pColInfoData);
|
||||||
|
taosArrayDestroy(res);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
|
int32_t j = 0;
|
||||||
|
int32_t len = taosArrayGetSize(res);
|
||||||
|
while (i < taosArrayGetSize(res) && j < len && pColInfoData) {
|
||||||
|
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
|
||||||
|
|
||||||
|
int64_t* uid = taosArrayGet(res, i);
|
||||||
|
qDebug("tagfilter get uid:%ld, res:%d", *uid, *(bool*)var);
|
||||||
|
if (*(bool*)var == false) {
|
||||||
|
taosArrayRemove(res, i);
|
||||||
|
j++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
j++;
|
||||||
|
}
|
||||||
|
colDataDestroy(pColInfoData);
|
||||||
|
taosMemoryFreeClear(pColInfoData);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
||||||
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
|
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
|
||||||
taosArrayPush(pListInfo->pTableList, &info);
|
taosArrayPush(pListInfo->pTableList, &info);
|
||||||
|
qDebug("tagfilter get uid:%ld", info.uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(res);
|
taosArrayDestroy(res);
|
||||||
} else {
|
|
||||||
code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
qError("failed to get tableIds, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
|
|
||||||
terrno = code;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
} else { // Create one table group.
|
|
||||||
STableKeyInfo info = {.uid = tableUid, .groupId = 0};
|
|
||||||
taosArrayPush(pListInfo->pTableList, &info);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTagCond) {
|
|
||||||
int32_t i = 0;
|
|
||||||
while (i < taosArrayGetSize(pListInfo->pTableList)) {
|
|
||||||
STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i);
|
|
||||||
|
|
||||||
bool qualified = true;
|
|
||||||
code = isQualifiedTable(info, pTagCond, metaHandle, &qualified);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!qualified) {
|
|
||||||
taosArrayRemove(pListInfo->pTableList, i);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
|
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
|
||||||
if (pListInfo->pGroupList == NULL) {
|
if (pListInfo->pGroupList == NULL) {
|
||||||
|
|
|
@ -440,7 +440,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
||||||
} else { // these are tags
|
} else { // these are tags
|
||||||
STagVal tagVal = {0};
|
STagVal tagVal = {0};
|
||||||
tagVal.cid = pExpr->base.pParam[0].pCol->colId;
|
tagVal.cid = pExpr->base.pParam[0].pCol->colId;
|
||||||
const char* p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal);
|
const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal);
|
||||||
|
|
||||||
char* data = NULL;
|
char* data = NULL;
|
||||||
if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
|
if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
|
||||||
|
@ -2506,7 +2506,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
} else { // it is a tag value
|
} else { // it is a tag value
|
||||||
STagVal val = {0};
|
STagVal val = {0};
|
||||||
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
|
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
|
||||||
const char* p = metaGetTableTagVal(&mr.me, pDst->info.type, &val);
|
const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);
|
||||||
|
|
||||||
char* data = NULL;
|
char* data = NULL;
|
||||||
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
|
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
|
||||||
|
|
|
@ -292,6 +292,9 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
|
SColumnInfoData *columnData = (SColumnInfoData *)taosArrayGet(block->pDataBlock, ref->slotId);
|
||||||
|
#if TAG_FILTER_DEBUG
|
||||||
|
qDebug("tagfilter column info, slotId:%d, colId:%d, type:%d", ref->slotId, columnData->info.colId, columnData->info.type);
|
||||||
|
#endif
|
||||||
param->numOfRows = block->info.rows;
|
param->numOfRows = block->info.rows;
|
||||||
param->columnData = columnData;
|
param->columnData = columnData;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -386,6 +386,7 @@ void* taosArrayDestroy(SArray* pArray) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayDestroyP(SArray* pArray, FDelete fp) {
|
void taosArrayDestroyP(SArray* pArray, FDelete fp) {
|
||||||
|
if(!pArray) return;
|
||||||
for (int32_t i = 0; i < pArray->size; i++) {
|
for (int32_t i = 0; i < pArray->size; i++) {
|
||||||
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
|
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
|
||||||
}
|
}
|
||||||
|
|
|
@ -197,7 +197,7 @@ class TDTestCase:
|
||||||
|
|
||||||
# test where with json tag
|
# test where with json tag
|
||||||
tdSql.query(f"select * from {dbname}.jsons1_1 where jtag is not null")
|
tdSql.query(f"select * from {dbname}.jsons1_1 where jtag is not null")
|
||||||
tdSql.query(f"select * from {dbname}.jsons1 where jtag='{{\"tag1\":11,\"tag2\":\"\"}}'")
|
tdSql.error(f"select * from {dbname}.jsons1 where jtag='{{\"tag1\":11,\"tag2\":\"\"}}'")
|
||||||
tdSql.error(f"select * from {dbname}.jsons1 where jtag->'tag1'={{}}")
|
tdSql.error(f"select * from {dbname}.jsons1 where jtag->'tag1'={{}}")
|
||||||
|
|
||||||
# test json error
|
# test json error
|
||||||
|
|
Loading…
Reference in New Issue