add pre check
This commit is contained in:
parent
526f46dbfb
commit
12ec7d7193
|
@ -118,7 +118,11 @@ typedef struct SMetaFltParam {
|
|||
|
||||
} SMetaFltParam;
|
||||
|
||||
// TODO, refactor later
|
||||
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *results);
|
||||
int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *parm, SArray *pUids);
|
||||
int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids);
|
||||
int32_t metaFilterTtl(SMeta *pMeta, SMetaFltParam *param, SArray *pUids);
|
||||
|
||||
#if 1 // refact APIs below (TODO)
|
||||
typedef SVCreateTbReq STbCfg;
|
||||
|
|
|
@ -152,7 +152,7 @@ typedef struct {
|
|||
} SCtimeIdxKey;
|
||||
|
||||
typedef struct {
|
||||
int16_t ncol;
|
||||
int64_t ncol;
|
||||
tb_uid_t uid;
|
||||
} SNcolIdxKey;
|
||||
|
||||
|
|
|
@ -1033,6 +1033,117 @@ typedef struct {
|
|||
int32_t vLen;
|
||||
} SIdxCursor;
|
||||
|
||||
int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||
int32_t ret = 0;
|
||||
|
||||
SIdxCursor *pCursor = NULL;
|
||||
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
||||
pCursor->pMeta = pMeta;
|
||||
pCursor->suid = param->suid;
|
||||
pCursor->cid = param->cid;
|
||||
pCursor->type = param->type;
|
||||
|
||||
metaRLock(pMeta);
|
||||
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL);
|
||||
if (ret != 0) {
|
||||
goto END;
|
||||
}
|
||||
int64_t uidLimit = param->reverse ? INT64_MAX : 0;
|
||||
|
||||
SCtimeIdxKey ctimeKey = {.ctime = *(int64_t *)(param->val), .uid = uidLimit};
|
||||
SCtimeIdxKey *pCtimeKey = &ctimeKey;
|
||||
|
||||
int cmp = 0;
|
||||
if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) {
|
||||
goto END;
|
||||
}
|
||||
bool first = true;
|
||||
int32_t valid = 0;
|
||||
while (1) {
|
||||
void *entryKey = NULL;
|
||||
int32_t nEntryKey = -1;
|
||||
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL);
|
||||
if (valid < 0) break;
|
||||
|
||||
SCtimeIdxKey *p = entryKey;
|
||||
if (first) {
|
||||
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||
if (valid < 0) break;
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type);
|
||||
if (cmp == 0) taosArrayPush(pUids, &p->uid);
|
||||
if (cmp == -1) break;
|
||||
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||
if (valid < 0) break;
|
||||
}
|
||||
|
||||
END:
|
||||
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
||||
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
||||
|
||||
taosMemoryFree(pCursor);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||
int32_t ret = 0;
|
||||
char *buf = NULL;
|
||||
|
||||
STagIdxKey *pKey = NULL;
|
||||
int32_t nKey = 0;
|
||||
|
||||
SIdxCursor *pCursor = NULL;
|
||||
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
||||
pCursor->pMeta = pMeta;
|
||||
pCursor->suid = param->suid;
|
||||
pCursor->cid = param->cid;
|
||||
pCursor->type = param->type;
|
||||
|
||||
metaRLock(pMeta);
|
||||
ret = tdbTbcOpen(pMeta->pNameIdx, &pCursor->pCur, NULL);
|
||||
|
||||
END:
|
||||
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
||||
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(pKey);
|
||||
|
||||
taosMemoryFree(pCursor);
|
||||
|
||||
return ret;
|
||||
}
|
||||
int32_t metaFilterTtl(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||
int32_t ret = 0;
|
||||
char *buf = NULL;
|
||||
|
||||
STtlIdxKey *pKey = NULL;
|
||||
int32_t nKey = 0;
|
||||
|
||||
SIdxCursor *pCursor = NULL;
|
||||
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
||||
pCursor->pMeta = pMeta;
|
||||
pCursor->suid = param->suid;
|
||||
pCursor->cid = param->cid;
|
||||
pCursor->type = param->type;
|
||||
|
||||
metaRLock(pMeta);
|
||||
ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL);
|
||||
|
||||
END:
|
||||
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
||||
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(pKey);
|
||||
|
||||
taosMemoryFree(pCursor);
|
||||
|
||||
return ret;
|
||||
// impl later
|
||||
return 0;
|
||||
}
|
||||
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||
int32_t ret = 0;
|
||||
char *buf = NULL;
|
||||
|
@ -1048,7 +1159,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
pCursor->type = param->type;
|
||||
|
||||
metaRLock(pMeta);
|
||||
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
|
||||
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL);
|
||||
if (ret < 0) {
|
||||
goto END;
|
||||
}
|
||||
|
@ -1059,7 +1170,8 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
|
||||
if (param->val == NULL) {
|
||||
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
|
||||
return -1;
|
||||
ret = -1;
|
||||
goto END;
|
||||
} else {
|
||||
if (IS_VAR_DATA_TYPE(param->type)) {
|
||||
tagData = varDataVal(param->val);
|
||||
|
|
|
@ -1201,11 +1201,10 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
|
|||
#if 1
|
||||
// todo refactor: add the parameter for tbname function
|
||||
const char* name = "tbname";
|
||||
int32_t len = strlen(name);
|
||||
int32_t len = strlen(name);
|
||||
|
||||
if (!pFuncNode->pParameterList && (memcmp(pExprNode->_function.functionName, name, len) == 0) &&
|
||||
pExprNode->_function.functionName[len] == 0) {
|
||||
|
||||
pFuncNode->pParameterList = nodesMakeList();
|
||||
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
|
|
|
@ -2906,7 +2906,8 @@ static int32_t sysFilte__DbName(void* pMeta, SNode* pNode, SArray* result) {
|
|||
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->operType);
|
||||
bool reverse = false;
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
|
||||
int ret = func(dbname, pVal->datum.p, TSDB_DATA_TYPE_VARCHAR);
|
||||
|
@ -2923,7 +2924,9 @@ static int32_t sysFilte__VgroupId(void* pMeta, SNode* pNode, SArray* result) {
|
|||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->operType);
|
||||
bool reverse = false;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
|
||||
int ret = func(&vgId, &pVal->datum.i, TSDB_DATA_TYPE_BIGINT);
|
||||
|
@ -2932,33 +2935,92 @@ static int32_t sysFilte__VgroupId(void* pMeta, SNode* pNode, SArray* result) {
|
|||
return -1;
|
||||
}
|
||||
static int32_t sysFilte__TableName(void* pMeta, SNode* pNode, SArray* result) {
|
||||
// impl later
|
||||
return 0;
|
||||
void* pVnode = pMeta;
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
bool reverse = false;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
|
||||
SMetaFltParam param = {.suid = 0,
|
||||
.cid = 0,
|
||||
.type = TSDB_DATA_TYPE_VARCHAR,
|
||||
.val = pVal->datum.p,
|
||||
.reverse = reverse,
|
||||
.filterFunc = func};
|
||||
int32_t ret = metaFilterTableName(pVnode, ¶m, result);
|
||||
if (ret == 0) return 0;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t sysFilte__CreateTime(void* pMeta, SNode* pNode, SArray* result) {
|
||||
// impl later
|
||||
void* pVnode = pMeta;
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
bool reverse = false;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
static int32_t sysFilte__Ncolumn(void* pMeta, SNode* pNode, SArray* result) {
|
||||
// impl later
|
||||
void* pVnode = pMeta;
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
bool reverse = false;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sysFilte__Ttl(void* pMeta, SNode* pNode, SArray* result) {
|
||||
// impl later
|
||||
void* pVnode = pMeta;
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
bool reverse = false;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
static int32_t sysFilte__STableName(void* pMeta, SNode* pNode, SArray* result) {
|
||||
// impl later
|
||||
void* pVnode = pMeta;
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
bool reverse = false;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
static int32_t sysFilte__Uid(void* pMeta, SNode* pNode, SArray* result) {
|
||||
// impl later
|
||||
void* pVnode = pMeta;
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
bool reverse = false;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
static int32_t sysFilte__Type(void* pMeta, SNode* pNode, SArray* result) {
|
||||
// impl later
|
||||
void* pVnode = pMeta;
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pNode;
|
||||
SValueNode* pVal = (SValueNode*)pOper->pRight;
|
||||
bool reverse = false;
|
||||
|
||||
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
|
||||
if (func == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
static int32_t sysChkFilter__DBName(SNode* pNode) {
|
||||
|
|
|
@ -73,6 +73,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
pRpc->idleTime = pInit->idleTime;
|
||||
pRpc->tcphandle =
|
||||
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
|
||||
|
||||
if (pRpc->tcphandle == NULL) {
|
||||
taosMemoryFree(pRpc);
|
||||
return NULL;
|
||||
|
|
Loading…
Reference in New Issue