fix index bug
This commit is contained in:
parent
570b36cb9b
commit
5a4b51a968
|
@ -127,7 +127,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
|
|||
* @parma opt (input, rebuild index opts)
|
||||
* @return error code
|
||||
*/
|
||||
int indexRebuild(SIndex* index, SIndexOpts* opt);
|
||||
// int indexRebuild(SIndex* index, SIndexOpts* opt);
|
||||
|
||||
/*
|
||||
* open index
|
||||
|
@ -185,6 +185,25 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t c
|
|||
int32_t nColName, const char* colVal, int32_t nColVal);
|
||||
void indexTermDestroy(SIndexTerm* p);
|
||||
|
||||
/*
|
||||
* rebuild index
|
||||
*/
|
||||
void indexRebuild(SIndexJson* idx, void* iter);
|
||||
|
||||
/*
|
||||
* check index json status
|
||||
**/
|
||||
bool indexIsRebuild(SIndex* idx);
|
||||
/*
|
||||
* rebuild index json
|
||||
*/
|
||||
void indexJsonRebuild(SIndexJson* idx, void* iter);
|
||||
|
||||
/*
|
||||
* check index json status
|
||||
**/
|
||||
bool indexJsonIsRebuild(SIndexJson* idx);
|
||||
|
||||
/*
|
||||
* init index env
|
||||
*
|
||||
|
@ -203,7 +222,7 @@ typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltS
|
|||
|
||||
SIdxFltStatus idxGetFltStatus(SNode* pFilterNode);
|
||||
|
||||
int32_t doFilterTag(const SNode* pFilterNode, SIndexMetaArg* metaArg, SArray* result);
|
||||
int32_t doFilterTag(SNode* pFilterNode, SIndexMetaArg* metaArg, SArray* result, SIdxFltStatus* status);
|
||||
/*
|
||||
* destory index env
|
||||
*
|
||||
|
|
|
@ -413,7 +413,6 @@ static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
|||
return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), &pMeta->txn);
|
||||
}
|
||||
|
||||
|
||||
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||
void * pData = NULL;
|
||||
int nData = 0;
|
||||
|
|
|
@ -13,10 +13,10 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "index.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
#include "index.h"
|
||||
#include "os.h"
|
||||
#include "tdatablock.h"
|
||||
#include "thash.h"
|
||||
#include "tmsg.h"
|
||||
|
@ -46,13 +46,9 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
|
|||
// do nothing
|
||||
}
|
||||
|
||||
bool isResultRowClosed(SResultRow* pRow) {
|
||||
return (pRow->closed == true);
|
||||
}
|
||||
bool isResultRowClosed(SResultRow* pRow) { return (pRow->closed == true); }
|
||||
|
||||
void closeResultRow(SResultRow* pResultRow) {
|
||||
pResultRow->closed = true;
|
||||
}
|
||||
void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; }
|
||||
|
||||
// TODO refactor: use macro
|
||||
SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) {
|
||||
|
@ -96,9 +92,7 @@ static int32_t resultrowComparAsc(const void* p1, const void* p2) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t resultrowComparDesc(const void* p1, const void* p2) {
|
||||
return resultrowComparAsc(p2, p1);
|
||||
}
|
||||
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
|
||||
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order) {
|
||||
if (pGroupResInfo->pRows != NULL) {
|
||||
|
@ -194,7 +188,8 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
|||
// continue;
|
||||
// }
|
||||
|
||||
SColumnInfoData idata = createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
|
||||
SColumnInfoData idata =
|
||||
createColumnInfoData(pDescNode->dataType.type, pDescNode->dataType.bytes, pDescNode->slotId);
|
||||
idata.info.scale = pDescNode->dataType.scale;
|
||||
idata.info.precision = pDescNode->dataType.precision;
|
||||
|
||||
|
@ -301,8 +296,11 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
|||
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
|
||||
|
||||
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
||||
//code = doFilterTag(pTagIndexCond, &metaArg, res);
|
||||
SIdxFltStatus status = SFLT_NOT_INDEX;
|
||||
code = doFilterTag(pTagIndexCond, &metaArg, res, &status);
|
||||
if (code != 0 || status == SFLT_NOT_INDEX) {
|
||||
code = TSDB_CODE_INDEX_REBUILDING;
|
||||
}
|
||||
if (code == TSDB_CODE_INDEX_REBUILDING) {
|
||||
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
|
||||
} else if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -378,7 +376,6 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) {
|
|||
return pList;
|
||||
}
|
||||
|
||||
|
||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
||||
int32_t type) {
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
|
@ -670,8 +667,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
}
|
||||
|
||||
for (int32_t i = 1; i < numOfOutput; ++i) {
|
||||
(*rowEntryInfoOffset)[i] =
|
||||
(int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize);
|
||||
(*rowEntryInfoOffset)[i] = (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) +
|
||||
pFuncCtx[i - 1].resDataInfo.interBufSize);
|
||||
}
|
||||
|
||||
setSelectValueColumnInfo(pFuncCtx, numOfOutput);
|
||||
|
|
|
@ -45,6 +45,7 @@ extern "C" {
|
|||
|
||||
typedef enum { LT, LE, GT, GE, CONTAINS, EQ } RangeType;
|
||||
typedef enum { kTypeValue, kTypeDeletion } STermValueType;
|
||||
typedef enum { kRebuild, kFinished } SIdxStatus;
|
||||
|
||||
typedef struct SIndexStat {
|
||||
int32_t totalAdded; //
|
||||
|
@ -65,6 +66,7 @@ struct SIndex {
|
|||
|
||||
char* path;
|
||||
|
||||
int8_t status;
|
||||
SIndexStat stat;
|
||||
TdThreadMutex mtx;
|
||||
tsem_t sem;
|
||||
|
|
|
@ -230,7 +230,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
|
|||
}
|
||||
|
||||
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
|
||||
int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
|
||||
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
|
||||
|
||||
SIndexOpts* indexOptsCreate() { return NULL; }
|
||||
void indexOptsDestroy(SIndexOpts* opts) { return; }
|
||||
|
@ -299,6 +299,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
|
|||
|
||||
return tm;
|
||||
}
|
||||
|
||||
void indexTermDestroy(SIndexTerm* p) {
|
||||
taosMemoryFree(p->colName);
|
||||
taosMemoryFree(p->colVal);
|
||||
|
@ -319,6 +320,54 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms) {
|
|||
taosArrayDestroy(terms);
|
||||
}
|
||||
|
||||
/*
|
||||
* rebuild index
|
||||
*/
|
||||
|
||||
static void idxSchedRebuildIdx(SSchedMsg* msg) {
|
||||
// TODO
|
||||
SIndex* idx = msg->ahandle;
|
||||
|
||||
int8_t st = kFinished;
|
||||
atomic_store_8(&idx->status, st);
|
||||
idxReleaseRef(idx->refId);
|
||||
}
|
||||
void indexRebuild(SIndexJson* idx, void* iter) {
|
||||
// set up rebuild status
|
||||
int8_t st = kRebuild;
|
||||
atomic_store_8(&idx->status, st);
|
||||
|
||||
// task put into BG thread
|
||||
SSchedMsg schedMsg = {0};
|
||||
schedMsg.fp = idxSchedRebuildIdx;
|
||||
schedMsg.ahandle = idx;
|
||||
idxAcquireRef(idx->refId);
|
||||
taosScheduleTask(indexQhandle, &schedMsg);
|
||||
}
|
||||
|
||||
/*
|
||||
* check index json status
|
||||
**/
|
||||
bool indexIsRebuild(SIndex* idx) {
|
||||
// idx rebuild or not
|
||||
return ((SIdxStatus)atomic_load_8(&idx->status)) == kRebuild ? true : false;
|
||||
}
|
||||
/*
|
||||
* rebuild index
|
||||
*/
|
||||
void indexJsonRebuild(SIndexJson* idx, void* iter) {
|
||||
// idx rebuild or not
|
||||
indexRebuild(idx, iter);
|
||||
}
|
||||
|
||||
/*
|
||||
* check index json status
|
||||
**/
|
||||
bool indexJsonIsRebuild(SIndexJson* idx) {
|
||||
// load idx rebuild or not
|
||||
return ((SIdxStatus)atomic_load_8(&idx->status)) == kRebuild ? true : false;
|
||||
}
|
||||
|
||||
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
|
||||
SIndexTerm* term = query->term;
|
||||
const char* colName = term->colName;
|
||||
|
|
|
@ -192,7 +192,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
|||
switch (nodeType(node)) {
|
||||
case QUERY_NODE_VALUE: {
|
||||
SValueNode *vn = (SValueNode *)node;
|
||||
if (vn->typeData == TSDB_DATA_TYPE_NULL && (vn->literal == NULL || strlen(vn->literal) == 0)) {
|
||||
if (vn->typeData == TSDB_DATA_TYPE_NULL || (vn->literal == NULL || strlen(vn->literal) == 0)) {
|
||||
param->status = SFLT_NOT_INDEX;
|
||||
return 0;
|
||||
}
|
||||
|
@ -737,21 +737,20 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
|
|||
SIF_RET(code);
|
||||
}
|
||||
|
||||
int32_t doFilterTag(const SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result) {
|
||||
if (pFilterNode == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) {
|
||||
SIdxFltStatus st = idxGetFltStatus(pFilterNode);
|
||||
if (st == SFLT_NOT_INDEX) {
|
||||
*status = st;
|
||||
return 0;
|
||||
}
|
||||
|
||||
SFilterInfo *filter = NULL;
|
||||
// todo move to the initialization function
|
||||
// SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
|
||||
|
||||
SArray * output = taosArrayInit(8, sizeof(uint64_t));
|
||||
SIFParam param = {.arg = *metaArg, .result = output};
|
||||
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m));
|
||||
|
||||
taosArrayAddAll(result, param.result);
|
||||
// taosArrayAddAll(result, param.result);
|
||||
sifFreeParam(¶m);
|
||||
SIF_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
@ -761,9 +760,6 @@ SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) {
|
|||
if (pFilterNode == NULL) {
|
||||
return SFLT_NOT_INDEX;
|
||||
}
|
||||
// SFilterInfo *filter = NULL;
|
||||
// todo move to the initialization function
|
||||
// SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
|
||||
|
||||
SIF_ERR_RET(sifGetFltHint((SNode *)pFilterNode, &st));
|
||||
return st;
|
||||
|
|
|
@ -19,17 +19,13 @@
|
|||
#ifdef WINDOWS
|
||||
|
||||
// add
|
||||
int8_t interlocked_add_fetch_8(int8_t volatile* ptr, int8_t val) {
|
||||
return _InterlockedExchangeAdd8(ptr, val) + val;
|
||||
}
|
||||
int8_t interlocked_add_fetch_8(int8_t volatile* ptr, int8_t val) { return _InterlockedExchangeAdd8(ptr, val) + val; }
|
||||
|
||||
int16_t interlocked_add_fetch_16(int16_t volatile* ptr, int16_t val) {
|
||||
return _InterlockedExchangeAdd16(ptr, val) + val;
|
||||
}
|
||||
|
||||
int32_t interlocked_add_fetch_32(int32_t volatile* ptr, int32_t val) {
|
||||
return _InterlockedExchangeAdd(ptr, val) + val;
|
||||
}
|
||||
int32_t interlocked_add_fetch_32(int32_t volatile* ptr, int32_t val) { return _InterlockedExchangeAdd(ptr, val) + val; }
|
||||
|
||||
int64_t interlocked_add_fetch_64(int64_t volatile* ptr, int64_t val) {
|
||||
return InterlockedExchangeAdd64(ptr, val) + val;
|
||||
|
@ -43,17 +39,11 @@ void* interlocked_add_fetch_ptr(void* volatile* ptr, void* val) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int8_t interlocked_and_fetch_8(int8_t volatile* ptr, int8_t val) {
|
||||
return _InterlockedAnd8(ptr, val) & val;
|
||||
}
|
||||
int8_t interlocked_and_fetch_8(int8_t volatile* ptr, int8_t val) { return _InterlockedAnd8(ptr, val) & val; }
|
||||
|
||||
int16_t interlocked_and_fetch_16(int16_t volatile* ptr, int16_t val) {
|
||||
return _InterlockedAnd16(ptr, val) & val;
|
||||
}
|
||||
int16_t interlocked_and_fetch_16(int16_t volatile* ptr, int16_t val) { return _InterlockedAnd16(ptr, val) & val; }
|
||||
|
||||
int32_t interlocked_and_fetch_32(int32_t volatile* ptr, int32_t val) {
|
||||
return _InterlockedAnd(ptr, val) & val;
|
||||
}
|
||||
int32_t interlocked_and_fetch_32(int32_t volatile* ptr, int32_t val) { return _InterlockedAnd(ptr, val) & val; }
|
||||
|
||||
int64_t interlocked_and_fetch_64(int64_t volatile* ptr, int64_t val) {
|
||||
#ifdef WINDOWS
|
||||
|
@ -96,17 +86,11 @@ void* interlocked_fetch_and_ptr(void* volatile* ptr, void* val) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int8_t interlocked_or_fetch_8(int8_t volatile* ptr, int8_t val) {
|
||||
return _InterlockedOr8(ptr, val) | val;
|
||||
}
|
||||
int8_t interlocked_or_fetch_8(int8_t volatile* ptr, int8_t val) { return _InterlockedOr8(ptr, val) | val; }
|
||||
|
||||
int16_t interlocked_or_fetch_16(int16_t volatile* ptr, int16_t val) {
|
||||
return _InterlockedOr16(ptr, val) | val;
|
||||
}
|
||||
int16_t interlocked_or_fetch_16(int16_t volatile* ptr, int16_t val) { return _InterlockedOr16(ptr, val) | val; }
|
||||
|
||||
int32_t interlocked_or_fetch_32(int32_t volatile* ptr, int32_t val) {
|
||||
return _InterlockedOr(ptr, val) | val;
|
||||
}
|
||||
int32_t interlocked_or_fetch_32(int32_t volatile* ptr, int32_t val) { return _InterlockedOr(ptr, val) | val; }
|
||||
|
||||
int64_t interlocked_or_fetch_64(int64_t volatile* ptr, int64_t val) {
|
||||
#ifdef WINDOWS
|
||||
|
@ -149,17 +133,11 @@ void* interlocked_fetch_or_ptr(void* volatile* ptr, void* val) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int8_t interlocked_xor_fetch_8(int8_t volatile* ptr, int8_t val) {
|
||||
return _InterlockedXor8(ptr, val) ^ val;
|
||||
}
|
||||
int8_t interlocked_xor_fetch_8(int8_t volatile* ptr, int8_t val) { return _InterlockedXor8(ptr, val) ^ val; }
|
||||
|
||||
int16_t interlocked_xor_fetch_16(int16_t volatile* ptr, int16_t val) {
|
||||
return _InterlockedXor16(ptr, val) ^ val;
|
||||
}
|
||||
int16_t interlocked_xor_fetch_16(int16_t volatile* ptr, int16_t val) { return _InterlockedXor16(ptr, val) ^ val; }
|
||||
|
||||
int32_t interlocked_xor_fetch_32(int32_t volatile* ptr, int32_t val) {
|
||||
return _InterlockedXor(ptr, val) ^ val;
|
||||
}
|
||||
int32_t interlocked_xor_fetch_32(int32_t volatile* ptr, int32_t val) { return _InterlockedXor(ptr, val) ^ val; }
|
||||
|
||||
int64_t interlocked_xor_fetch_64(int64_t volatile* ptr, int64_t val) {
|
||||
#ifdef WINDOWS
|
||||
|
@ -202,13 +180,9 @@ void* interlocked_fetch_xor_ptr(void* volatile* ptr, void* val) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int32_t interlocked_sub_fetch_32(int32_t volatile* ptr, int32_t val) {
|
||||
return interlocked_add_fetch_32(ptr, -val);
|
||||
}
|
||||
int32_t interlocked_sub_fetch_32(int32_t volatile* ptr, int32_t val) { return interlocked_add_fetch_32(ptr, -val); }
|
||||
|
||||
int64_t interlocked_sub_fetch_64(int64_t volatile* ptr, int64_t val) {
|
||||
return interlocked_add_fetch_64(ptr, -val);
|
||||
}
|
||||
int64_t interlocked_sub_fetch_64(int64_t volatile* ptr, int64_t val) { return interlocked_add_fetch_64(ptr, -val); }
|
||||
|
||||
void* interlocked_sub_fetch_ptr(void* volatile* ptr, void* val) {
|
||||
#ifdef WINDOWS
|
||||
|
@ -217,13 +191,9 @@ void* interlocked_sub_fetch_ptr(void* volatile* ptr, void* val) {
|
|||
return (void*)interlocked_add_fetch_64((int64_t volatile*)ptr, (int64_t)val);
|
||||
#endif
|
||||
}
|
||||
int32_t interlocked_fetch_sub_32(int32_t volatile* ptr, int32_t val) {
|
||||
return _InterlockedExchangeAdd(ptr, -val);
|
||||
}
|
||||
int32_t interlocked_fetch_sub_32(int32_t volatile* ptr, int32_t val) { return _InterlockedExchangeAdd(ptr, -val); }
|
||||
|
||||
int64_t interlocked_fetch_sub_64(int64_t volatile* ptr, int64_t val) {
|
||||
return _InterlockedExchangeAdd64(ptr, -val);
|
||||
}
|
||||
int64_t interlocked_fetch_sub_64(int64_t volatile* ptr, int64_t val) { return _InterlockedExchangeAdd64(ptr, -val); }
|
||||
|
||||
void* interlocked_fetch_sub_ptr(void* volatile* ptr, void* val) {
|
||||
#ifdef WINDOWS
|
||||
|
@ -273,7 +243,6 @@ int64_t atomic_exchange_64_impl(int64_t* ptr, int64_t val ) {
|
|||
}
|
||||
#endif
|
||||
|
||||
|
||||
int8_t atomic_load_8(int8_t volatile* ptr) {
|
||||
#ifdef WINDOWS
|
||||
return (*(int8_t volatile*)(ptr));
|
||||
|
@ -977,4 +946,3 @@ void* atomic_fetch_xor_ptr(void *ptr, void *val) {
|
|||
return __atomic_fetch_xor((void**)(ptr), (val), __ATOMIC_SEQ_CST);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue