Merge branch '3.0' into cpwu/3.0
This commit is contained in:
commit
3fbccf59ce
|
@ -1431,7 +1431,6 @@ typedef struct {
|
||||||
} SRSmaParam;
|
} SRSmaParam;
|
||||||
|
|
||||||
typedef struct SVCreateTbReq {
|
typedef struct SVCreateTbReq {
|
||||||
int64_t ver; // use a general definition
|
|
||||||
char* name;
|
char* name;
|
||||||
uint32_t ttl;
|
uint32_t ttl;
|
||||||
uint32_t keep;
|
uint32_t keep;
|
||||||
|
|
|
@ -199,7 +199,7 @@ typedef enum EOperatorType {
|
||||||
} EOperatorType;
|
} EOperatorType;
|
||||||
|
|
||||||
typedef enum ELogicConditionType {
|
typedef enum ELogicConditionType {
|
||||||
LOGIC_COND_TYPE_AND,
|
LOGIC_COND_TYPE_AND = 1,
|
||||||
LOGIC_COND_TYPE_OR,
|
LOGIC_COND_TYPE_OR,
|
||||||
LOGIC_COND_TYPE_NOT,
|
LOGIC_COND_TYPE_NOT,
|
||||||
} ELogicConditionType;
|
} ELogicConditionType;
|
||||||
|
|
|
@ -397,7 +397,6 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
|
||||||
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->ver);
|
|
||||||
tlen += taosEncodeString(buf, pReq->name);
|
tlen += taosEncodeString(buf, pReq->name);
|
||||||
tlen += taosEncodeFixedU32(buf, pReq->ttl);
|
tlen += taosEncodeFixedU32(buf, pReq->ttl);
|
||||||
tlen += taosEncodeFixedU32(buf, pReq->keep);
|
tlen += taosEncodeFixedU32(buf, pReq->keep);
|
||||||
|
@ -465,7 +464,6 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
|
||||||
buf = taosDecodeFixedI64(buf, &(pReq->ver));
|
|
||||||
buf = taosDecodeString(buf, &(pReq->name));
|
buf = taosDecodeString(buf, &(pReq->name));
|
||||||
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
|
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
|
||||||
buf = taosDecodeFixedU32(buf, &(pReq->keep));
|
buf = taosDecodeFixedU32(buf, &(pReq->keep));
|
||||||
|
|
|
@ -97,6 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||||
|
|
||||||
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnodeObj *pVnode = pInfo->ahandle;
|
SVnodeObj *pVnode = pInfo->ahandle;
|
||||||
|
int64_t version;
|
||||||
|
|
||||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
|
@ -115,23 +116,32 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodePreprocessWriteReqs(pVnode->pImpl, pArray);
|
vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
|
||||||
|
|
||||||
numOfMsgs = taosArrayGetSize(pArray);
|
numOfMsgs = taosArrayGetSize(pArray);
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg rsp;
|
||||||
|
|
||||||
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, &pRsp);
|
rsp.pCont = NULL;
|
||||||
|
rsp.contLen = 0;
|
||||||
|
rsp.code = 0;
|
||||||
|
rsp.handle = pRpc->handle;
|
||||||
|
rsp.ahandle = pRpc->ahandle;
|
||||||
|
|
||||||
|
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pRpc->ahandle;
|
pRsp->ahandle = pRpc->ahandle;
|
||||||
tmsgSendRsp(pRsp);
|
|
||||||
taosMemoryFree(pRsp);
|
taosMemoryFree(pRsp);
|
||||||
} else {
|
} else {
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
|
@ -153,7 +163,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
|
|
||||||
// todo
|
// todo
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
(void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
// (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -348,7 +348,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
tNameGetFullDbName(&name, dbFName);
|
tNameGetFullDbName(&name, dbFName);
|
||||||
|
|
||||||
SVCreateTbReq req = {0};
|
SVCreateTbReq req = {0};
|
||||||
req.ver = 0;
|
|
||||||
req.name = (char *)tNameGetTableName(&name);
|
req.name = (char *)tNameGetTableName(&name);
|
||||||
req.ttl = 0;
|
req.ttl = 0;
|
||||||
req.keep = 0;
|
req.keep = 0;
|
||||||
|
|
|
@ -13,7 +13,6 @@ target_sources(
|
||||||
"src/vnd/vnodeInt.c"
|
"src/vnd/vnodeInt.c"
|
||||||
"src/vnd/vnodeQuery.c"
|
"src/vnd/vnodeQuery.c"
|
||||||
"src/vnd/vnodeStateMgr.c"
|
"src/vnd/vnodeStateMgr.c"
|
||||||
"src/vnd/vnodeWrite.c"
|
|
||||||
"src/vnd/vnodeModule.c"
|
"src/vnd/vnodeModule.c"
|
||||||
"src/vnd/vnodeSvr.c"
|
"src/vnd/vnodeSvr.c"
|
||||||
|
|
||||||
|
|
|
@ -48,8 +48,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
|
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||||
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -16,34 +16,32 @@
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
|
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
|
||||||
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp);
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
||||||
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
|
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
|
||||||
|
|
||||||
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) {
|
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
|
||||||
SNodeMsg *pMsg;
|
SNodeMsg *pMsg;
|
||||||
SRpcMsg *pRpc;
|
SRpcMsg *pRpc;
|
||||||
|
|
||||||
|
*version = pVnode->state.processed;
|
||||||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
||||||
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
|
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
|
||||||
pRpc = &pMsg->rpcMsg;
|
pRpc = &pMsg->rpcMsg;
|
||||||
|
|
||||||
// set request version
|
// set request version
|
||||||
void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead));
|
if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
|
||||||
int64_t ver = pVnode->state.processed++;
|
|
||||||
taosEncodeFixedI64(&pBuf, ver);
|
|
||||||
|
|
||||||
if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
/*ASSERT(false);*/
|
|
||||||
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
|
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
walFsync(pVnode->pWal, false);
|
walFsync(pVnode->pWal, false);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
|
||||||
void *ptr = NULL;
|
void *ptr = NULL;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
@ -58,33 +56,29 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: change the interface here
|
// todo: change the interface here
|
||||||
int64_t ver;
|
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||||
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
|
|
||||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
|
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_CREATE_STB:
|
case TDMT_VND_CREATE_STB:
|
||||||
ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||||
return 0;
|
break;
|
||||||
case TDMT_VND_CREATE_TABLE:
|
case TDMT_VND_CREATE_TABLE:
|
||||||
return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
|
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
||||||
|
vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
|
||||||
|
break;
|
||||||
case TDMT_VND_ALTER_STB:
|
case TDMT_VND_ALTER_STB:
|
||||||
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||||
|
break;
|
||||||
case TDMT_VND_DROP_STB:
|
case TDMT_VND_DROP_STB:
|
||||||
vTrace("vgId:%d, process drop stb req", TD_VID(pVnode));
|
vTrace("vgId:%d, process drop stb req", TD_VID(pVnode));
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_DROP_TABLE:
|
case TDMT_VND_DROP_TABLE:
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_SUBMIT:
|
case TDMT_VND_SUBMIT:
|
||||||
/*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/
|
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
|
||||||
if (pVnode->config.streamMode == 0) {
|
vnodeProcessSubmitReq(pVnode, ptr, pRsp);
|
||||||
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
|
||||||
(*pRsp)->handle = pMsg->handle;
|
|
||||||
(*pRsp)->ahandle = pMsg->ahandle;
|
|
||||||
return vnodeProcessSubmitReq(pVnode, ptr, *pRsp);
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_MQ_SET_CONN: {
|
case TDMT_VND_MQ_SET_CONN: {
|
||||||
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||||
|
@ -128,7 +122,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->state.applied = ver;
|
pVnode->state.applied = version;
|
||||||
|
|
||||||
// Check if it needs to commit
|
// Check if it needs to commit
|
||||||
if (vnodeShouldCommit(pVnode)) {
|
if (vnodeShouldCommit(pVnode)) {
|
||||||
|
@ -222,7 +216,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp) {
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) {
|
||||||
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
||||||
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
||||||
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
|
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
|
||||||
|
@ -274,12 +268,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
||||||
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
|
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
|
||||||
taosArrayDestroy(vCreateTbBatchRsp.rspList);
|
taosArrayDestroy(vCreateTbBatchRsp.rspList);
|
||||||
|
|
||||||
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
pRsp->pCont = msg;
|
||||||
(*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
pRsp->contLen = contLen;
|
||||||
(*pRsp)->pCont = msg;
|
|
||||||
(*pRsp)->contLen = contLen;
|
|
||||||
(*pRsp)->handle = pMsg->handle;
|
|
||||||
(*pRsp)->ahandle = pMsg->ahandle;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -312,7 +302,6 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
// encode the response (TODO)
|
// encode the response (TODO)
|
||||||
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
|
|
||||||
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
|
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
|
||||||
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
|
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
|
||||||
pRsp->contLen = sizeof(SSubmitRsp);
|
pRsp->contLen = sizeof(SSubmitRsp);
|
||||||
|
|
|
@ -1,18 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
|
||||||
|
|
||||||
/* ------------------------ STATIC METHODS ------------------------ */
|
|
|
@ -646,7 +646,7 @@ predicate(A) ::= expression(B) BETWEEN expression(C) AND expression(D).
|
||||||
predicate(A) ::= expression(B) NOT BETWEEN expression(C) AND expression(D). {
|
predicate(A) ::= expression(B) NOT BETWEEN expression(C) AND expression(D). {
|
||||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||||
SToken e = getTokenFromRawExprNode(pCxt, D);
|
SToken e = getTokenFromRawExprNode(pCxt, D);
|
||||||
A = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, C), releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, D)));
|
A = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C), releaseRawExprNode(pCxt, D)));
|
||||||
}
|
}
|
||||||
predicate(A) ::= expression(B) IS NULL(C). {
|
predicate(A) ::= expression(B) IS NULL(C). {
|
||||||
SToken s = getTokenFromRawExprNode(pCxt, B);
|
SToken s = getTokenFromRawExprNode(pCxt, B);
|
||||||
|
|
|
@ -251,6 +251,9 @@ static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SCol
|
||||||
pCol->colType = pProjCol->colType;
|
pCol->colType = pProjCol->colType;
|
||||||
}
|
}
|
||||||
strcpy(pCol->colName, pExpr->aliasName);
|
strcpy(pCol->colName, pExpr->aliasName);
|
||||||
|
if ('\0' == pCol->node.aliasName[0]) {
|
||||||
|
strcpy(pCol->node.aliasName, pCol->colName);
|
||||||
|
}
|
||||||
pCol->node.resType = pExpr->resType;
|
pCol->node.resType = pExpr->resType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,23 +384,7 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
|
||||||
}
|
}
|
||||||
res = (found ? DEAL_RES_CONTINUE : translateColumnWithoutPrefix(pCxt, pCol));
|
res = (found ? DEAL_RES_CONTINUE : translateColumnWithoutPrefix(pCxt, pCol));
|
||||||
}
|
}
|
||||||
|
return res;
|
||||||
if (DEAL_RES_ERROR == res) {
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (SQL_CLAUSE_WINDOW == pCxt->currClause && QUERY_NODE_STATE_WINDOW == nodeType(pCxt->pCurrStmt->pWindow)) {
|
|
||||||
if (!IS_INTEGER_TYPE(pCol->node.resType.type)) {
|
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE);
|
|
||||||
}
|
|
||||||
if (COLUMN_TYPE_TAG == pCol->colType) {
|
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_COL);
|
|
||||||
}
|
|
||||||
if (TSDB_SUPER_TABLE == pCol->tableType) {
|
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return DEAL_RES_CONTINUE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
||||||
|
@ -1200,9 +1187,27 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
STranslateContext* pCxt = pContext;
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
if (!IS_INTEGER_TYPE(pCol->node.resType.type)) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE);
|
||||||
|
}
|
||||||
|
if (COLUMN_TYPE_TAG == pCol->colType) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_COL);
|
||||||
|
}
|
||||||
|
if (TSDB_SUPER_TABLE == pCol->tableType) {
|
||||||
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t checkStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
|
static int32_t checkStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
|
||||||
|
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
|
||||||
// todo check for "function not support for state_window"
|
// todo check for "function not support for state_window"
|
||||||
return TSDB_CODE_SUCCESS;
|
return pCxt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
|
static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
|
||||||
|
|
|
@ -3730,7 +3730,7 @@ static YYACTIONTYPE yy_reduce(
|
||||||
{
|
{
|
||||||
SToken s = getTokenFromRawExprNode(pCxt, yymsp[-5].minor.yy456);
|
SToken s = getTokenFromRawExprNode(pCxt, yymsp[-5].minor.yy456);
|
||||||
SToken e = getTokenFromRawExprNode(pCxt, yymsp[0].minor.yy456);
|
SToken e = getTokenFromRawExprNode(pCxt, yymsp[0].minor.yy456);
|
||||||
yylhsminor.yy456 = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, yymsp[-2].minor.yy456), releaseRawExprNode(pCxt, yymsp[-5].minor.yy456), releaseRawExprNode(pCxt, yymsp[0].minor.yy456)));
|
yylhsminor.yy456 = createRawExprNodeExt(pCxt, &s, &e, createNotBetweenAnd(pCxt, releaseRawExprNode(pCxt, yymsp[-5].minor.yy456), releaseRawExprNode(pCxt, yymsp[-2].minor.yy456), releaseRawExprNode(pCxt, yymsp[0].minor.yy456)));
|
||||||
}
|
}
|
||||||
yymsp[-5].minor.yy456 = yylhsminor.yy456;
|
yymsp[-5].minor.yy456 = yylhsminor.yy456;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -77,6 +77,10 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
|
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
// todo: release after function splitting
|
||||||
|
if (TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (NULL == pNode->pParent ||
|
if (NULL == pNode->pParent ||
|
||||||
(QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) {
|
(QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -32,6 +32,8 @@ using namespace testing;
|
||||||
} \
|
} \
|
||||||
} while(0);
|
} while(0);
|
||||||
|
|
||||||
|
bool g_isDump = false;
|
||||||
|
|
||||||
class PlannerTestBaseImpl {
|
class PlannerTestBaseImpl {
|
||||||
public:
|
public:
|
||||||
void useDb(const string& acctId, const string& db) {
|
void useDb(const string& acctId, const string& db) {
|
||||||
|
|
|
@ -32,4 +32,6 @@ private:
|
||||||
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
std::unique_ptr<PlannerTestBaseImpl> impl_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
extern bool g_isDump;
|
||||||
|
|
||||||
#endif // PLAN_TEST_UTIL_H
|
#endif // PLAN_TEST_UTIL_H
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
#include "mockCatalog.h"
|
#include "mockCatalog.h"
|
||||||
|
#include "planTestUtil.h"
|
||||||
|
|
||||||
class PlannerEnv : public testing::Environment {
|
class PlannerEnv : public testing::Environment {
|
||||||
public:
|
public:
|
||||||
|
@ -34,8 +35,27 @@ public:
|
||||||
virtual ~PlannerEnv() {}
|
virtual ~PlannerEnv() {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static void parseArg(int argc, char* argv[]) {
|
||||||
|
int opt = 0;
|
||||||
|
const char *optstring = "";
|
||||||
|
static struct option long_options[] = {
|
||||||
|
{"dump", no_argument, NULL, 'd'},
|
||||||
|
{0, 0, 0, 0}
|
||||||
|
};
|
||||||
|
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
|
||||||
|
switch (opt) {
|
||||||
|
case 'd':
|
||||||
|
g_isDump = true;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char* argv[]) {
|
int main(int argc, char* argv[]) {
|
||||||
testing::AddGlobalTestEnvironment(new PlannerEnv());
|
testing::AddGlobalTestEnvironment(new PlannerEnv());
|
||||||
testing::InitGoogleTest(&argc, argv);
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
parseArg(argc, argv);
|
||||||
return RUN_ALL_TESTS();
|
return RUN_ALL_TESTS();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue