enh: support tag filter

This commit is contained in:
yihaoDeng 2022-05-28 16:15:24 +08:00
parent 9678b81725
commit 6acbe7e777
17 changed files with 257 additions and 181 deletions

View File

@ -192,11 +192,16 @@ void indexTermDestroy(SIndexTerm* p);
void indexInit();
/* index filter */
typedef struct SIndexMetaArg {
void* metaHandle;
uint64_t suid;
} SIndexMetaArg;
typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus;
SIdxFltStatus idxGetFltStatus(SNode* pFilterNode);
int32_t doFilterTag(const SNode* pFilterNode, void* metaHandle, SArray* result);
int32_t doFilterTag(const SNode* pFilterNode, SIndexMetaArg* metaArg, SArray* result);
/*
* destory index env
*

View File

@ -1153,7 +1153,9 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
pColumn->varmeta.length = 0;
} else {
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
if (pColumn->nullbitmap != NULL) {
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
}
}
}

View File

@ -293,7 +293,7 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", 0, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 1) != 0) return -1;
return 0;
}

View File

@ -105,10 +105,12 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab
void *pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list);
int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list);
void * tsdbGetIdx(SMeta *pMeta);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
@ -174,7 +176,7 @@ struct SMetaEntry {
int64_t version;
int8_t type;
tb_uid_t uid;
char *name;
char * name;
union {
struct {
SSchemaWrapper schemaRow;
@ -202,17 +204,17 @@ struct SMetaEntry {
struct SMetaReader {
int32_t flags;
SMeta *pMeta;
SMeta * pMeta;
SDecoder coder;
SMetaEntry me;
void *pBuf;
void * pBuf;
int32_t szBuf;
};
struct SMTbCursor {
TBC *pDbc;
void *pKey;
void *pVal;
TBC * pDbc;
void * pKey;
void * pVal;
int32_t kLen;
int32_t vLen;
SMetaReader mr;

View File

@ -103,6 +103,7 @@ SArray* metaGetSmaTbUids(SMeta* pMeta);
int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
void* metaGetIdx(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);

View File

@ -31,9 +31,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int vLen = 0;
const void *pKey = NULL;
const void *pVal = NULL;
void *pBuf = NULL;
void * pBuf = NULL;
int32_t szBuf = 0;
void *p = NULL;
void * p = NULL;
SMetaReader mr = {0};
// validate req
@ -87,7 +87,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
}
// drop all child tables
TBC *pCtbIdxc = NULL;
TBC * pCtbIdxc = NULL;
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
@ -142,8 +142,8 @@ _exit:
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
TBC *pUidIdxc = NULL;
TBC *pTbDbc = NULL;
TBC * pUidIdxc = NULL;
TBC * pTbDbc = NULL;
const void *pData;
int nData;
int64_t oversion;
@ -262,7 +262,7 @@ _err:
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
void *pData = NULL;
void * pData = NULL;
int nData = 0;
int rc = 0;
tb_uid_t uid;
@ -288,7 +288,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
}
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
void *pData = NULL;
void * pData = NULL;
int nData = 0;
int rc = 0;
int64_t version;
@ -324,14 +324,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
}
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
void *pVal = NULL;
void * pVal = NULL;
int nVal = 0;
const void *pData = NULL;
const void * pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
int64_t oversion;
SSchema *pColumn = NULL;
SSchema * pColumn = NULL;
SMetaEntry entry = {0};
SSchemaWrapper *pSchema;
int c;
@ -479,7 +479,7 @@ _err:
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry ctbEntry = {0};
SMetaEntry stbEntry = {0};
void *pVal = NULL;
void * pVal = NULL;
int nVal = 0;
int ret;
int c;
@ -510,7 +510,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
oversion = *(int64_t *)pData;
// search table.db
TBC *pTbDbc = NULL;
TBC * pTbDbc = NULL;
SDecoder dc1 = {0};
SDecoder dc2 = {0};
@ -534,7 +534,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaDecodeEntry(&dc2, &stbEntry);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema *pColumn = NULL;
SSchema * pColumn = NULL;
int32_t iCol = 0;
for (;;) {
pColumn = NULL;
@ -639,8 +639,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey;
void *pKey = NULL;
void *pVal = NULL;
void * pKey = NULL;
void * pVal = NULL;
int kLen = 0;
int vLen = 0;
SEncoder coder = {0};
@ -755,14 +755,14 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
}
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
void *pData = NULL;
void * pData = NULL;
int nData = 0;
STbDbKey tbDbKey = {0};
SMetaEntry stbEntry = {0};
STagIdxKey *pTagIdxKey = NULL;
STagIdxKey * pTagIdxKey = NULL;
int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
const void *pTagData = NULL; //
const void * pTagData = NULL; //
SDecoder dc = {0};
// get super table
@ -804,7 +804,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SEncoder coder = {0};
void *pVal = NULL;
void * pVal = NULL;
int vLen = 0;
int rcode = 0;
SSkmDbKey skmDbKey = {0};
@ -880,3 +880,11 @@ _err:
metaULock(pMeta);
return -1;
}
// refactor later
void *metaGetIdx(SMeta *pMeta) {
#ifdef USE_INVERTED_INDEX
return pMeta->pTagIvtIdx;
#else
return pMeta->pTagIdx;
#endif
}

View File

@ -259,8 +259,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
}
taosArrayPush(pTableCheckInfo, &info);
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId,
info.lastKey, pTsdbReadHandle->idStr);
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, info.lastKey,
pTsdbReadHandle->idStr);
}
// TODO group table according to the tag value.
@ -363,13 +363,16 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle,
}
if (level == TSDB_RETENTION_L0) {
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L0);
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
TSDB_RETENTION_L0);
return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) {
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L1);
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
TSDB_RETENTION_L1);
return VND_RSMA1(pVnode);
} else {
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L2);
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle,
TSDB_RETENTION_L2);
return VND_RSMA2(pVnode);
}
}
@ -412,7 +415,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
if (pCond->numOfCols > 0) {
int32_t rowLen = 0;
for(int32_t i = 0; i < pCond->numOfCols; ++i) {
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
rowLen += pCond->colList[i].bytes;
}
@ -660,7 +663,7 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
}
// leave only one table for each group
//static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
// static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
// assert(pGroupList);
// size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
//
@ -692,7 +695,7 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
// return pNew;
//}
//tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
// tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
// uint64_t qId, uint64_t taskId) {
// STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
//
@ -1299,7 +1302,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey));
if (cacheDataInFileBlockHole) {
@ -1342,7 +1344,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
pTsdbReadHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
cur->win = binfo.window;
cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
@ -1353,9 +1355,9 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
cur->lastKey = binfo.window.skey - 1;
cur->pos = -1;
}
} else { // partially copy to dest buffer
} else { // partially copy to dest buffer
// make sure to only load once
bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows -1 && (!ascScan)));
bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows - 1 && (!ascScan)));
if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) {
code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
if (code != TSDB_CODE_SUCCESS) {
@ -1864,7 +1866,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t step = ascScan? 1 : -1;
int32_t step = ascScan ? 1 : -1;
int32_t start = cur->pos;
int32_t end = endPos;
@ -1879,8 +1881,8 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
// the time window should always be ascending order: skey <= ekey
cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
cur->mixBlock = (numOfRows != pBlockInfo->rows);
cur->lastKey = tsArray[endPos] + step;
cur->blockCompleted = (ascScan? (endPos == pBlockInfo->rows - 1):(endPos == 0));
cur->lastKey = tsArray[endPos] + step;
cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0));
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
int32_t pos = endPos + step;
@ -1896,7 +1898,7 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p
// NOTE: reverse the order to find the end position in data block
int32_t endPos = -1;
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t order = ascScan? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pTsdbReadHandle->cur;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
@ -1956,7 +1958,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t step = ascScan ? 1 : -1;
// for search the endPos, so the order needs to reverse
@ -1967,8 +1969,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
STimeWindow* pWin = &blockInfo.window;
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
" rows:%d, start:%d, end:%d, %s", pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows,
cur->pos, endPos, pTsdbReadHandle->idStr);
" rows:%d, start:%d, end:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos,
pTsdbReadHandle->idStr);
// compared with the data from in-memory buffer, to generate the correct timestamp array list
int32_t numOfRows = 0;
@ -2087,8 +2090,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
// still assign data into current row
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
@ -2153,8 +2157,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
* if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT
* copy them all to result buffer, since it may be overlapped with file data block.
*/
if (node == NULL ||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) {
@ -2175,7 +2178,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
if (!ascScan) {
TSWAP(cur->win.skey, cur->win.ekey);
@ -2794,6 +2797,12 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
return numOfRows;
}
void* tsdbGetIdx(SMeta* pMeta) {
if (pMeta == NULL) {
return NULL;
}
return metaGetIdx(pMeta);
}
int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
@ -3382,65 +3391,65 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
STimeWindow updateLastrowForEachGroup(STableListInfo* pList) {
STimeWindow window = {INT64_MAX, INT64_MIN};
// int32_t totalNumOfTable = 0;
// SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t));
//
// // NOTE: starts from the buffer in case of descending timestamp order check data blocks
// size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
// for (int32_t j = 0; j < numOfGroups; ++j) {
// SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
// TSKEY key = TSKEY_INITIAL_VAL;
//
// STableKeyInfo keyInfo = {0};
//
// size_t numOfTables = taosArrayGetSize(pGroup);
// for (int32_t i = 0; i < numOfTables; ++i) {
// STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
//
// // if the lastKey equals to INT64_MIN, there is no data in this table
// TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey;
// if (key < lastKey) {
// key = lastKey;
//
// // keyInfo.pTable = pInfo->pTable;
// keyInfo.lastKey = key;
// pInfo->lastKey = key;
//
// if (key < window.skey) {
// window.skey = key;
// }
//
// if (key > window.ekey) {
// window.ekey = key;
// }
// }
// }
//
// // more than one table in each group, only one table left for each group
// // if (keyInfo.pTable != NULL) {
// // totalNumOfTable++;
// // if (taosArrayGetSize(pGroup) == 1) {
// // // do nothing
// // } else {
// // taosArrayClear(pGroup);
// // taosArrayPush(pGroup, &keyInfo);
// // }
// // } else { // mark all the empty groups, and remove it later
// // taosArrayDestroy(pGroup);
// // taosArrayPush(emptyGroup, &j);
// // }
// }
//
// // window does not being updated, so set the original
// if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
// window = TSWINDOW_INITIALIZER;
// assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
// }
//
// taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup));
// taosArrayDestroy(emptyGroup);
//
// groupList->numOfTables = totalNumOfTable;
// int32_t totalNumOfTable = 0;
// SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t));
//
// // NOTE: starts from the buffer in case of descending timestamp order check data blocks
// size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
// for (int32_t j = 0; j < numOfGroups; ++j) {
// SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
// TSKEY key = TSKEY_INITIAL_VAL;
//
// STableKeyInfo keyInfo = {0};
//
// size_t numOfTables = taosArrayGetSize(pGroup);
// for (int32_t i = 0; i < numOfTables; ++i) {
// STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
//
// // if the lastKey equals to INT64_MIN, there is no data in this table
// TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey;
// if (key < lastKey) {
// key = lastKey;
//
// // keyInfo.pTable = pInfo->pTable;
// keyInfo.lastKey = key;
// pInfo->lastKey = key;
//
// if (key < window.skey) {
// window.skey = key;
// }
//
// if (key > window.ekey) {
// window.ekey = key;
// }
// }
// }
//
// // more than one table in each group, only one table left for each group
// // if (keyInfo.pTable != NULL) {
// // totalNumOfTable++;
// // if (taosArrayGetSize(pGroup) == 1) {
// // // do nothing
// // } else {
// // taosArrayClear(pGroup);
// // taosArrayPush(pGroup, &keyInfo);
// // }
// // } else { // mark all the empty groups, and remove it later
// // taosArrayDestroy(pGroup);
// // taosArrayPush(emptyGroup, &j);
// // }
// }
//
// // window does not being updated, so set the original
// if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
// window = TSWINDOW_INITIALIZER;
// assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
// }
//
// taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup));
// taosArrayDestroy(emptyGroup);
//
// groupList->numOfTables = totalNumOfTable;
return window;
}

View File

@ -4895,13 +4895,17 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
if (tableType == TSDB_SUPER_TABLE) {
if (pTagCond) {
SIndexMetaArg metaArg = {.metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid};
SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, NULL, res);
code = doFilterTag(pTagCond, &metaArg, res);
if (code != TSDB_CODE_SUCCESS) {
qError("doFilterTag error:%d", code);
taosArrayDestroy(res);
terrno = code;
return code;
} else {
qDebug("doFilterTag error:%d, suid: %" PRIu64 "", code, tableUid);
}
for (int i = 0; i < taosArrayGetSize(res); i++) {
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};

View File

@ -98,13 +98,13 @@ typedef struct {
SArray *deled;
} SIdxTempResult;
SIdxTempResult *sIdxTempResultCreate();
SIdxTempResult *idxTempResultCreate();
void sIdxTempResultClear(SIdxTempResult *tr);
void idxTempResultClear(SIdxTempResult *tr);
void sIdxTempResultDestroy(SIdxTempResult *tr);
void idxTempResultDestroy(SIdxTempResult *tr);
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr);
void idxTempResultMergeTo(SIdxTempResult *tr, SArray *result);
#ifdef __cplusplus
}
#endif

View File

@ -201,6 +201,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
int32_t sz = indexSerialCacheKey(&key, buf);
indexDebug("suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
assert(*cache != NULL);
@ -328,6 +329,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
char buf[128] = {0};
ICacheKey key = {
.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
indexDebug("suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
int32_t sz = indexSerialCacheKey(&key, buf);
taosThreadMutexLock(&sIdx->mtx);
@ -341,7 +343,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
int64_t st = taosGetTimestampUs();
SIdxTempResult* tr = sIdxTempResultCreate();
SIdxTempResult* tr = idxTempResultCreate();
if (0 == indexCacheSearch(cache, query, tr, &s)) {
if (s == kTypeDeletion) {
indexInfo("col: %s already drop by", term->colName);
@ -363,12 +365,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
int64_t cost = taosGetTimestampUs() - st;
indexInfo("search cost: %" PRIu64 "us", cost);
sIdxTempResultMergeTo(*result, tr);
idxTempResultMergeTo(tr, *result);
sIdxTempResultDestroy(tr);
idxTempResultDestroy(tr);
return 0;
END:
sIdxTempResultDestroy(tr);
idxTempResultDestroy(tr);
return -1;
}
static void indexInterResultsDestroy(SArray* results) {
@ -409,13 +411,13 @@ static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdx
if (sz > 0) {
TFileValue* lv = taosArrayGetP(result, sz - 1);
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
sIdxTempResultMergeTo(lv->tableId, tr);
sIdxTempResultClear(tr);
idxTempResultMergeTo(tr, lv->tableId);
idxTempResultClear(tr);
taosArrayPush(result, &tfv);
} else if (tfv == NULL) {
// handle last iterator
sIdxTempResultMergeTo(lv->tableId, tr);
idxTempResultMergeTo(tr, lv->tableId);
} else {
// temp result saved in help
tfileValueDestroy(tfv);
@ -489,7 +491,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
SIdxTempResult* tr = sIdxTempResultCreate();
SIdxTempResult* tr = idxTempResultCreate();
while (cn == true || tn == true) {
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
@ -515,7 +517,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
}
}
indexMayMergeTempToFinalResult(result, NULL, tr);
sIdxTempResultDestroy(tr);
idxTempResultDestroy(tr);
int ret = indexGenTFile(sIdx, pCache, result);
indexDestroyFinalResult(result);

View File

@ -133,6 +133,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->colType = term->colType;
pCt->version = atomic_load_64(&pCache->version);
char* key = indexCacheTermGet(pCt);
@ -597,10 +598,10 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result
indexMemRef(imm);
taosThreadMutexUnlock(&pCache->mtx);
int ret = indexQueryMem(mem, query, result, s);
int ret = (mem && mem->mem) ? indexQueryMem(mem, query, result, s) : 0;
if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm
ret = indexQueryMem(imm, query, result, s);
ret = (imm && imm->mem) ? indexQueryMem(imm, query, result, s) : 0;
}
indexMemUnRef(mem);
@ -709,7 +710,7 @@ static int32_t indexCacheJsonTermCompare(const void* l, const void* r) {
return cmp;
}
static MemTable* indexInternalCacheCreate(int8_t type) {
int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;
int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : TSDB_DATA_TYPE_BINARY;
int32_t (*cmpFn)(const void* l, const void* r) =
INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? indexCacheJsonTermCompare : indexCacheTermCompare;

View File

@ -38,13 +38,14 @@ typedef struct SIFParam {
char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
void *metaHandle;
SIndexMetaArg arg;
} SIFParam;
typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
int32_t code;
SHashObj * pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
SIndexMetaArg arg;
// SIdxFltStatus st;
} SIFCtx;
@ -259,7 +260,8 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
return TSDB_CODE_QRY_INVALID_INPUT;
}
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
SIndexMetaArg *arg = &output->arg;
SIndexTerm * tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
right->condValue, strlen(right->condValue));
if (tm == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
@ -270,7 +272,8 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
indexMultiTermQueryAdd(mtm, tm, qtype);
int ret = indexSearch(NULL, mtm, output->result);
int ret = indexSearch(arg->metaHandle, mtm, output->result);
indexDebug("index filter data size: %d", (int)taosArrayGetSize(output->result));
indexMultiTermQueryDestroy(mtm);
return ret;
}
@ -374,6 +377,8 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
SIFParam *params = NULL;
SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
// ugly code, refactor later
output->arg = ctx->arg;
sif_func_t operFn = sifGetOperFn(node->opType);
if (ctx->noExec && operFn == NULL) {
output->status = SFLT_NOT_INDEX;
@ -425,7 +430,7 @@ _return:
static EDealRes sifWalkFunction(SNode *pNode, void *context) {
SFunctionNode *node = (SFunctionNode *)pNode;
SIFParam output = {0};
SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
SIFCtx *ctx = context;
ctx->code = sifExecFunction(node, ctx, &output);
@ -441,7 +446,8 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) {
}
static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SLogicConditionNode *node = (SLogicConditionNode *)pNode;
SIFParam output = {0};
SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
SIFCtx *ctx = context;
ctx->code = sifExecLogic(node, ctx, &output);
@ -457,7 +463,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) {
}
static EDealRes sifWalkOper(SNode *pNode, void *context) {
SOperatorNode *node = (SOperatorNode *)pNode;
SIFParam output = {0};
SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))};
SIFCtx *ctx = context;
ctx->code = sifExecOper(node, ctx, &output);
@ -509,8 +515,9 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SIFCtx ctx = {.code = 0, .noExec = false};
SIFCtx ctx = {.code = 0, .noExec = false, .arg = pDst->arg};
ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
indexError("index-filter failed to taosHashInit");
return TSDB_CODE_QRY_OUT_OF_MEMORY;
@ -525,7 +532,9 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
indexError("no valid res in hash, node:(%p), type(%d)", (void *)&pNode, nodeType(pNode));
SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
taosArrayAddAll(pDst->result, res->result);
if (res->result != NULL) {
taosArrayAddAll(pDst->result, res->result);
}
sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
@ -563,7 +572,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
SIF_RET(code);
}
int32_t doFilterTag(const SNode *pFilterNode, void *metaHandle, SArray *result) {
int32_t doFilterTag(const SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result) {
if (pFilterNode == NULL) {
return TSDB_CODE_SUCCESS;
}
@ -572,10 +581,12 @@ int32_t doFilterTag(const SNode *pFilterNode, void *metaHandle, SArray *result)
// todo move to the initialization function
// SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
SIFParam param = {.metaHandle = metaHandle};
SArray * output = taosArrayInit(8, sizeof(uint64_t));
SIFParam param = {.arg = *metaArg, .result = output};
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, &param));
taosArrayAddAll(result, param.result);
// taosArrayAddAll(result, param.result);
sifFreeParam(&param);
SIF_RET(TSDB_CODE_SUCCESS);
}

View File

@ -211,12 +211,12 @@ void tfileReaderDestroy(TFileReader* reader) {
}
// T_REF_INC(reader);
fstDestroy(reader->fst);
writerCtxDestroy(reader->ctx, reader->remove);
if (reader->remove) {
indexInfo("%s is removed", reader->ctx->file.buf);
} else {
indexInfo("%s is not removed", reader->ctx->file.buf);
}
writerCtxDestroy(reader->ctx, reader->remove);
taosMemoryFree(reader);
}

View File

@ -86,7 +86,7 @@ void iUnion(SArray *inters, SArray *final) {
mi[i].idx = 0;
}
while (1) {
uint64_t mVal = UINT_MAX;
uint64_t mVal = UINT64_MAX;
int mIdx = -1;
for (int j = 0; j < sz; j++) {
@ -158,7 +158,7 @@ int verdataCompare(const void *a, const void *b) {
return cmp;
}
SIdxTempResult *sIdxTempResultCreate() {
SIdxTempResult *idxTempResultCreate() {
SIdxTempResult *tr = taosMemoryCalloc(1, sizeof(SIdxTempResult));
tr->total = taosArrayInit(4, sizeof(uint64_t));
@ -166,7 +166,7 @@ SIdxTempResult *sIdxTempResultCreate() {
tr->deled = taosArrayInit(4, sizeof(uint64_t));
return tr;
}
void sIdxTempResultClear(SIdxTempResult *tr) {
void idxTempResultClear(SIdxTempResult *tr) {
if (tr == NULL) {
return;
}
@ -174,7 +174,7 @@ void sIdxTempResultClear(SIdxTempResult *tr) {
taosArrayClear(tr->added);
taosArrayClear(tr->deled);
}
void sIdxTempResultDestroy(SIdxTempResult *tr) {
void idxTempResultDestroy(SIdxTempResult *tr) {
if (tr == NULL) {
return;
}
@ -182,7 +182,7 @@ void sIdxTempResultDestroy(SIdxTempResult *tr) {
taosArrayDestroy(tr->added);
taosArrayDestroy(tr->deled);
}
void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) {
void idxTempResultMergeTo(SIdxTempResult *tr, SArray *result) {
taosArraySort(tr->total, uidCompare);
taosArraySort(tr->added, uidCompare);
taosArraySort(tr->deled, uidCompare);
@ -194,5 +194,10 @@ void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) {
iUnion(arrs, result);
taosArrayDestroy(arrs);
indexError("tmp result: total: %d, added: %d, del: %d", (int)taosArrayGetSize(tr->total),
(int)taosArrayGetSize(tr->added), (int)taosArrayGetSize(tr->deled));
if (taosArrayGetSize(tr->added) != 0 && taosArrayGetSize(result) == 0) {
indexError("except result: %d", (int)(taosArrayGetSize(result)));
}
iExcept(result, tr->deled);
}

View File

@ -1,74 +1,74 @@
add_executable(indexTest "")
add_executable(fstTest "")
add_executable(fstUT "")
add_executable(UtilUT "")
add_executable(jsonUT "")
add_executable(idxTest "")
add_executable(idxFstTest "")
add_executable(idxFstUT "")
add_executable(idxUtilUT "")
add_executable(idxJsonUT "")
target_sources(indexTest
target_sources(idxTest
PRIVATE
"indexTests.cc"
)
target_sources(fstTest
target_sources(idxFstTest
PRIVATE
"fstTest.cc"
)
target_sources(fstUT
target_sources(idxFstUT
PRIVATE
"fstUT.cc"
)
target_sources(UtilUT
target_sources(idxUtilUT
PRIVATE
"utilUT.cc"
)
target_sources(jsonUT
target_sources(idxJsonUT
PRIVATE
"jsonUT.cc"
)
target_include_directories ( indexTest
target_include_directories (idxTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories ( fstTest
target_include_directories (idxFstTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories ( fstUT
target_include_directories (idxFstUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories ( UtilUT
target_include_directories (idxUtilUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (jsonUT
target_include_directories (idxJsonUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (indexTest
target_link_libraries (idxTest
os
util
common
gtest_main
index
)
target_link_libraries (fstTest
target_link_libraries (idxFstTest
os
util
common
gtest_main
index
)
target_link_libraries (fstUT
target_link_libraries (idxFstUT
os
util
common
@ -76,7 +76,7 @@ target_link_libraries (fstUT
index
)
target_link_libraries (UtilUT
target_link_libraries (idxUtilUT
os
util
common
@ -84,7 +84,7 @@ target_link_libraries (UtilUT
index
)
target_link_libraries (jsonUT
target_link_libraries (idxJsonUT
os
util
common
@ -98,13 +98,13 @@ add_test(
)
add_test(
NAME idxJsonUT
COMMAND jsonUT
COMMAND idxJsonUT
)
add_test(
NAME idxUtilUT
COMMAND UtilUT
COMMAND idxUtilUT
)
add_test(
NAME idxFstUT
COMMAND fstUT
COMMAND idxFstUT
)

View File

@ -411,12 +411,12 @@ class TFileObj {
//
//
}
SIdxTempResult* tr = sIdxTempResultCreate();
SIdxTempResult* tr = idxTempResultCreate();
int ret = tfileReaderSearch(reader_, query, tr);
sIdxTempResultMergeTo(result, tr);
sIdxTempResultDestroy(tr);
idxTempResultMergeTo(tr, result);
idxTempResultDestroy(tr);
return ret;
}
~TFileObj() {
@ -531,11 +531,11 @@ class CacheObj {
indexCacheDebug(cache);
}
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
SIdxTempResult* tr = sIdxTempResultCreate();
SIdxTempResult* tr = idxTempResultCreate();
int ret = indexCacheSearch(cache, query, tr, s);
sIdxTempResultMergeTo(result, tr);
sIdxTempResultDestroy(tr);
idxTempResultMergeTo(tr, result);
idxTempResultDestroy(tr);
if (ret != 0) {
std::cout << "failed to get from cache:" << ret << std::endl;

View File

@ -226,6 +226,22 @@ TEST_F(UtilEnv, 04union) {
iUnion(src, rslt);
assert(taosArrayGetSize(rslt) == 12);
}
TEST_F(UtilEnv, 05unionExcept) {
clearSourceArray(src);
clearFinalArray(rslt);
uint64_t arr2[] = {7};
SArray * f = (SArray *)taosArrayGetP(src, 1);
for (int i = 0; i < sizeof(arr2) / sizeof(arr2[0]); i++) {
taosArrayPush(f, &arr2[i]);
}
iUnion(src, rslt);
SArray *ept = taosArrayInit(0, sizeof(uint64_t));
iExcept(rslt, ept);
EXPECT_EQ(taosArrayGetSize(rslt), 1);
}
TEST_F(UtilEnv, 01Except) {
SArray *total = taosArrayInit(4, sizeof(uint64_t));
{
@ -308,16 +324,26 @@ TEST_F(UtilEnv, 01Except) {
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100);
}
TEST_F(UtilEnv, testFill) {
for (int i = 0; i < 10000000; i++) {
for (int i = 0; i < 1000000; i++) {
int64_t val = i;
char buf[65] = {0};
indexInt2str(val, buf, 1);
EXPECT_EQ(val, taosStr2int64(buf));
}
for (int i = 0; i < 10000000; i++) {
for (int i = 0; i < 1000000; i++) {
int64_t val = 0 - i;
char buf[65] = {0};
indexInt2str(val, buf, -1);
EXPECT_EQ(val, taosStr2int64(buf));
}
}
TEST_F(UtilEnv, TempResult) {
SIdxTempResult *relt = idxTempResultCreate();
SArray *f = taosArrayInit(0, sizeof(uint64_t));
uint64_t val = UINT64_MAX - 1;
taosArrayPush(relt->added, &val);
idxTempResultMergeTo(relt, f);
EXPECT_EQ(taosArrayGetSize(f), 1);
}