refactor json index
This commit is contained in:
parent
16f8450bbc
commit
0ca6a5ae27
|
@ -146,6 +146,6 @@ option(
|
||||||
option(
|
option(
|
||||||
BUILD_WITH_INVERTEDINDEX
|
BUILD_WITH_INVERTEDINDEX
|
||||||
"If use invertedIndex"
|
"If use invertedIndex"
|
||||||
OFF
|
ON
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -193,8 +193,9 @@ void indexInit();
|
||||||
|
|
||||||
/* index filter */
|
/* index filter */
|
||||||
typedef struct SIndexMetaArg {
|
typedef struct SIndexMetaArg {
|
||||||
void* metaHandle;
|
|
||||||
void* metaEx;
|
void* metaEx;
|
||||||
|
void* idx;
|
||||||
|
void* ivtIdx;
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
} SIndexMetaArg;
|
} SIndexMetaArg;
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||||
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg);
|
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
|
@ -86,7 +86,7 @@ typedef struct SMetaFltParam {
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
int16_t cid;
|
int16_t cid;
|
||||||
int16_t type;
|
int16_t type;
|
||||||
char *val;
|
char * val;
|
||||||
bool reverse;
|
bool reverse;
|
||||||
int (*filterFunc)(void *a, void *b, int16_t type);
|
int (*filterFunc)(void *a, void *b, int16_t type);
|
||||||
|
|
||||||
|
@ -121,7 +121,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo
|
||||||
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
||||||
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
|
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
|
||||||
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
|
int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list);
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
void * tsdbGetIdx(SMeta *pMeta);
|
||||||
|
void * tsdbGetIvtIdx(SMeta *pMeta);
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
|
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
|
||||||
|
|
||||||
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
||||||
|
@ -197,7 +198,7 @@ struct SMetaEntry {
|
||||||
int64_t version;
|
int64_t version;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
char *name;
|
char * name;
|
||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
SSchemaWrapper schemaRow;
|
SSchemaWrapper schemaRow;
|
||||||
|
@ -225,17 +226,17 @@ struct SMetaEntry {
|
||||||
|
|
||||||
struct SMetaReader {
|
struct SMetaReader {
|
||||||
int32_t flags;
|
int32_t flags;
|
||||||
SMeta *pMeta;
|
SMeta * pMeta;
|
||||||
SDecoder coder;
|
SDecoder coder;
|
||||||
SMetaEntry me;
|
SMetaEntry me;
|
||||||
void *pBuf;
|
void * pBuf;
|
||||||
int32_t szBuf;
|
int32_t szBuf;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SMTbCursor {
|
struct SMTbCursor {
|
||||||
TBC *pDbc;
|
TBC * pDbc;
|
||||||
void *pKey;
|
void * pKey;
|
||||||
void *pVal;
|
void * pVal;
|
||||||
int32_t kLen;
|
int32_t kLen;
|
||||||
int32_t vLen;
|
int32_t vLen;
|
||||||
SMetaReader mr;
|
SMetaReader mr;
|
||||||
|
|
|
@ -69,12 +69,11 @@ struct SMeta {
|
||||||
TTB* pUidIdx;
|
TTB* pUidIdx;
|
||||||
TTB* pNameIdx;
|
TTB* pNameIdx;
|
||||||
TTB* pCtbIdx;
|
TTB* pCtbIdx;
|
||||||
#ifdef USE_INVERTED_INDEX
|
// ivt idx and idx
|
||||||
void* pTagIvtIdx;
|
void* pTagIvtIdx;
|
||||||
#else
|
TTB* pTagIdx;
|
||||||
TTB* pTagIdx;
|
TTB* pTtlIdx;
|
||||||
#endif
|
|
||||||
TTB* pTtlIdx;
|
|
||||||
TTB* pSmaIdx;
|
TTB* pSmaIdx;
|
||||||
SMetaIdx* pIdx;
|
SMetaIdx* pIdx;
|
||||||
};
|
};
|
||||||
|
@ -117,7 +116,7 @@ typedef struct {
|
||||||
} SSmaIdxKey;
|
} SSmaIdxKey;
|
||||||
|
|
||||||
// metaTable ==================
|
// metaTable ==================
|
||||||
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
|
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
|
||||||
STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey);
|
STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey);
|
||||||
|
|
||||||
#ifndef META_REFACT
|
#ifndef META_REFACT
|
||||||
|
|
|
@ -87,7 +87,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
|
||||||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
|
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
|
||||||
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
|
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
|
||||||
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
|
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
|
||||||
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp *pMetaRsp);
|
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
|
||||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
||||||
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
||||||
|
@ -104,6 +104,7 @@ int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppRea
|
||||||
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
|
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
|
||||||
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
|
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
|
||||||
void* metaGetIdx(SMeta* pMeta);
|
void* metaGetIdx(SMeta* pMeta);
|
||||||
|
void* metaGetIvtIdx(SMeta* pMeta);
|
||||||
|
|
||||||
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
|
||||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||||
|
|
|
@ -93,7 +93,6 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// open pTagIdx
|
// open pTagIdx
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
// TODO(yihaoDeng), refactor later
|
// TODO(yihaoDeng), refactor later
|
||||||
char indexFullPath[128] = {0};
|
char indexFullPath[128] = {0};
|
||||||
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
|
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
|
||||||
|
@ -104,13 +103,11 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
#else
|
|
||||||
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
|
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
|
metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
// open pTtlIdx
|
// open pTtlIdx
|
||||||
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
|
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
|
||||||
|
@ -141,11 +138,8 @@ _err:
|
||||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||||
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
|
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||||
#else
|
|
||||||
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
|
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
|
||||||
#endif
|
|
||||||
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
|
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
|
||||||
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
|
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
|
||||||
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
|
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
|
||||||
|
|
|
@ -603,9 +603,6 @@ typedef struct {
|
||||||
} SIdxCursor;
|
} SIdxCursor;
|
||||||
|
|
||||||
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
return -1;
|
|
||||||
#else
|
|
||||||
SIdxCursor *pCursor = NULL;
|
SIdxCursor *pCursor = NULL;
|
||||||
|
|
||||||
int32_t ret = 0, valid = 0;
|
int32_t ret = 0, valid = 0;
|
||||||
|
@ -681,5 +678,4 @@ END:
|
||||||
taosMemoryFree(pCursor);
|
taosMemoryFree(pCursor);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -890,27 +890,12 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
||||||
nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
|
nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
|
||||||
return metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
|
return metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn);
|
||||||
}
|
}
|
||||||
|
|
||||||
// update tag index
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
tb_uid_t suid = pCtbEntry->ctbEntry.suid;
|
|
||||||
tb_uid_t tuid = pCtbEntry->uid;
|
|
||||||
|
|
||||||
SIndexMultiTerm *tmGroup = indexMultiTermCreate();
|
|
||||||
|
|
||||||
SIndexTerm *tm = indexTermCreate(suid, ADD_VALUE, pTagColumn->type, pTagColumn->name, sizeof(pTagColumn->name),
|
|
||||||
pTagData, pTagData == NULL ? 0 : strlen(pTagData));
|
|
||||||
indexMultiTermAdd(tmGroup, tm);
|
|
||||||
int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid);
|
|
||||||
indexMultiTermDestroy(tmGroup);
|
|
||||||
#else
|
|
||||||
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
|
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
|
||||||
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
|
tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
|
||||||
metaDestroyTagIdxKey(pTagIdxKey);
|
metaDestroyTagIdxKey(pTagIdxKey);
|
||||||
#endif
|
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
tdbFree(pData);
|
tdbFree(pData);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -995,10 +980,5 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// refactor later
|
// refactor later
|
||||||
void *metaGetIdx(SMeta *pMeta) {
|
void *metaGetIdx(SMeta *pMeta) { return pMeta->pTagIdx; }
|
||||||
#ifdef USE_INVERTED_INDEX
|
void *metaGetIvtIdx(SMeta *pMeta) { return pMeta->pTagIvtIdx; }
|
||||||
return pMeta->pTagIvtIdx;
|
|
||||||
#else
|
|
||||||
return pMeta->pTagIdx;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
|
@ -2829,6 +2829,12 @@ void* tsdbGetIdx(SMeta* pMeta) {
|
||||||
}
|
}
|
||||||
return metaGetIdx(pMeta);
|
return metaGetIdx(pMeta);
|
||||||
}
|
}
|
||||||
|
void* tsdbGetIvtIdx(SMeta* pMeta) {
|
||||||
|
if (pMeta == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return metaGetIvtIdx(pMeta);
|
||||||
|
}
|
||||||
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
|
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
|
||||||
SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
|
SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
|
||||||
|
|
||||||
|
|
|
@ -4052,12 +4052,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->limit = *pLimit;
|
pInfo->limit = *pLimit;
|
||||||
pInfo->slimit = *pSlimit;
|
pInfo->slimit = *pSlimit;
|
||||||
pInfo->curOffset = pLimit->offset;
|
pInfo->curOffset = pLimit->offset;
|
||||||
pInfo->curSOffset = pSlimit->offset;
|
pInfo->curSOffset = pSlimit->offset;
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->pFilterNode= pCondition;
|
pInfo->pFilterNode = pCondition;
|
||||||
|
|
||||||
int32_t numOfCols = num;
|
int32_t numOfCols = num;
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
|
@ -4404,7 +4404,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
STimeWindowAggSupp twSup = {
|
STimeWindowAggSupp twSup = {
|
||||||
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
||||||
tsdbReaderT pDataReader = NULL;
|
tsdbReaderT pDataReader = NULL;
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||||
|
@ -4487,7 +4487,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
||||||
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
|
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
|
||||||
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pProjPhyNode->node.pConditions, pTaskInfo);
|
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit,
|
||||||
|
pProjPhyNode->node.pConditions, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
|
||||||
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
|
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||||
|
@ -4836,7 +4837,8 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
|
||||||
|
|
||||||
if (tableType == TSDB_SUPER_TABLE) {
|
if (tableType == TSDB_SUPER_TABLE) {
|
||||||
if (pTagCond) {
|
if (pTagCond) {
|
||||||
SIndexMetaArg metaArg = {.metaEx = metaHandle, .metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid};
|
SIndexMetaArg metaArg = {
|
||||||
|
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
|
||||||
|
|
||||||
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
||||||
code = doFilterTag(pTagCond, &metaArg, res);
|
code = doFilterTag(pTagCond, &metaArg, res);
|
||||||
|
@ -5187,8 +5189,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey,
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size_t size) {
|
||||||
size_t size) {
|
|
||||||
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
|
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
|
||||||
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
|
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
|
||||||
pSup->pResultRows = taosArrayInit(1024, size);
|
pSup->pResultRows = taosArrayInit(1024, size);
|
||||||
|
|
|
@ -335,44 +335,34 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
||||||
}
|
}
|
||||||
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
||||||
SIndexMetaArg *arg = &output->arg;
|
SIndexMetaArg *arg = &output->arg;
|
||||||
#ifdef USE_INVERTED_INDEX
|
int ret = 0;
|
||||||
SIndexTerm *tm = indexTermCreate(arg->suid, DEFAULT, right->colValType, left->colName, strlen(left->colName),
|
|
||||||
right->condValue, strlen(right->condValue));
|
|
||||||
if (tm == NULL) {
|
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ret = 0;
|
|
||||||
EIndexQueryType qtype = 0;
|
EIndexQueryType qtype = 0;
|
||||||
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
||||||
|
|
||||||
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
|
|
||||||
indexMultiTermQueryAdd(mtm, tm, qtype);
|
|
||||||
if (left->colValType == TSDB_DATA_TYPE_JSON) {
|
if (left->colValType == TSDB_DATA_TYPE_JSON) {
|
||||||
ret = tIndexJsonSearch(arg->metaHandle, mtm, output->result);
|
SIndexTerm *tm = indexTermCreate(arg->suid, DEFAULT, right->colValType, left->colName, strlen(left->colName),
|
||||||
|
right->condValue, strlen(right->condValue));
|
||||||
|
if (tm == NULL) {
|
||||||
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
|
||||||
|
indexMultiTermQueryAdd(mtm, tm, qtype);
|
||||||
|
ret = tIndexJsonSearch(arg->ivtIdx, mtm, output->result);
|
||||||
} else {
|
} else {
|
||||||
ret = indexSearch(arg->metaHandle, mtm, output->result);
|
bool reverse;
|
||||||
|
Filter filterFunc = sifGetFilterFunc(qtype, &reverse);
|
||||||
|
|
||||||
|
SMetaFltParam param = {.suid = arg->suid,
|
||||||
|
.cid = left->colId,
|
||||||
|
.type = left->colValType,
|
||||||
|
.val = right->condValue,
|
||||||
|
.reverse = reverse,
|
||||||
|
.filterFunc = filterFunc};
|
||||||
|
|
||||||
|
ret = metaFilteTableIds(arg->metaEx, ¶m, output->result);
|
||||||
}
|
}
|
||||||
indexDebug("index filter data size: %d", (int)taosArrayGetSize(output->result));
|
|
||||||
indexMultiTermQueryDestroy(mtm);
|
|
||||||
return ret;
|
return ret;
|
||||||
#else
|
|
||||||
EIndexQueryType qtype = 0;
|
|
||||||
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
|
||||||
bool reverse;
|
|
||||||
Filter filterFunc = sifGetFilterFunc(qtype, &reverse);
|
|
||||||
|
|
||||||
SMetaFltParam param = {.suid = arg->suid,
|
|
||||||
.cid = left->colId,
|
|
||||||
.type = left->colValType,
|
|
||||||
.val = right->condValue,
|
|
||||||
.reverse = reverse,
|
|
||||||
.filterFunc = filterFunc};
|
|
||||||
|
|
||||||
int ret = metaFilteTableIds(arg->metaEx, ¶m, output->result);
|
|
||||||
return ret;
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
|
|
Loading…
Reference in New Issue