merge from 3.0
This commit is contained in:
commit
f2ee70efb1
|
@ -60,9 +60,9 @@ typedef struct {
|
|||
ReportStartup reportStartupFp;
|
||||
} SMsgCb;
|
||||
|
||||
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
|
||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
|
||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
||||
void tmsgSetDefault(const SMsgCb* msgcb);
|
||||
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg);
|
||||
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype);
|
||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
||||
void tmsgSendRsp(SRpcMsg* pMsg);
|
||||
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
||||
|
|
|
@ -179,6 +179,8 @@ typedef struct {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
//TODO: use varchar(0) to represent NULL type
|
||||
#define IS_NULL_TYPE(_t) ((_t) == TSDB_DATA_TYPE_NULL)
|
||||
#define IS_SIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_TINYINT && (_t) <= TSDB_DATA_TYPE_BIGINT)
|
||||
#define IS_UNSIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_UTINYINT && (_t) <= TSDB_DATA_TYPE_UBIGINT)
|
||||
#define IS_FLOAT_TYPE(_t) ((_t) == TSDB_DATA_TYPE_FLOAT || (_t) == TSDB_DATA_TYPE_DOUBLE)
|
||||
|
|
|
@ -208,6 +208,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_SORT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_FILL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
|
||||
|
|
|
@ -248,6 +248,7 @@ typedef struct SSetOperator {
|
|||
SNode* pRight;
|
||||
SNodeList* pOrderByList; // SOrderByExprNode
|
||||
SNode* pLimit;
|
||||
char stmtName[TSDB_TABLE_NAME_LEN];
|
||||
} SSetOperator;
|
||||
|
||||
typedef enum ESqlClause {
|
||||
|
|
|
@ -646,6 +646,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_INVALID_ALTER_TABLE TAOS_DEF_ERROR_CODE(0, 0x2649)
|
||||
#define TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x264A)
|
||||
#define TSDB_CODE_PAR_INVALID_MODIFY_COL TAOS_DEF_ERROR_CODE(0, 0x264B)
|
||||
#define TSDB_CODE_PAR_INVALID_TBNAME TAOS_DEF_ERROR_CODE(0, 0x264C)
|
||||
|
||||
//planner
|
||||
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
||||
|
|
|
@ -17,46 +17,46 @@
|
|||
#include "tmsgcb.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
static SMsgCb tsDefaultMsgCb;
|
||||
static SMsgCb defaultMsgCb;
|
||||
|
||||
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
|
||||
void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
|
||||
|
||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) {
|
||||
PutToQueueFp fp = pMsgCb->queueFps[qtype];
|
||||
return (*fp)(pMsgCb->mgmt, pMsg);
|
||||
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
|
||||
PutToQueueFp fp = msgcb->queueFps[qtype];
|
||||
return (*fp)(msgcb->mgmt, pMsg);
|
||||
}
|
||||
|
||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
||||
GetQueueSizeFp fp = pMsgCb->qsizeFp;
|
||||
return (*fp)(pMsgCb->mgmt, vgId, qtype);
|
||||
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
|
||||
GetQueueSizeFp fp = msgcb->qsizeFp;
|
||||
return (*fp)(msgcb->mgmt, vgId, qtype);
|
||||
}
|
||||
|
||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
||||
SendReqFp fp = tsDefaultMsgCb.sendReqFp;
|
||||
SendReqFp fp = defaultMsgCb.sendReqFp;
|
||||
return (*fp)(epSet, pMsg);
|
||||
}
|
||||
|
||||
void tmsgSendRsp(SRpcMsg* pMsg) {
|
||||
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
|
||||
SendRspFp fp = defaultMsgCb.sendRspFp;
|
||||
return (*fp)(pMsg);
|
||||
}
|
||||
|
||||
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
|
||||
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
|
||||
SendRedirectRspFp fp = defaultMsgCb.sendRedirectRspFp;
|
||||
(*fp)(pMsg, pNewEpSet);
|
||||
}
|
||||
|
||||
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) {
|
||||
RegisterBrokenLinkArgFp fp = tsDefaultMsgCb.registerBrokenLinkArgFp;
|
||||
RegisterBrokenLinkArgFp fp = defaultMsgCb.registerBrokenLinkArgFp;
|
||||
(*fp)(pMsg);
|
||||
}
|
||||
|
||||
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) {
|
||||
ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp;
|
||||
ReleaseHandleFp fp = defaultMsgCb.releaseHandleFp;
|
||||
(*fp)(pHandle, type);
|
||||
}
|
||||
|
||||
void tmsgReportStartup(const char* name, const char* desc) {
|
||||
ReportStartup fp = tsDefaultMsgCb.reportStartupFp;
|
||||
ReportStartup fp = defaultMsgCb.reportStartupFp;
|
||||
(*fp)(name, desc);
|
||||
}
|
|
@ -275,7 +275,7 @@ static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus)
|
|||
}
|
||||
|
||||
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
dDebug("start to process net test req");
|
||||
dDebug("msg:%p, net test req will be processed", pMsg);
|
||||
SRpcMsg rsp = {.code = 0, .info = pMsg->info};
|
||||
rsp.pCont = rpcMallocCont(pMsg->contLen);
|
||||
if (rsp.pCont == NULL) {
|
||||
|
@ -287,8 +287,7 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
dDebug("start to process server startup status req");
|
||||
|
||||
dDebug("msg:%p, server startup status req will be processed", pMsg);
|
||||
SServerStatusRsp statusRsp = {0};
|
||||
dmGetServerStartupStatus(pDnode, &statusRsp);
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
|||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||
|
||||
if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) {
|
||||
tmsgSetDefaultMsgCb(&input.msgCb);
|
||||
tmsgSetDefault(&input.msgCb);
|
||||
}
|
||||
|
||||
if (OnlyInSingleProc(pWrapper)) {
|
||||
|
|
|
@ -344,66 +344,6 @@ void dmCleanupClient(SDnode *pDnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static inline int32_t dmGetHideUserAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
|
||||
int32_t code = 0;
|
||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||
|
||||
if (strcmp(user, INTERNAL_USER) == 0) {
|
||||
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
|
||||
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
|
||||
taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
|
||||
} else {
|
||||
code = -1;
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
memcpy(secret, pass, TSDB_PASSWORD_LEN);
|
||||
*spi = 1;
|
||||
*encrypt = 0;
|
||||
*ckey = 0;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
|
||||
char *ckey) {
|
||||
if (dmGetHideUserAuth(user, spi, encrypt, secret, ckey) == 0) {
|
||||
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SAuthReq authReq = {0};
|
||||
tstrncpy(authReq.user, user, TSDB_USER_LEN);
|
||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
|
||||
void *pReq = rpcMallocCont(contLen);
|
||||
tSerializeSAuthReq(pReq, contLen, &authReq);
|
||||
|
||||
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .info.ahandle = (void *)9528};
|
||||
SRpcMsg rpcRsp = {0};
|
||||
SEpSet epSet = {0};
|
||||
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
|
||||
dmGetMnodeEpSet(&pDnode->data, &epSet);
|
||||
dmSendRecv(&epSet, &rpcMsg, &rpcRsp);
|
||||
|
||||
if (rpcRsp.code != 0) {
|
||||
terrno = rpcRsp.code;
|
||||
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
|
||||
} else {
|
||||
SAuthRsp authRsp = {0};
|
||||
tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
|
||||
memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
|
||||
memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
|
||||
*spi = authRsp.spi;
|
||||
*encrypt = authRsp.encrypt;
|
||||
dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
|
||||
authRsp.encrypt);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcRsp.pCont);
|
||||
return rpcRsp.code;
|
||||
}
|
||||
|
||||
int32_t dmInitServer(SDnode *pDnode) {
|
||||
SDnodeTrans *pTrans = &pDnode->trans;
|
||||
|
||||
|
@ -416,7 +356,6 @@ int32_t dmInitServer(SDnode *pDnode) {
|
|||
rpcInit.sessions = tsMaxShellConns;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.afp = (RpcAfp)dmRetrieveUserAuthInfo;
|
||||
rpcInit.parent = pDnode;
|
||||
|
||||
pTrans->serverRpc = rpcOpen(&rpcInit);
|
||||
|
|
|
@ -197,8 +197,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
goto CONN_OVER;
|
||||
}
|
||||
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
|
||||
mError("user:%s, failed to auth while acquire user, input:%s saved:%s", pReq->conn.user, connReq.passwd,
|
||||
pUser->pass);
|
||||
mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
|
||||
code = TSDB_CODE_RPC_AUTH_FAILURE;
|
||||
goto CONN_OVER;
|
||||
}
|
||||
|
|
|
@ -343,19 +343,20 @@ void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); }
|
|||
int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
void *ahandle = pMsg->info.ahandle;
|
||||
|
||||
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
|
||||
|
||||
if (IsReq(pMsg) && !mndIsMaster(pMnode)) {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||
return -1;
|
||||
}
|
||||
if (IsReq(pMsg)) {
|
||||
if (!mndIsMaster(pMnode)) {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (IsReq(pMsg) && (pMsg->contLen == 0 || pMsg->pCont == NULL)) {
|
||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||
return -1;
|
||||
if (pMsg->contLen == 0 || pMsg->pCont == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
||||
|
|
|
@ -56,7 +56,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
msgCb.sendReqFp = sendReq;
|
||||
msgCb.sendRspFp = sendRsp;
|
||||
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
||||
tmsgSetDefaultMsgCb(&msgCb);
|
||||
tmsgSetDefault(&msgCb);
|
||||
|
||||
SMnodeOpt opt = {0};
|
||||
opt.deploy = 1;
|
||||
|
|
|
@ -442,7 +442,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
}
|
||||
|
||||
entry.version = version;
|
||||
int tlen;
|
||||
int tlen;
|
||||
SSchema *pNewSchema = NULL;
|
||||
switch (pAlterTbReq->action) {
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||
if (pColumn) {
|
||||
|
@ -451,8 +452,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
}
|
||||
pSchema->sver++;
|
||||
pSchema->nCols++;
|
||||
pSchema->pSchema =
|
||||
taosMemoryRealloc(entry.ntbEntry.schema.pSchema, sizeof(SSchema) * entry.ntbEntry.schema.nCols);
|
||||
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
|
||||
memcpy(pNewSchema, pSchema->pSchema, sizeof(SSchema) * (pSchema->nCols - 1));
|
||||
pSchema->pSchema = pNewSchema;
|
||||
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes;
|
||||
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type;
|
||||
pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags;
|
||||
|
@ -511,6 +513,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
|
||||
metaULock(pMeta);
|
||||
|
||||
if (pNewSchema) taosMemoryFree(pNewSchema);
|
||||
tDecoderClear(&dc);
|
||||
tdbTbcClose(pTbDbc);
|
||||
tdbTbcClose(pUidIdxc);
|
||||
|
|
|
@ -378,6 +378,13 @@ typedef enum EStreamScanMode {
|
|||
STREAM_SCAN_FROM_DATAREADER,
|
||||
} EStreamScanMode;
|
||||
|
||||
typedef struct SCatchSupporter {
|
||||
SHashObj* pWindowHashTable; // quick locate the window object for each window
|
||||
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
|
||||
int32_t keySize;
|
||||
int64_t* pKeyBuf;
|
||||
} SCatchSupporter;
|
||||
|
||||
typedef struct SStreamBlockScanInfo {
|
||||
SArray* pBlockLists; // multiple SSDatablock.
|
||||
SSDataBlock* pRes; // result SSDataBlock
|
||||
|
@ -400,6 +407,8 @@ typedef struct SStreamBlockScanInfo {
|
|||
EStreamScanMode scanMode;
|
||||
SOperatorInfo* pOperatorDumy;
|
||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
||||
SCatchSupporter childAggSup;
|
||||
SArray* childIds;
|
||||
} SStreamBlockScanInfo;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
|
@ -460,6 +469,16 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
bool invertible;
|
||||
} SIntervalAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamFinalIntervalOperatorInfo {
|
||||
SOptrBasicInfo binfo; // basic info
|
||||
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||
SInterval interval; // interval info
|
||||
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
|
||||
SAggSupporter aggSup; // aggregate supporter
|
||||
int32_t order; // current SSDataBlock scan order
|
||||
STimeWindowAggSupp twAggSup;
|
||||
} SStreamFinalIntervalOperatorInfo;
|
||||
|
||||
typedef struct SAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SAggSupporter aggSup;
|
||||
|
@ -696,6 +715,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
|||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
|
@ -771,9 +793,8 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
|
|||
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
|
||||
int32_t order);
|
||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
|
||||
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
|
||||
uint64_t groupId, int32_t numOfOutput);
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize,
|
||||
const char* pKey, const char* pDir);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -15,10 +15,10 @@
|
|||
|
||||
#include <vnode.h>
|
||||
#include "dataSinkMgt.h"
|
||||
#include "texception.h"
|
||||
#include "os.h"
|
||||
#include "tarray.h"
|
||||
#include "tcache.h"
|
||||
#include "texception.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "tudf.h"
|
||||
|
@ -32,15 +32,15 @@
|
|||
|
||||
typedef struct STaskMgmt {
|
||||
TdThreadMutex lock;
|
||||
SCacheObj *qinfoPool; // query handle pool
|
||||
int32_t vgId;
|
||||
bool closed;
|
||||
SCacheObj *qinfoPool; // query handle pool
|
||||
int32_t vgId;
|
||||
bool closed;
|
||||
} STaskMgmt;
|
||||
|
||||
int32_t qCreateExecTask(SReadHandle *readHandle, int32_t vgId, uint64_t taskId, SSubplan *pSubplan,
|
||||
qTaskInfo_t *pTaskInfo, DataSinkHandle *handle, EOPTR_EXEC_MODEL model) {
|
||||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
|
||||
assert(readHandle != NULL && pSubplan != NULL);
|
||||
SExecTaskInfo **pTask = (SExecTaskInfo **)pTaskInfo;
|
||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||
|
||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -56,46 +56,46 @@ int32_t qCreateExecTask(SReadHandle *readHandle, int32_t vgId, uint64_t taskId,
|
|||
code = dsCreateDataSinker(pSubplan->pDataSink, handle);
|
||||
}
|
||||
|
||||
_error:
|
||||
_error:
|
||||
// if failed to add ref for all tables in this query, abort current query
|
||||
return code;
|
||||
}
|
||||
|
||||
#ifdef TEST_IMPL
|
||||
// wait moment
|
||||
int waitMoment(SQInfo *pQInfo) {
|
||||
if (pQInfo->sql) {
|
||||
int ms = 0;
|
||||
char *pcnt = strstr(pQInfo->sql, " count(*)");
|
||||
if (pcnt) return 0;
|
||||
int waitMoment(SQInfo* pQInfo){
|
||||
if(pQInfo->sql) {
|
||||
int ms = 0;
|
||||
char* pcnt = strstr(pQInfo->sql, " count(*)");
|
||||
if(pcnt) return 0;
|
||||
|
||||
char *pos = strstr(pQInfo->sql, " t_");
|
||||
if (pos) {
|
||||
char* pos = strstr(pQInfo->sql, " t_");
|
||||
if(pos){
|
||||
pos += 3;
|
||||
ms = atoi(pos);
|
||||
while (*pos >= '0' && *pos <= '9') {
|
||||
pos++;
|
||||
while(*pos >= '0' && *pos <= '9'){
|
||||
pos ++;
|
||||
}
|
||||
char unit_char = *pos;
|
||||
if (unit_char == 'h') {
|
||||
ms *= 3600 * 1000;
|
||||
} else if (unit_char == 'm') {
|
||||
ms *= 60 * 1000;
|
||||
} else if (unit_char == 's') {
|
||||
if(unit_char == 'h'){
|
||||
ms *= 3600*1000;
|
||||
} else if(unit_char == 'm'){
|
||||
ms *= 60*1000;
|
||||
} else if(unit_char == 's'){
|
||||
ms *= 1000;
|
||||
}
|
||||
}
|
||||
if (ms == 0) return 0;
|
||||
if(ms == 0) return 0;
|
||||
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
|
||||
|
||||
if (ms < 1000) {
|
||||
if(ms < 1000) {
|
||||
taosMsleep(ms);
|
||||
} else {
|
||||
int used_ms = 0;
|
||||
while (used_ms < ms) {
|
||||
while(used_ms < ms) {
|
||||
taosMsleep(1000);
|
||||
used_ms += 1000;
|
||||
if (isTaskKilled(pQInfo)) {
|
||||
if(isTaskKilled(pQInfo)){
|
||||
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
|
||||
break;
|
||||
}
|
||||
|
@ -106,14 +106,15 @@ int waitMoment(SQInfo *pQInfo) {
|
|||
}
|
||||
#endif
|
||||
|
||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
|
||||
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
int64_t threadId = taosGetSelfPthreadId();
|
||||
|
||||
*pRes = NULL;
|
||||
int64_t curOwner = 0;
|
||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void *)curOwner);
|
||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
||||
(void*)curOwner);
|
||||
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
@ -132,11 +133,13 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
|
|||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
publishQueryAbortEvent(pTaskInfo, ret);
|
||||
pTaskInfo->code = ret;
|
||||
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
||||
cleanUpUdfs();
|
||||
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
||||
tstrerror(pTaskInfo->code));
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
/*qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));*/
|
||||
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
||||
|
||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
|
||||
|
@ -152,12 +155,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
|
|||
*useconds = pTaskInfo->cost.elapsedTime;
|
||||
}
|
||||
|
||||
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
|
||||
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
|
||||
pTaskInfo->totalRows += current;
|
||||
|
||||
cleanUpUdfs();
|
||||
/*qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",*/
|
||||
/*GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);*/
|
||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
|
||||
|
||||
atomic_store_64(&pTaskInfo->owner, 0);
|
||||
return pTaskInfo->code;
|
||||
|
@ -206,17 +209,18 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
|
|||
}
|
||||
|
||||
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qTaskHandle;
|
||||
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
|
||||
qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);
|
||||
|
||||
queryCostStatis(pTaskInfo); // print the query cost summary
|
||||
queryCostStatis(pTaskInfo); // print the query cost summary
|
||||
doDestroyTask(pTaskInfo);
|
||||
}
|
||||
|
||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
|
||||
int32_t capacity = 0;
|
||||
int32_t capacity = 0;
|
||||
|
||||
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -106,7 +106,7 @@ static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
|
|||
|
||||
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
|
||||
|
||||
static void releaseQueryBuf(size_t numOfTables);
|
||||
static void releaseQueryBuf(size_t numOfTables);
|
||||
|
||||
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
|
||||
|
||||
|
@ -154,9 +154,8 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
|
|||
|
||||
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
||||
|
||||
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo,
|
||||
SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t* rowCellOffset,
|
||||
SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
||||
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||
int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
||||
|
||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
||||
|
@ -343,26 +342,6 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
|
|||
return pResultRow;
|
||||
}
|
||||
|
||||
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, uint64_t groupId, int32_t numOfOutput) {
|
||||
SAggSupporter* pSup = &pInfo->aggSup;
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
|
||||
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset);
|
||||
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
|
||||
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
|
||||
continue;
|
||||
}
|
||||
pResInfo->initialized = false;
|
||||
if (pCtx[i].functionId != -1) {
|
||||
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* the struct of key in hash table
|
||||
* +----------+---------------+
|
||||
|
@ -598,9 +577,8 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
|
|||
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
|
||||
}
|
||||
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
|
||||
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
|
||||
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
// keep it temporarily
|
||||
bool hasAgg = pCtx[k].input.colDataAggIsSet;
|
||||
|
@ -683,8 +661,8 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
|
|||
}
|
||||
}
|
||||
|
||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
||||
int32_t scanFlag, bool createDummyCol) {
|
||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
||||
bool createDummyCol) {
|
||||
if (pBlock->pBlockAgg != NULL) {
|
||||
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
||||
} else {
|
||||
|
@ -735,7 +713,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
|||
}
|
||||
|
||||
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
||||
int32_t scanFlag, bool createDummyCol) {
|
||||
int32_t scanFlag, bool createDummyCol) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
||||
|
@ -743,7 +721,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||
|
||||
pCtx[i].pSrcBlock = pBlock;
|
||||
pCtx[i].scanFlag = scanFlag;
|
||||
pCtx[i].scanFlag = scanFlag;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx[i].input;
|
||||
pInput->uid = pBlock->info.uid;
|
||||
|
@ -1003,14 +981,14 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
|||
return false;
|
||||
}
|
||||
|
||||
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
|
||||
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||
// }
|
||||
//
|
||||
// // denote the order type
|
||||
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
|
||||
// // return pCtx->param[0].i == pQueryAttr->order.order;
|
||||
// }
|
||||
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
|
||||
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||
// }
|
||||
//
|
||||
// // denote the order type
|
||||
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
|
||||
// // return pCtx->param[0].i == pQueryAttr->order.order;
|
||||
// }
|
||||
|
||||
// in the reverse table scan, only the following functions need to be executed
|
||||
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
|
||||
|
@ -1128,16 +1106,16 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
|
|||
} else if (fmIsAggFunc(pCtx[i].functionId)) {
|
||||
p = &pCtx[i];
|
||||
}
|
||||
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
|
||||
// tagLen += pCtx[i].resDataInfo.bytes;
|
||||
// pTagCtx[num++] = &pCtx[i];
|
||||
// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
|
||||
// // tag function may be the group by tag column
|
||||
// // ts may be the required primary timestamp column
|
||||
// continue;
|
||||
// } else {
|
||||
// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
|
||||
// }
|
||||
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
|
||||
// tagLen += pCtx[i].resDataInfo.bytes;
|
||||
// pTagCtx[num++] = &pCtx[i];
|
||||
// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
|
||||
// // tag function may be the group by tag column
|
||||
// // ts may be the required primary timestamp column
|
||||
// continue;
|
||||
// } else {
|
||||
// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
|
||||
// }
|
||||
}
|
||||
|
||||
if (p != NULL) {
|
||||
|
@ -2015,7 +1993,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
|
|||
}
|
||||
|
||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo) {
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo) {
|
||||
if (pFilterNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -2133,9 +2111,8 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
|
|||
* @param pQInfo
|
||||
* @param result
|
||||
*/
|
||||
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||
SGroupResInfo* pGroupResInfo, int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfExprs) {
|
||||
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||
int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
|
||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||
int32_t start = pGroupResInfo->index;
|
||||
|
||||
|
@ -2173,11 +2150,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
|
|||
} else {
|
||||
// expand the result into multiple rows. E.g., _wstartts, top(k, 20)
|
||||
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||
}
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
|
||||
for(int32_t k = 0; k < pRow->numOfRows; ++k) {
|
||||
colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2193,12 +2170,11 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
|
|||
return 0;
|
||||
}
|
||||
|
||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||
SDiskbasedBuf* pBuf) {
|
||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) {
|
||||
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
||||
|
||||
SExprInfo* pExprInfo = pOperator->pExpr;
|
||||
int32_t numOfExprs = pOperator->numOfExprs;
|
||||
SExprInfo* pExprInfo = pOperator->pExpr;
|
||||
int32_t numOfExprs = pOperator->numOfExprs;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int32_t* rowCellOffset = pbInfo->rowCellInfoOffset;
|
||||
|
@ -3217,7 +3193,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* pSources, SSDataBlock* pBlock,
|
||||
SOperatorInfo* createExchangeOperatorInfo(void *pTransporter, const SNodeList* pSources, SSDataBlock* pBlock,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -3330,7 +3306,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
|
|||
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
|
||||
int32_t rowIndex) {
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index
|
||||
// pCtx[j].startRow = rowIndex;
|
||||
// pCtx[j].startRow = rowIndex;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfExpr; ++j) {
|
||||
|
@ -3381,7 +3357,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
|
|||
|
||||
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
// pCtx[i].size = 1;
|
||||
// pCtx[i].size = 1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
|
@ -3607,7 +3583,7 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
|
||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) {
|
||||
// todo add more information about exchange operation
|
||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
|
||||
*order = TSDB_ORDER_ASC;
|
||||
|
@ -3637,7 +3613,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||
|
||||
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
|
@ -3977,8 +3953,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||
|
||||
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
||||
pProjectInfo->pPseudoColInfo);
|
||||
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -4228,7 +4203,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
|
|||
for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) {
|
||||
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) {
|
||||
STableKeyInfo* pk = taosArrayGet(pa, j);
|
||||
STableKeyInfo* pk = taosArrayGet(pa, j);
|
||||
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++];
|
||||
pTQueryInfo->lastKey = pk->lastKey;
|
||||
}
|
||||
|
@ -4364,9 +4339,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->limit = *pLimit;
|
||||
pInfo->slimit = *pSlimit;
|
||||
pInfo->curOffset = pLimit->offset;
|
||||
pInfo->limit = *pLimit;
|
||||
pInfo->slimit = *pSlimit;
|
||||
pInfo->curOffset = pLimit->offset;
|
||||
pInfo->curSOffset = pSlimit->offset;
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
@ -4506,10 +4481,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa
|
|||
}
|
||||
|
||||
pCol->slotId = slotId;
|
||||
pCol->colId = colId;
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
pCol->colId = colId;
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
pCol->precision = pType->precision;
|
||||
pCol->dataBlockId = blockId;
|
||||
|
||||
|
@ -4584,10 +4559,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) {
|
||||
pFuncNode->pParameterList = nodesMakeList();
|
||||
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
if (NULL == res) { // todo handle error
|
||||
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
if (NULL == res) { // todo handle error
|
||||
} else {
|
||||
res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||
res->node.resType = (SDataType) {.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||
nodesListAppend(pFuncNode->pParameterList, res);
|
||||
}
|
||||
}
|
||||
|
@ -4680,7 +4655,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
||||
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
|
@ -4689,27 +4664,27 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
if (pHandle->vnode) {
|
||||
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||
} else {
|
||||
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
||||
queryId, taskId);
|
||||
}
|
||||
|
||||
if (pDataReader == NULL && terrno != 0) {
|
||||
/*qDebug("pDataReader is NULL");*/
|
||||
qDebug("pDataReader is NULL");
|
||||
// return NULL;
|
||||
} else {
|
||||
/*qDebug("pDataReader is not NULL");*/
|
||||
qDebug("pDataReader is not NULL");
|
||||
}
|
||||
|
||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||
|
||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||
|
||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
SOperatorInfo* pOperator =
|
||||
createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols,
|
||||
tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy);
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo,
|
||||
pScanPhyNode->node.pConditions, pOperatorDumy);
|
||||
taosArrayDestroy(tableIdList);
|
||||
return pOperator;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
|
@ -4858,7 +4833,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
|
||||
|
||||
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
|
||||
SColumn col = extractColumnFromColumnNode(pColNode);
|
||||
SColumn col = extractColumnFromColumnNode(pColNode);
|
||||
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
|
||||
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*)pPhyNode;
|
||||
|
@ -4926,11 +4901,11 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
|||
|
||||
SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
|
||||
SColumn c = {0};
|
||||
c.slotId = pColNode->slotId;
|
||||
c.colId = pColNode->colId;
|
||||
c.type = pColNode->node.resType.type;
|
||||
c.bytes = pColNode->node.resType.bytes;
|
||||
c.scale = pColNode->node.resType.scale;
|
||||
c.slotId = pColNode->slotId;
|
||||
c.colId = pColNode->colId;
|
||||
c.type = pColNode->node.resType.type;
|
||||
c.bytes = pColNode->node.resType.bytes;
|
||||
c.scale = pColNode->node.resType.scale;
|
||||
c.precision = pColNode->node.resType.precision;
|
||||
return c;
|
||||
}
|
||||
|
@ -5324,3 +5299,16 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize,
|
||||
const char* pKey, const char* pDir) {
|
||||
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
||||
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
|
||||
int32_t pageSize = rowSize * 32;
|
||||
int32_t bufSize = pageSize * 4096;
|
||||
createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,11 @@
|
|||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
||||
typedef struct SWindowPosition {
|
||||
int32_t pageId;
|
||||
int32_t rowId;
|
||||
} SWindowPosition;
|
||||
|
||||
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
|
||||
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
||||
const char* dbName);
|
||||
|
@ -675,6 +680,96 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId, TSKEY ts) {
|
||||
int64_t* pKey = (int64_t*)pSup->pKeyBuf;
|
||||
pKey[0] = groupId;
|
||||
pKey[1] = childId;
|
||||
pKey[2] = ts;
|
||||
}
|
||||
|
||||
static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
|
||||
int32_t pageId, int32_t tsIndex, int64_t childId) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex);
|
||||
TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
|
||||
for (int32_t i = 0; i < pDataBlock->info.rows; i++) {
|
||||
setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]);
|
||||
SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable,
|
||||
pSup->pKeyBuf, pSup->keySize);
|
||||
if (p1 == NULL) {
|
||||
SWindowPosition pos = {.pageId = pageId, .rowId = i};
|
||||
int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos,
|
||||
sizeof(SWindowPosition));
|
||||
if (code != TSDB_CODE_SUCCESS ) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
p1->pageId = pageId;
|
||||
p1->rowId = i;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
|
||||
int32_t tsIndex, int64_t childId) {
|
||||
int32_t start = 0;
|
||||
int32_t stop = 0;
|
||||
int32_t pageSize = getBufPageSize(pSup->pDataBuf);
|
||||
while(start < pDataBlock->info.rows) {
|
||||
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize);
|
||||
SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
|
||||
if (pDB == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId);
|
||||
if (pPage == NULL) {
|
||||
blockDataDestroy(pDB);
|
||||
return terrno;
|
||||
}
|
||||
int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t);
|
||||
assert(size <= pageSize);
|
||||
blockDataToBuf(pPage, pDB);
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pSup->pDataBuf, pPage);
|
||||
blockDataDestroy(pDB);
|
||||
start = stop + 1;
|
||||
int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId);
|
||||
if (code != TSDB_CODE_SUCCESS ) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
|
||||
SSDataBlock* pBlock = pInfo->pUpdateRes;
|
||||
if (pInfo->updateResIndex < pBlock->info.rows) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
SCatchSupporter* pCSup = &pInfo->childAggSup;
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
int32_t size = taosArrayGetSize(pInfo->childIds);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
int64_t id = *(int64_t *)taosArrayGet(pInfo->childIds, i);
|
||||
setSupKeyBuf(pCSup, pBlock->info.groupId, id,
|
||||
tsCols[pInfo->updateResIndex]);
|
||||
SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable,
|
||||
pCSup->pKeyBuf, pCSup->keySize);
|
||||
void* buf = getBufPage(pCSup->pDataBuf, pos->pageId);
|
||||
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
|
||||
blockDataFromBuf(pDB, buf);
|
||||
SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
|
||||
blockDataMerge(pInfo->pRes, pSub, NULL);
|
||||
blockDataDestroy(pDB);
|
||||
blockDataDestroy(pSub);
|
||||
}
|
||||
pInfo->updateResIndex++;
|
||||
return pInfo->pRes;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||
// NOTE: this operator does never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -688,6 +783,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
|
||||
if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
|
||||
SSDataBlock* pDB = getDataFromCatch(pInfo);
|
||||
if (pDB != NULL) {
|
||||
return pDB;
|
||||
} else {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
}
|
||||
}
|
||||
|
||||
if (pInfo->validBlockIndex >= total) {
|
||||
doClearBufferedBlocks(pInfo);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
|
@ -695,7 +799,17 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t current = pInfo->validBlockIndex++;
|
||||
return taosArrayGetP(pInfo->pBlockLists, current);
|
||||
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
||||
} else {
|
||||
int32_t code = catchDatablock(pBlock, &pInfo->childAggSup, pInfo->primaryTsIndex, 0);
|
||||
if (code != TDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
return pBlock;
|
||||
} else {
|
||||
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
|
||||
blockDataDestroy(pInfo->pUpdateRes);
|
||||
|
@ -857,6 +971,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
|
|||
pInfo->pOperatorDumy = pOperatorDumy;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
|
||||
size_t childKeyBufSize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
||||
initCatchSupporter(&pInfo->childAggSup, 1024, childKeyBufSize,
|
||||
"StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
pOperator->blocking = false;
|
||||
|
|
|
@ -8,6 +8,8 @@ typedef enum SResultTsInterpType {
|
|||
RESULT_ROW_END_INTERP = 2,
|
||||
} SResultTsInterpType;
|
||||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator);
|
||||
|
||||
/*
|
||||
* There are two cases to handle:
|
||||
*
|
||||
|
@ -473,8 +475,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun
|
|||
}
|
||||
|
||||
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
|
||||
TSKEY* primaryKeys, int32_t prevPosition, SIntervalAggOperatorInfo* pInfo) {
|
||||
int32_t order = pInfo->order;
|
||||
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
|
||||
bool ascQuery = (order == TSDB_ORDER_ASC);
|
||||
|
||||
int32_t precision = pInterval->precision;
|
||||
|
@ -723,7 +724,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
STimeWindow nextWin = win;
|
||||
while (1) {
|
||||
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo);
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -1031,18 +1032,41 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
|
|||
}
|
||||
}
|
||||
}
|
||||
static void doClearWindows(SIntervalAggOperatorInfo* pInfo, int32_t numOfOutput, SSDataBlock* pBlock) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
|
||||
void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
|
||||
int16_t bytes, uint64_t groupId, int32_t numOfOutput) {
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
|
||||
GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
|
||||
SqlFunctionCtx* pCtx = pBinfo->pCtx;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].resultInfo = getResultCell(pResult, i, pBinfo->rowCellInfoOffset);
|
||||
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
|
||||
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
|
||||
continue;
|
||||
}
|
||||
pResInfo->initialized = false;
|
||||
if (pCtx[i].functionId != -1) {
|
||||
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
|
||||
SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
int32_t step = 0;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], &pInfo->interval,
|
||||
pInfo->interval.precision, NULL);
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pIntrerval,
|
||||
pIntrerval->precision, NULL);
|
||||
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i,
|
||||
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
doClearWindow(pInfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
|
||||
doClearWindow(pSup, pBinfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1084,7 +1108,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
|
||||
}
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
doClearWindows(pInfo, pOperator->numOfExprs, pBlock);
|
||||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
|
||||
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
|
||||
continue;
|
||||
}
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
|
@ -1097,8 +1122,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
|
||||
// TODO: remove for stream
|
||||
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
|
@ -1116,6 +1139,12 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
cleanupAggSup(&pInfo->aggSup);
|
||||
}
|
||||
|
||||
void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo *)param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
}
|
||||
|
||||
bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
|
||||
for (int32_t i = 0; i < numOfCols; i++) {
|
||||
if (!fmIsInvertible(pFCtx[i].functionId)) {
|
||||
|
@ -1185,6 +1214,63 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
|
||||
int32_t code =
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock,
|
||||
keyBufSize, pTaskInfo->id.str);
|
||||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
||||
|
||||
pOperator->name = "StreamFinalIntervalOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL,
|
||||
destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow,
|
||||
NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
||||
|
@ -1548,3 +1634,91 @@ _error:
|
|||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
|
||||
int32_t tableGroupId) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
int32_t step = 1;
|
||||
bool ascScan = true;
|
||||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t forwardStep = 0;
|
||||
|
||||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
}
|
||||
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan);
|
||||
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts,
|
||||
&pInfo->interval, pInfo->interval.precision, NULL);
|
||||
while (1) {
|
||||
int32_t code =
|
||||
setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx,
|
||||
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
||||
pos->groupId = tableGroupId;
|
||||
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
*(int64_t*)pos->key = pResult->win.skey;
|
||||
taosArrayPush(pUpdated, &pos);
|
||||
forwardStep =
|
||||
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
// window start(end) key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
|
||||
pInfo->order, false);
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
|
||||
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return pUpdated;
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = NULL;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
|
||||
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
|
||||
continue;
|
||||
}
|
||||
pUpdated = doHashInterval(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||
}
|
||||
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx);
|
|||
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t lastFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
|
||||
int32_t topFunction(SqlFunctionCtx *pCtx);
|
||||
|
|
|
@ -47,8 +47,10 @@ static int32_t translateInOutNum(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
|||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(paraType)) {
|
||||
if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
} else if (IS_NULL_TYPE(paraType)) {
|
||||
paraType = TSDB_DATA_TYPE_BIGINT;
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[paraType].bytes, .type = paraType};
|
||||
|
@ -62,7 +64,7 @@ static int32_t translateInNumOutDou(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
|||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(paraType)) {
|
||||
if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
|
@ -115,18 +117,19 @@ static int32_t translateSum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
}
|
||||
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(paraType)) {
|
||||
if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
uint8_t resType = 0;
|
||||
if (IS_SIGNED_NUMERIC_TYPE(paraType) || paraType == TSDB_DATA_TYPE_BOOL) {
|
||||
if (IS_SIGNED_NUMERIC_TYPE(paraType) || TSDB_DATA_TYPE_BOOL == paraType || IS_NULL_TYPE(paraType)) {
|
||||
resType = TSDB_DATA_TYPE_BIGINT;
|
||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(paraType)) {
|
||||
resType = TSDB_DATA_TYPE_UBIGINT;
|
||||
} else if (IS_FLOAT_TYPE(paraType)) {
|
||||
resType = TSDB_DATA_TYPE_DOUBLE;
|
||||
}
|
||||
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[resType].bytes, .type = resType};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -872,7 +875,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = lastFunction,
|
||||
.finalizeFunc = functionFinalize
|
||||
.finalizeFunc = lastFinalize
|
||||
},
|
||||
{
|
||||
.name = "diff",
|
||||
|
|
|
@ -255,7 +255,7 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
||||
//pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
||||
|
||||
char* in = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
|
||||
|
@ -331,8 +331,18 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
|
|||
int32_t countFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = getNumofElem(pCtx);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
*((int64_t*)buf) += numOfElem;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
if (IS_NULL_TYPE(type)) {
|
||||
//select count(NULL) returns 0
|
||||
numOfElem = 1;
|
||||
*((int64_t*)buf) = 0;
|
||||
} else {
|
||||
*((int64_t*)buf) += numOfElem;
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -378,11 +388,17 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
// Only the pre-computing information loaded and actual data does not loaded
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
if (IS_NULL_TYPE(type)) {
|
||||
GET_RES_INFO(pCtx)->isNullRes = 1;
|
||||
numOfElem = 1;
|
||||
goto _sum_over;
|
||||
}
|
||||
|
||||
if (pInput->colDataAggIsSet) {
|
||||
numOfElem = pInput->numOfRows - pAgg->numOfNull;
|
||||
ASSERT(numOfElem >= 0);
|
||||
|
@ -427,6 +443,7 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
_sum_over:
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -526,6 +543,12 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
if (IS_NULL_TYPE(type)) {
|
||||
GET_RES_INFO(pCtx)->isNullRes = 1;
|
||||
numOfElem = 1;
|
||||
goto _avg_over;
|
||||
}
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t* plist = (int8_t*)pCol->pData;
|
||||
|
@ -617,6 +640,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
|
||||
_avg_over:
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -783,6 +807,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SMinmaxResInfo *pBuf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (IS_NULL_TYPE(type)) {
|
||||
GET_RES_INFO(pCtx)->isNullRes = 1;
|
||||
numOfElems = 1;
|
||||
goto _min_max_over;
|
||||
}
|
||||
|
||||
// data in current data block are qualified to the query
|
||||
if (pInput->colDataAggIsSet) {
|
||||
numOfElems = pInput->numOfRows - pAgg->numOfNull;
|
||||
|
@ -1183,6 +1213,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
}
|
||||
}
|
||||
|
||||
_min_max_over:
|
||||
return numOfElems;
|
||||
}
|
||||
|
||||
|
@ -1215,9 +1246,9 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
|
||||
if (pCol->info.type == TSDB_DATA_TYPE_FLOAT) {
|
||||
float v = *(double*) &pRes->v;
|
||||
colDataAppend(pCol, currentRow, (const char*)&v, false);
|
||||
colDataAppend(pCol, currentRow, (const char*)&v, pEntryInfo->isNullRes);
|
||||
} else {
|
||||
colDataAppend(pCol, currentRow, (const char*)&pRes->v, false);
|
||||
colDataAppend(pCol, currentRow, (const char*)&pRes->v, pEntryInfo->isNullRes);
|
||||
}
|
||||
|
||||
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
|
||||
|
@ -1287,6 +1318,12 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
if (IS_NULL_TYPE(type)) {
|
||||
GET_RES_INFO(pCtx)->isNullRes = 1;
|
||||
numOfElem = 1;
|
||||
goto _stddev_over;
|
||||
}
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t* plist = (int8_t*)pCol->pData;
|
||||
|
@ -1384,6 +1421,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
|
||||
_stddev_over:
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1943,6 +1981,19 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
||||
|
||||
char* in = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||
return true;
|
||||
|
|
|
@ -1364,9 +1364,12 @@ void releaseUdfFuncHandle(char* udfName) {
|
|||
SUdfcFuncStub key = {0};
|
||||
strcpy(key.udfName, udfName);
|
||||
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
|
||||
ASSERT(foundStub);
|
||||
--foundStub->refCount;
|
||||
ASSERT(foundStub->refCount>=0);
|
||||
if (!foundStub) {
|
||||
return;
|
||||
}
|
||||
if (foundStub->refCount > 0) {
|
||||
--foundStub->refCount;
|
||||
}
|
||||
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
|
||||
}
|
||||
|
||||
|
@ -1377,7 +1380,7 @@ int32_t cleanUpUdfs() {
|
|||
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
|
||||
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
|
||||
if (stub->refCount == 0) {
|
||||
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
|
||||
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
|
||||
doTeardownUdf(stub->handle);
|
||||
} else {
|
||||
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
|
||||
|
@ -1530,12 +1533,15 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
|
|||
}
|
||||
SUdfcUvSession *session = handle;
|
||||
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
|
||||
if (session->outputType != output->columnData->info.type
|
||||
|| session->outputLen != output->columnData->info.bytes) {
|
||||
fnError("udfc scalar function calculate error, session type: %d(%d), output type: %d(%d)",
|
||||
session->outputType, session->outputLen,
|
||||
output->columnData->info.type, output->columnData->info.bytes);
|
||||
if (output->columnData == NULL) {
|
||||
fnError("udfc scalar function calculate error. no column data");
|
||||
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
|
||||
} else {
|
||||
if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
|
||||
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType,
|
||||
session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
|
||||
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
|
||||
}
|
||||
}
|
||||
releaseUdfFuncHandle(udfName);
|
||||
return code;
|
||||
|
@ -1565,7 +1571,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
|||
|
||||
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
|
||||
|
||||
taosMemoryFree(task->session);
|
||||
taosMemoryFree(session);
|
||||
taosMemoryFree(task);
|
||||
|
||||
return err;
|
||||
|
@ -1573,7 +1579,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
|
|||
|
||||
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
|
||||
typedef struct SUdfAggRes {
|
||||
SUdfcUvSession *session;
|
||||
int8_t finalResNum;
|
||||
int8_t interResNum;
|
||||
char* finalResBuf;
|
||||
|
@ -1606,7 +1611,6 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
|||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
udfRes->session = (SUdfcUvSession *)handle;
|
||||
SUdfInterBuf buf = {0};
|
||||
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
|
||||
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
|
||||
|
@ -1621,22 +1625,26 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
|||
releaseUdfFuncHandle(pCtx->udfName);
|
||||
return false;
|
||||
}
|
||||
releaseUdfFuncHandle(pCtx->udfName);
|
||||
freeUdfInterBuf(&buf);
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t numOfCols = pInput->numOfInputCols;
|
||||
|
||||
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
SUdfcUvSession *session = udfRes->session;
|
||||
if (session == NULL) {
|
||||
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
|
||||
int32_t udfCode = 0;
|
||||
UdfcFuncHandle handle = 0;
|
||||
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
|
||||
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
|
||||
return udfCode;
|
||||
}
|
||||
|
||||
SUdfcUvSession *session = handle;
|
||||
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t numOfCols = pInput->numOfInputCols;
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
|
@ -1664,7 +1672,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
|||
.numOfResult = udfRes->interResNum};
|
||||
SUdfInterBuf newState = {0};
|
||||
|
||||
int32_t udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
|
||||
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
|
||||
if (udfCode != 0) {
|
||||
fnError("udfAggProcess error. code: %d", udfCode);
|
||||
newState.numOfResult = 0;
|
||||
|
@ -1684,19 +1692,21 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
|||
blockDataDestroy(inputBlock);
|
||||
taosArrayDestroy(tempBlock.pDataBlock);
|
||||
|
||||
if (udfCode != 0) {
|
||||
releaseUdfFuncHandle(pCtx->udfName);
|
||||
}
|
||||
releaseUdfFuncHandle(pCtx->udfName);
|
||||
freeUdfInterBuf(&newState);
|
||||
return udfCode;
|
||||
}
|
||||
|
||||
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
SUdfcUvSession *session = udfRes->session;
|
||||
if (session == NULL) {
|
||||
return TSDB_CODE_UDF_NO_FUNC_HANDLE;
|
||||
int32_t udfCode = 0;
|
||||
UdfcFuncHandle handle = 0;
|
||||
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
|
||||
fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode);
|
||||
return udfCode;
|
||||
}
|
||||
|
||||
SUdfcUvSession *session = handle;
|
||||
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
|
|
|
@ -621,6 +621,7 @@ column_reference(A) ::= table_name(B) NK_DOT column_name(C).
|
|||
|
||||
pseudo_column(A) ::= ROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
|
||||
pseudo_column(A) ::= TBNAME(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
|
||||
pseudo_column(A) ::= table_name(B) NK_DOT TBNAME(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNode(pCxt, &C, createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)))); }
|
||||
pseudo_column(A) ::= QSTARTTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
|
||||
pseudo_column(A) ::= QENDTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
|
||||
pseudo_column(A) ::= WSTARTTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
|
||||
|
|
|
@ -155,9 +155,9 @@ static bool checkPort(SAstCreateContext* pCxt, const SToken* pPortToken, int32_t
|
|||
return TSDB_CODE_SUCCESS == pCxt->errCode;
|
||||
}
|
||||
|
||||
static bool checkDbName(SAstCreateContext* pCxt, SToken* pDbName, bool query) {
|
||||
static bool checkDbName(SAstCreateContext* pCxt, SToken* pDbName, bool demandDb) {
|
||||
if (NULL == pDbName) {
|
||||
if (query && NULL == pCxt->pQueryCxt->db) {
|
||||
if (demandDb && NULL == pCxt->pQueryCxt->db) {
|
||||
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_DB_NOT_SPECIFIED);
|
||||
}
|
||||
} else {
|
||||
|
@ -457,6 +457,8 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
|
|||
}
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pSubquery)) {
|
||||
strcpy(((SSelectStmt*)pSubquery)->stmtName, tempTable->table.tableAlias);
|
||||
} else if (QUERY_NODE_SET_OPERATOR == nodeType(pSubquery)) {
|
||||
strcpy(((SSetOperator*)pSubquery)->stmtName, tempTable->table.tableAlias);
|
||||
}
|
||||
return (SNode*)tempTable;
|
||||
}
|
||||
|
@ -637,6 +639,7 @@ SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode*
|
|||
setOp->opType = type;
|
||||
setOp->pLeft = pLeft;
|
||||
setOp->pRight = pRight;
|
||||
sprintf(setOp->stmtName, "%p", setOp);
|
||||
return (SNode*)setOp;
|
||||
}
|
||||
|
||||
|
@ -1140,7 +1143,7 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
|
|||
|
||||
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName,
|
||||
SToken* pTableName, SNodeList* pCols, SNode* pOptions) {
|
||||
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName)) {
|
||||
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName) || !checkDbName(pCxt, NULL, true)) {
|
||||
return NULL;
|
||||
}
|
||||
SCreateIndexStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_INDEX_STMT);
|
||||
|
|
|
@ -766,6 +766,20 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
|
|||
pCxt->pCurrStmt->hasRepeatScanFuncs = true;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsScanPseudoColumnFunc(pFunc->funcId)) {
|
||||
if (0 == LIST_LENGTH(pFunc->pParameterList)) {
|
||||
if (QUERY_NODE_REAL_TABLE != nodeType(pCxt->pCurrStmt->pFromTable)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_TBNAME);
|
||||
}
|
||||
} else {
|
||||
SValueNode* pVal = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
STableNode* pTable = NULL;
|
||||
pCxt->errCode = findTable(pCxt, pVal->literal, &pTable);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode && (NULL == pTable || QUERY_NODE_REAL_TABLE != nodeType(pTable))) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_TBNAME);
|
||||
}
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
|
@ -1758,12 +1772,13 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static SNode* createSetOperProject(SNode* pNode) {
|
||||
static SNode* createSetOperProject(const char* pTableAlias, SNode* pNode) {
|
||||
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return NULL;
|
||||
}
|
||||
pCol->node.resType = ((SExprNode*)pNode)->resType;
|
||||
strcpy(pCol->tableAlias, pTableAlias);
|
||||
strcpy(pCol->colName, ((SExprNode*)pNode)->aliasName);
|
||||
strcpy(pCol->node.aliasName, pCol->colName);
|
||||
return (SNode*)pCol;
|
||||
|
@ -1817,7 +1832,8 @@ static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* p
|
|||
}
|
||||
strcpy(pRightExpr->aliasName, pLeftExpr->aliasName);
|
||||
pRightExpr->aliasName[strlen(pLeftExpr->aliasName)] = '\0';
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSetOperator->pProjectionList, createSetOperProject(pLeft))) {
|
||||
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSetOperator->pProjectionList,
|
||||
createSetOperProject(pSetOperator->stmtName, pLeft))) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -158,6 +158,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "Primary timestamp column cannot be dropped";
|
||||
case TSDB_CODE_PAR_INVALID_MODIFY_COL:
|
||||
return "Only binary/nchar column length could be modified";
|
||||
case TSDB_CODE_PAR_INVALID_TBNAME:
|
||||
return "Invalid tbname pseudo column";
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -70,6 +70,12 @@ TEST_F(ParserSelectTest, pseudoColumn) {
|
|||
run("SELECT _WSTARTTS, _WENDTS, COUNT(*) FROM t1 INTERVAL(10s)");
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, pseudoColumnSemanticCheck) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT TBNAME FROM (SELECT * FROM st1s1)", TSDB_CODE_PAR_INVALID_TBNAME, PARSER_STAGE_TRANSLATE);
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, multiResFunc) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
|
|
@ -27,12 +27,13 @@ extern "C" {
|
|||
#define QUERY_POLICY_HYBRID 2
|
||||
#define QUERY_POLICY_QNODE 3
|
||||
|
||||
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
|
||||
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
|
||||
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
|
||||
#define planInfo(param, ...) qInfo("PLAN: " param, __VA_ARGS__)
|
||||
#define planDebug(param, ...) qDebug("PLAN: " param, __VA_ARGS__)
|
||||
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
|
||||
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
|
||||
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
|
||||
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
|
||||
#define planInfo(param, ...) qInfo("PLAN: " param, __VA_ARGS__)
|
||||
#define planDebug(param, ...) qDebug("PLAN: " param, __VA_ARGS__)
|
||||
#define planDebugL(param, ...) qDebugL("PLAN: " param, __VA_ARGS__)
|
||||
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
|
||||
|
||||
int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
|
||||
|
||||
|
|
|
@ -310,12 +310,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
|
||||
static int32_t createSubqueryLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable,
|
||||
SLogicNode** pLogicNode) {
|
||||
int32_t code = createQueryLogicNode(pCxt, pTable->pSubquery, pLogicNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, (*pLogicNode)->pTargets) { strcpy(((SColumnNode*)pNode)->tableAlias, pTable->table.tableAlias); }
|
||||
}
|
||||
return code;
|
||||
return createQueryLogicNode(pCxt, pTable->pSubquery, pLogicNode);
|
||||
}
|
||||
|
||||
static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SJoinTableNode* pJoinTable,
|
||||
|
@ -879,7 +874,8 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
|
|||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByProjections(pCxt, NULL, pSetOperator->pProjectionList, &pProject->node.pTargets);
|
||||
code = createColumnByProjections(pCxt, pSetOperator->stmtName, pSetOperator->pProjectionList,
|
||||
&pProject->node.pTargets);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -933,7 +929,7 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO
|
|||
code = createSetOpAggLogicNode(pCxt, pSetOperator, &pSetOp);
|
||||
break;
|
||||
default:
|
||||
code = -1;
|
||||
code = TSDB_CODE_FAILED;
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -598,39 +598,63 @@ static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
|
||||
if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
static bool cpdContainPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pCond)) {
|
||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pCond;
|
||||
if (LOGIC_COND_TYPE_AND != pLogicCond->condType) {
|
||||
return false;
|
||||
}
|
||||
bool hasPrimaryKeyEqualCond = false;
|
||||
SNode* pCond = NULL;
|
||||
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||
if (cpdContainPrimaryKeyEqualCond(pJoin, pCond)) {
|
||||
hasPrimaryKeyEqualCond = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return hasPrimaryKeyEqualCond;
|
||||
} else {
|
||||
return cpdIsPrimaryKeyEqualCond(pJoin, pCond);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
|
||||
if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
}
|
||||
bool hasPrimaryKeyEqualCond = false;
|
||||
SNode* pCond = NULL;
|
||||
FOREACH(pCond, pOnCond->pParameterList) {
|
||||
if (cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
|
||||
hasPrimaryKeyEqualCond = true;
|
||||
}
|
||||
}
|
||||
if (!hasPrimaryKeyEqualCond) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
// static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
|
||||
// if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
|
||||
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
// }
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
|
||||
// static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
|
||||
// if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
|
||||
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
// }
|
||||
// bool hasPrimaryKeyEqualCond = false;
|
||||
// SNode* pCond = NULL;
|
||||
// FOREACH(pCond, pOnCond->pParameterList) {
|
||||
// if (cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
|
||||
// hasPrimaryKeyEqualCond = true;
|
||||
// }
|
||||
// }
|
||||
// if (!hasPrimaryKeyEqualCond) {
|
||||
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
// }
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
|
||||
static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
if (NULL == pJoin->pOnConditions) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
|
||||
}
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
|
||||
return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
|
||||
} else {
|
||||
return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
|
||||
if (!cpdContainPrimaryKeyEqualCond(pJoin, pJoin->pOnConditions)) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
// if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
|
||||
// return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
|
||||
// } else {
|
||||
// return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
|
||||
// }
|
||||
}
|
||||
|
||||
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
|
|
|
@ -204,6 +204,75 @@ static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool unionIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
|
||||
return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
|
||||
}
|
||||
|
||||
SNode* pChild;
|
||||
FOREACH(pChild, pLogicNode->pChildren) {
|
||||
bool isChild = unionIsChildSubplan((SLogicNode*)pChild, groupId);
|
||||
if (isChild) {
|
||||
return isChild;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t unionMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
|
||||
SNode* pChild = NULL;
|
||||
WHERE_EACH(pChild, pChildren) {
|
||||
if (unionIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
|
||||
int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
REPLACE_NODE(NULL);
|
||||
ERASE_NODE(pChildren);
|
||||
continue;
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
WHERE_NEXT;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
|
||||
SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
if (NULL == pSubplan) {
|
||||
return NULL;
|
||||
}
|
||||
pSubplan->id.groupId = pCxt->groupId;
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
pSubplan->pNode = pNode;
|
||||
return pSubplan;
|
||||
}
|
||||
|
||||
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
|
||||
SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
|
||||
pUnionSubplan->pChildren = NULL;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, pSplitNode->pChildren) {
|
||||
SLogicSubplan* pNewSubplan = unionCreateSubplan(pCxt, (SLogicNode*)pChild);
|
||||
code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, pNewSubplan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
REPLACE_NODE(NULL);
|
||||
code = unionMountSubplan(pNewSubplan, pSubplanChildren);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesDestroyList(pSubplanChildren);
|
||||
DESTORY_LIST(pSplitNode->pChildren);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static SLogicNode* uaMatchByNode(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
|
||||
return pNode;
|
||||
|
@ -227,17 +296,6 @@ static bool uaFindSplitNode(SLogicSubplan* pSubplan, SUaInfo* pInfo) {
|
|||
return NULL != pSplitNode;
|
||||
}
|
||||
|
||||
static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
|
||||
SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
if (NULL == pSubplan) {
|
||||
return NULL;
|
||||
}
|
||||
pSubplan->id.groupId = pCxt->groupId;
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
pSubplan->pNode = pNode;
|
||||
return pSubplan;
|
||||
}
|
||||
|
||||
static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
|
||||
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||
if (NULL == pExchange) {
|
||||
|
@ -276,20 +334,8 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, info.pProject->node.pChildren) {
|
||||
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, uaCreateSubplan(pCxt, (SLogicNode*)pChild));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
REPLACE_NODE(NULL);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesClearList(info.pProject->node.pChildren);
|
||||
info.pProject->node.pChildren = NULL;
|
||||
code = uaCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
|
@ -343,20 +389,8 @@ static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, info.pAgg->node.pChildren) {
|
||||
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, uaCreateSubplan(pCxt, (SLogicNode*)pChild));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
REPLACE_NODE(NULL);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesClearList(info.pAgg->node.pChildren);
|
||||
info.pAgg->node.pChildren = NULL;
|
||||
code = unCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
|
|
|
@ -30,6 +30,14 @@ TEST_F(PlanJoinTest, basic) {
|
|||
run("SELECT t1.c1, t2.c1 FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts");
|
||||
}
|
||||
|
||||
TEST_F(PlanJoinTest, complex) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT t1.c1, t2.c2 FROM st1s1 t1, st1s2 t2 "
|
||||
"WHERE t1.ts = t2.ts AND t1.c1 BETWEEN -10 AND 10 AND t2.c1 BETWEEN -100 AND 100 AND "
|
||||
"(t1.c2 LIKE 'nchar%' OR t1.c1 = 0 OR t2.c2 LIKE 'nchar%' OR t2.c1 = 0)");
|
||||
}
|
||||
|
||||
TEST_F(PlanJoinTest, withWhere) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
|
|
@ -23,11 +23,33 @@ class PlanSetOpTest : public PlannerTestBase {};
|
|||
TEST_F(PlanSetOpTest, unionAll) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20");
|
||||
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20");
|
||||
}
|
||||
|
||||
TEST_F(PlanSetOpTest, unionAllSubquery) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT * FROM (SELECT c1, c2 FROM t1 UNION ALL SELECT c1, c2 FROM t1)");
|
||||
}
|
||||
|
||||
TEST_F(PlanSetOpTest, union) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("select c1, c2 from t1 where c1 > 10 union select c1, c2 from t1 where c1 > 20");
|
||||
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION SELECT c1, c2 FROM t1 WHERE c1 > 20");
|
||||
}
|
||||
|
||||
TEST_F(PlanSetOpTest, unionContainJoin) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT t1.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts "
|
||||
"WHERE t1.c1 IS NOT NULL GROUP BY t1.c1 HAVING t1.c1 IS NOT NULL "
|
||||
"UNION "
|
||||
"SELECT t1.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts "
|
||||
"WHERE t1.c1 IS NOT NULL GROUP BY t1.c1 HAVING t1.c1 IS NOT NULL");
|
||||
}
|
||||
|
||||
TEST_F(PlanSetOpTest, unionSubquery) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT * FROM (SELECT c1, c2 FROM t1 UNION SELECT c1, c2 FROM t1)");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue