diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 54d5d9eec2..58513e564c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -118,7 +118,11 @@ typedef struct SMetaFltParam { } SMetaFltParam; +// TODO, refactor later int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *results); +int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *parm, SArray *pUids); +int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids); +int32_t metaFilterTtl(SMeta *pMeta, SMetaFltParam *param, SArray *pUids); #if 1 // refact APIs below (TODO) typedef SVCreateTbReq STbCfg; diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index d34f6c5eec..9e2fe4aaf0 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -152,7 +152,7 @@ typedef struct { } SCtimeIdxKey; typedef struct { - int16_t ncol; + int64_t ncol; tb_uid_t uid; } SNcolIdxKey; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 631ef09d4b..118514e4c4 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1033,6 +1033,117 @@ typedef struct { int32_t vLen; } SIdxCursor; +int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { + int32_t ret = 0; + + SIdxCursor *pCursor = NULL; + pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor)); + pCursor->pMeta = pMeta; + pCursor->suid = param->suid; + pCursor->cid = param->cid; + pCursor->type = param->type; + + metaRLock(pMeta); + ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL); + if (ret != 0) { + goto END; + } + int64_t uidLimit = param->reverse ? INT64_MAX : 0; + + SCtimeIdxKey ctimeKey = {.ctime = *(int64_t *)(param->val), .uid = uidLimit}; + SCtimeIdxKey *pCtimeKey = &ctimeKey; + + int cmp = 0; + if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) { + goto END; + } + bool first = true; + int32_t valid = 0; + while (1) { + void *entryKey = NULL; + int32_t nEntryKey = -1; + valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL); + if (valid < 0) break; + + SCtimeIdxKey *p = entryKey; + if (first) { + valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); + if (valid < 0) break; + continue; + } else { + break; + } + int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type); + if (cmp == 0) taosArrayPush(pUids, &p->uid); + if (cmp == -1) break; + valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); + if (valid < 0) break; + } + +END: + if (pCursor->pMeta) metaULock(pCursor->pMeta); + if (pCursor->pCur) tdbTbcClose(pCursor->pCur); + + taosMemoryFree(pCursor); + return ret; +} + +int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { + int32_t ret = 0; + char *buf = NULL; + + STagIdxKey *pKey = NULL; + int32_t nKey = 0; + + SIdxCursor *pCursor = NULL; + pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor)); + pCursor->pMeta = pMeta; + pCursor->suid = param->suid; + pCursor->cid = param->cid; + pCursor->type = param->type; + + metaRLock(pMeta); + ret = tdbTbcOpen(pMeta->pNameIdx, &pCursor->pCur, NULL); + +END: + if (pCursor->pMeta) metaULock(pCursor->pMeta); + if (pCursor->pCur) tdbTbcClose(pCursor->pCur); + taosMemoryFree(buf); + taosMemoryFree(pKey); + + taosMemoryFree(pCursor); + + return ret; +} +int32_t metaFilterTtl(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { + int32_t ret = 0; + char *buf = NULL; + + STtlIdxKey *pKey = NULL; + int32_t nKey = 0; + + SIdxCursor *pCursor = NULL; + pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor)); + pCursor->pMeta = pMeta; + pCursor->suid = param->suid; + pCursor->cid = param->cid; + pCursor->type = param->type; + + metaRLock(pMeta); + ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL); + +END: + if (pCursor->pMeta) metaULock(pCursor->pMeta); + if (pCursor->pCur) tdbTbcClose(pCursor->pCur); + taosMemoryFree(buf); + taosMemoryFree(pKey); + + taosMemoryFree(pCursor); + + return ret; + // impl later + return 0; +} int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { int32_t ret = 0; char *buf = NULL; @@ -1048,7 +1159,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { pCursor->type = param->type; metaRLock(pMeta); - ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL); + ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL); if (ret < 0) { goto END; } @@ -1059,7 +1170,8 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { if (param->val == NULL) { metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode)); - return -1; + ret = -1; + goto END; } else { if (IS_VAR_DATA_TYPE(param->type)) { tagData = varDataVal(param->val); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index bafb3eeb82..72cdfd2bed 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1201,11 +1201,10 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { #if 1 // todo refactor: add the parameter for tbname function const char* name = "tbname"; - int32_t len = strlen(name); + int32_t len = strlen(name); if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) && pExprNode->_function.functionName[len] == 0) { - pFuncNode->pParameterList = nodesMakeList(); ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0); SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8edc5d4c51..17935e91de 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2906,7 +2906,8 @@ static int32_t sysFilte__DbName(void* pMeta, SNode* pNode, SArray* result) { SValueNode* pVal = (SValueNode*)pOper->pRight; - __optSysFilter func = optSysGetFilterFunc(pOper->operType); + bool reverse = false; + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); if (func == NULL) return -1; int ret = func(dbname, pVal->datum.p, TSDB_DATA_TYPE_VARCHAR); @@ -2923,7 +2924,9 @@ static int32_t sysFilte__VgroupId(void* pMeta, SNode* pNode, SArray* result) { SOperatorNode* pOper = (SOperatorNode*)pNode; SValueNode* pVal = (SValueNode*)pOper->pRight; - __optSysFilter func = optSysGetFilterFunc(pOper->operType); + bool reverse = false; + + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); if (func == NULL) return -1; int ret = func(&vgId, &pVal->datum.i, TSDB_DATA_TYPE_BIGINT); @@ -2932,33 +2935,92 @@ static int32_t sysFilte__VgroupId(void* pMeta, SNode* pNode, SArray* result) { return -1; } static int32_t sysFilte__TableName(void* pMeta, SNode* pNode, SArray* result) { - // impl later - return 0; + void* pVnode = pMeta; + + SOperatorNode* pOper = (SOperatorNode*)pNode; + SValueNode* pVal = (SValueNode*)pOper->pRight; + bool reverse = false; + + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); + if (func == NULL) return -1; + + SMetaFltParam param = {.suid = 0, + .cid = 0, + .type = TSDB_DATA_TYPE_VARCHAR, + .val = pVal->datum.p, + .reverse = reverse, + .filterFunc = func}; + int32_t ret = metaFilterTableName(pVnode, ¶m, result); + if (ret == 0) return 0; + + return -1; } static int32_t sysFilte__CreateTime(void* pMeta, SNode* pNode, SArray* result) { - // impl later + void* pVnode = pMeta; + + SOperatorNode* pOper = (SOperatorNode*)pNode; + SValueNode* pVal = (SValueNode*)pOper->pRight; + bool reverse = false; + + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); + if (func == NULL) return -1; return 0; } static int32_t sysFilte__Ncolumn(void* pMeta, SNode* pNode, SArray* result) { - // impl later + void* pVnode = pMeta; + + SOperatorNode* pOper = (SOperatorNode*)pNode; + SValueNode* pVal = (SValueNode*)pOper->pRight; + bool reverse = false; + + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); + if (func == NULL) return -1; return 0; } static int32_t sysFilte__Ttl(void* pMeta, SNode* pNode, SArray* result) { - // impl later + void* pVnode = pMeta; + + SOperatorNode* pOper = (SOperatorNode*)pNode; + SValueNode* pVal = (SValueNode*)pOper->pRight; + bool reverse = false; + + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); + if (func == NULL) return -1; return 0; } static int32_t sysFilte__STableName(void* pMeta, SNode* pNode, SArray* result) { - // impl later + void* pVnode = pMeta; + + SOperatorNode* pOper = (SOperatorNode*)pNode; + SValueNode* pVal = (SValueNode*)pOper->pRight; + bool reverse = false; + + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); + if (func == NULL) return -1; return 0; } static int32_t sysFilte__Uid(void* pMeta, SNode* pNode, SArray* result) { - // impl later + void* pVnode = pMeta; + + SOperatorNode* pOper = (SOperatorNode*)pNode; + SValueNode* pVal = (SValueNode*)pOper->pRight; + bool reverse = false; + + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); + if (func == NULL) return -1; return 0; } static int32_t sysFilte__Type(void* pMeta, SNode* pNode, SArray* result) { - // impl later + void* pVnode = pMeta; + + SOperatorNode* pOper = (SOperatorNode*)pNode; + SValueNode* pVal = (SValueNode*)pOper->pRight; + bool reverse = false; + + __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); + if (func == NULL) return -1; return 0; } static int32_t sysChkFilter__DBName(SNode* pNode) { diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c82af0d0e9..4095985a5d 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -73,6 +73,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + if (pRpc->tcphandle == NULL) { taosMemoryFree(pRpc); return NULL;