diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 96feee3d7d..1e271bc8d8 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -37,6 +37,9 @@ typedef struct SMSmaCursor SMSmaCursor; // clang-format on // metaOpen ================== +int32_t metaRLock(SMeta* pMeta); +int32_t metaWLock(SMeta* pMeta); +int32_t metaULock(SMeta* pMeta); // metaEntry ================== int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME); @@ -57,6 +60,8 @@ int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid); static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); } struct SMeta { + TdThreadRwlock lock; + char* path; SVnode* pVnode; TENV* pEnv; diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 85a49ffabd..8b3f555f4f 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -22,6 +22,9 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); +static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } +static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } + int metaOpen(SVnode *pVnode, SMeta **ppMeta) { SMeta *pMeta = NULL; int ret; @@ -36,6 +39,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { return -1; } + metaInitLock(pMeta); pMeta->path = (char *)&pMeta[1]; sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, VNODE_META_DIR); @@ -121,6 +125,7 @@ _err: if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb); if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb); if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv); + metaDestroyLock(pMeta); taosMemoryFree(pMeta); return -1; } @@ -136,12 +141,19 @@ int metaClose(SMeta *pMeta) { if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb); if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb); if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv); + metaDestroyLock(pMeta); taosMemoryFree(pMeta); } return 0; } +int32_t metaRLock(SMeta *pMeta) { return taosThreadRwlockRdlock(&pMeta->lock); } + +int32_t metaWLock(SMeta *pMeta) { return taosThreadRwlockWrlock(&pMeta->lock); } + +int32_t metaULock(SMeta *pMeta) { return taosThreadRwlockUnlock(&pMeta->lock); } + static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1; STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 68fbf01e28..8d2a4ebcf3 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -19,9 +19,13 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) { memset(pReader, 0, sizeof(*pReader)); pReader->flags = flags; pReader->pMeta = pMeta; + metaRLock(pMeta); } void metaReaderClear(SMetaReader *pReader) { + if (pReader->pMeta) { + metaULock(pReader->pMeta); + } tDecoderClear(&pReader->coder); tdbFree(pReader->pBuf); } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 7663047429..20248f39fb 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -439,29 +439,36 @@ _exit: } static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { + metaWLock(pMeta); + // save to table.db - if (metaSaveToTbDb(pMeta, pME) < 0) return -1; + if (metaSaveToTbDb(pMeta, pME) < 0) goto _err; // update uid.idx - if (metaUpdateUidIdx(pMeta, pME) < 0) return -1; + if (metaUpdateUidIdx(pMeta, pME) < 0) goto _err; // update name.idx - if (metaUpdateNameIdx(pMeta, pME) < 0) return -1; + if (metaUpdateNameIdx(pMeta, pME) < 0) goto _err; if (pME->type == TSDB_CHILD_TABLE) { // update ctb.idx - if (metaUpdateCtbIdx(pMeta, pME) < 0) return -1; + if (metaUpdateCtbIdx(pMeta, pME) < 0) goto _err; // update tag.idx - if (metaUpdateTagIdx(pMeta, pME) < 0) return -1; + if (metaUpdateTagIdx(pMeta, pME) < 0) goto _err; } else { // update schema.db - if (metaSaveToSkmDb(pMeta, pME) < 0) return -1; + if (metaSaveToSkmDb(pMeta, pME) < 0) goto _err; } if (pME->type != TSDB_SUPER_TABLE) { - if (metaUpdateTtlIdx(pMeta, pME) < 0) return -1; + if (metaUpdateTtlIdx(pMeta, pME) < 0) goto _err; } + metaULock(pMeta); return 0; + +_err: + metaULock(pMeta); + return -1; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4638066935..2511e3a570 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3870,6 +3870,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch if (metaGetTableEntryByUid(&mr, uid) < 0) { tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); + metaReaderClear(&mr); terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; goto _error; } else { @@ -3880,6 +3881,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client + metaReaderClear(&mr); goto _error; } diff --git a/source/libs/executor/inc/indexoperator.h b/source/libs/executor/inc/indexoperator.h index 9e67ac7f41..d033c63ef8 100644 --- a/source/libs/executor/inc/indexoperator.h +++ b/source/libs/executor/inc/indexoperator.h @@ -13,11 +13,23 @@ * along with this program. If not, see . */ -#include "filter.h" +#ifndef _INDEX_OPERATOR_H +#define _INDEX_OPERATOR_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "nodes.h" #include "tglobal.h" typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus; SIdxFltStatus idxGetFltStatus(SNode *pFilterNode); // construct tag filter operator later -int32_t doFilterTag(const SNode *pFilterNode, SArray *resutl); +int32_t doFilterTag(const SNode *pFilterNode, SArray *result); + +#ifdef __cplusplus +} +#endif + +#endif /*INDEX_OPERATOR_*/ diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index fe30ebb2ea..c17fcacf1f 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -583,7 +583,7 @@ int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { SFilterInfo *filter = NULL; // todo move to the initialization function - SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); + // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); SIFParam param = {0}; SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m)); @@ -598,9 +598,9 @@ SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) { if (pFilterNode == NULL) { return SFLT_NOT_INDEX; } - SFilterInfo *filter = NULL; + // SFilterInfo *filter = NULL; // todo move to the initialization function - SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); + // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); SIF_ERR_RET(sifGetFltHint((SNode *)pFilterNode, &st)); return st; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 5ed960af82..c7ec0eece2 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -948,10 +948,6 @@ TEST(testCase, build_executor_tree_Test) { code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); ASSERT_EQ(code, 0); } -TEST(testCase, index_plan_test) { - // add later - EXPECT_EQ(0, 0); -} #if 0 TEST(testCase, inMem_sort_Test) { diff --git a/source/libs/executor/test/indexexcutorTests.cpp b/source/libs/executor/test/indexexcutorTests.cpp new file mode 100644 index 0000000000..5f1bff45a3 --- /dev/null +++ b/source/libs/executor/test/indexexcutorTests.cpp @@ -0,0 +1,253 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include "executor.h" +#include "executorimpl.h" +#include "indexoperator.h" +#include "os.h" + +#include "stub.h" +#include "taos.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "tdef.h" +#include "trpc.h" +#include "tvariant.h" + +namespace { +SColumnInfo createColumnInfo(int32_t colId, int32_t type, int32_t bytes) { + SColumnInfo info = {0}; + info.colId = colId; + info.type = type; + info.bytes = bytes; + return info; +} + +int64_t sifLeftV = 21, sifRightV = 10; +double sifLeftVd = 21.0, sifRightVd = 10.0; + +void sifFreeDataBlock(void *block) { blockDataDestroy(*(SSDataBlock **)block); } + +void sifInitLogFile() { + const char * defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; + + tsAsyncLog = 0; + qDebugFlag = 159; + strcpy(tsLogDir, "/tmp/sif"); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} + +void sifAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *slotId, bool newBlock, int32_t rows, + SColumnInfo *colInfo) { + if (newBlock) { + SSDataBlock *res = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); + res->info.numOfCols = 1; + res->info.rows = rows; + res->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + SColumnInfoData idata = {0}; + idata.info = *colInfo; + + taosArrayPush(res->pDataBlock, &idata); + taosArrayPush(pBlockList, &res); + + blockDataEnsureCapacity(res, rows); + + *dataBlockId = taosArrayGetSize(pBlockList) - 1; + res->info.blockId = *dataBlockId; + *slotId = 0; + } else { + SSDataBlock *res = *(SSDataBlock **)taosArrayGetLast(pBlockList); + res->info.numOfCols++; + SColumnInfoData idata = {0}; + idata.info = *colInfo; + + colInfoDataEnsureCapacity(&idata, 0, rows); + + taosArrayPush(res->pDataBlock, &idata); + + *dataBlockId = taosArrayGetSize(pBlockList) - 1; + *slotId = taosArrayGetSize(res->pDataBlock) - 1; + } +} + +void sifMakeValueNode(SNode **pNode, int32_t dataType, void *value) { + SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_VALUE); + SValueNode *vnode = (SValueNode *)node; + vnode->node.resType.type = dataType; + + if (IS_VAR_DATA_TYPE(dataType)) { + vnode->datum.p = (char *)taosMemoryMalloc(varDataTLen(value)); + varDataCopy(vnode->datum.p, value); + vnode->node.resType.bytes = varDataTLen(value); + } else { + vnode->node.resType.bytes = tDataTypes[dataType].bytes; + assignVal((char *)nodesGetValueFromNode(vnode), (const char *)value, 0, dataType); + } + + *pNode = (SNode *)vnode; +} + +void sifMakeColumnNode(SNode **pNode, const char *db, const char *colName, EColumnType colType, uint8_t colValType) { + SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_COLUMN); + SColumnNode *rnode = (SColumnNode *)node; + memcpy(rnode->dbName, db, strlen(db)); + memcpy(rnode->colName, colName, strlen(colName)); + + rnode->colType = colType; + rnode->node.resType.type = colValType; + + *pNode = (SNode *)rnode; +} + +void sifMakeOpNode(SNode **pNode, EOperatorType opType, int32_t resType, SNode *pLeft, SNode *pRight) { + SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_OPERATOR); + SOperatorNode *onode = (SOperatorNode *)node; + onode->node.resType.type = resType; + onode->node.resType.bytes = tDataTypes[resType].bytes; + + onode->opType = opType; + onode->pLeft = pLeft; + onode->pRight = pRight; + + *pNode = (SNode *)onode; +} + +void sifMakeListNode(SNode **pNode, SNodeList *list, int32_t resType) { + SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + SNodeListNode *lnode = (SNodeListNode *)node; + lnode->dataType.type = resType; + lnode->pNodeList = list; + + *pNode = (SNode *)lnode; +} + +void sifMakeLogicNode(SNode **pNode, ELogicConditionType opType, SNode **nodeList, int32_t nodeNum) { + SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + SLogicConditionNode *onode = (SLogicConditionNode *)node; + onode->condType = opType; + onode->node.resType.type = TSDB_DATA_TYPE_BOOL; + onode->node.resType.bytes = sizeof(bool); + + onode->pParameterList = nodesMakeList(); + for (int32_t i = 0; i < nodeNum; ++i) { + nodesListAppend(onode->pParameterList, nodeList[i]); + } + + *pNode = (SNode *)onode; +} + +void sifMakeTargetNode(SNode **pNode, int16_t dataBlockId, int16_t slotId, SNode *snode) { + SNode * node = (SNode *)nodesMakeNode(QUERY_NODE_TARGET); + STargetNode *onode = (STargetNode *)node; + onode->pExpr = snode; + onode->dataBlockId = dataBlockId; + onode->slotId = slotId; + + *pNode = (SNode *)onode; +} + +} // namespace + +#if 1 +TEST(testCase, index_filter) { + { + SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; + sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT); + sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV); + sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight); + SArray *result = taosArrayInit(4, sizeof(uint64_t)); + doFilterTag(opNode, result); + EXPECT_EQ(1, taosArrayGetSize(result)); + taosArrayDestroy(result); + nodesDestroyNode(res); + } + { + SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; + sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT); + sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV); + sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight); + + SArray *result = taosArrayInit(4, sizeof(uint64_t)); + doFilterTag(opNode, result); + EXPECT_EQ(1, taosArrayGetSize(result)); + + taosArrayDestroy(result); + nodesDestroyNode(res); + } +} + +TEST(testCase, index_filter_varify) { + { + SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; + sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT); + sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV); + sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight); + nodesDestroyNode(res); + + SIdxFltStatus st = idxGetFltStatus(opNode); + EXPECT_EQ(st, SFLT_ACCURATE_INDEX); + } + { + SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; + sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT); + sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV); + sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight); + + SIdxFltStatus st = idxGetFltStatus(opNode); + EXPECT_EQ(st, SFLT_COARSE_INDEX); + nodesDestroyNode(res); + } + { + SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; + sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT); + sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV); + sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight); + nodesDestroyNode(res); + + SIdxFltStatus st = idxGetFltStatus(opNode); + EXPECT_EQ(st, SFLT_ACCURATE_INDEX); + } + { + SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; + sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT); + sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV); + sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight); + + SIdxFltStatus st = idxGetFltStatus(opNode); + EXPECT_EQ(st, SFLT_COARSE_INDEX); + nodesDestroyNode(res); + } +} + +#endif + +#pragma GCC diagnostic pop diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 3ee952fcdf..8aeb9c4856 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -62,7 +62,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { const char *errStr = tstrerror(err); int32_t linuxErr = errno; const char *linuxErrMsg = strerror(errno); - sError("walWriteWithSyncInfo error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg); + sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); ASSERT(0); } //assert(code == 0); @@ -83,7 +83,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { const char *errStr = tstrerror(err); int32_t linuxErr = errno; const char *linuxErrMsg = strerror(errno); - sError("walReadWithHandle error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg); + sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); ASSERT(0); } //assert(walReadWithHandle(pWalHandle, index) == 0); @@ -119,7 +119,7 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { const char *errStr = tstrerror(err); int32_t linuxErr = errno; const char *linuxErrMsg = strerror(errno); - sError("walRollback error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg); + sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); ASSERT(0); } return 0; // to avoid compiler error @@ -152,7 +152,7 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { const char *errStr = tstrerror(err); int32_t linuxErr = errno; const char *linuxErrMsg = strerror(errno); - sError("walCommit error, err:%d, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, errStr, linuxErr, linuxErrMsg); + sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); ASSERT(0); } return 0; // to avoid compiler error