Merge branch 'fix/mnode' into fix/tsim
This commit is contained in:
commit
b07dfdbaf5
|
@ -50,8 +50,10 @@ typedef enum EStreamType {
|
||||||
} EStreamType;
|
} EStreamType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
SArray* pGroupList;
|
||||||
SArray* pTableList;
|
SArray* pTableList;
|
||||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||||
|
bool needSortTableByGroupId;
|
||||||
void* pTagCond;
|
void* pTagCond;
|
||||||
void* pTagIndexCond;
|
void* pTagIndexCond;
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
|
|
|
@ -36,7 +36,7 @@ typedef struct SReadHandle {
|
||||||
void* vnode;
|
void* vnode;
|
||||||
void* mnd;
|
void* mnd;
|
||||||
SMsgCb* pMsgCb;
|
SMsgCb* pMsgCb;
|
||||||
int8_t initTsdbReader;
|
// int8_t initTsdbReader;
|
||||||
} SReadHandle;
|
} SReadHandle;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
|
|
@ -963,6 +963,10 @@ int32_t taosSetCfg(SConfig *pCfg, char* name) {
|
||||||
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
|
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
|
||||||
} else if (strcasecmp("transPullupInterval", name) == 0) {
|
} else if (strcasecmp("transPullupInterval", name) == 0) {
|
||||||
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
|
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
|
||||||
|
} else if (strcasecmp("ttlUnit", name) == 0) {
|
||||||
|
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
|
||||||
|
} else if (strcasecmp("ttlPushInterval", name) == 0) {
|
||||||
|
tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32;
|
||||||
} else if (strcasecmp("tmrDebugFlag", name) == 0) {
|
} else if (strcasecmp("tmrDebugFlag", name) == 0) {
|
||||||
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
|
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
|
||||||
} else if (strcasecmp("tsdbDebugFlag", name) == 0) {
|
} else if (strcasecmp("tsdbDebugFlag", name) == 0) {
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _TD_MND_AUTH_H_
|
#ifndef _TD_MND_PRIVILEGE_H
|
||||||
#define _TD_MND_AUTH_H_
|
#define _TD_MND_PRIVILEGE_H
|
||||||
|
|
||||||
#include "mndInt.h"
|
#include "mndInt.h"
|
||||||
|
|
||||||
|
@ -59,11 +59,12 @@ typedef enum {
|
||||||
MND_OPER_READ_DB,
|
MND_OPER_READ_DB,
|
||||||
} EOperType;
|
} EOperType;
|
||||||
|
|
||||||
int32_t mndInitAuth(SMnode *pMnode);
|
int32_t mndInitPrivilege(SMnode *pMnode);
|
||||||
void mndCleanupAuth(SMnode *pMnode);
|
void mndCleanupPrivilege(SMnode *pMnode);
|
||||||
|
|
||||||
int32_t mndCheckOperPrivilege(SMnode *pMnode, const char *user, EOperType operType);
|
int32_t mndCheckOperPrivilege(SMnode *pMnode, const char *user, EOperType operType);
|
||||||
int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType, SDbObj *pDb);
|
int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType, SDbObj *pDb);
|
||||||
|
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *name);
|
||||||
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, int32_t showType);
|
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, int32_t showType);
|
||||||
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
|
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
|
||||||
|
|
||||||
|
@ -71,4 +72,4 @@ int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterU
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_MND_AUTH_H_*/
|
#endif /*_TD_MND_PRIVILEGE_H*/
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndAcct.h"
|
#include "mndAcct.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndBnode.h"
|
#include "mndBnode.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
@ -431,6 +431,10 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
goto SUBSCRIBE_OVER;
|
goto SUBSCRIBE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (mndCheckDbPrivilegeByName(pMnode, pMsg->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
||||||
|
goto SUBSCRIBE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
// ref topic to prevent drop
|
// ref topic to prevent drop
|
||||||
// TODO make topic complete
|
// TODO make topic complete
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndOffset.h"
|
#include "mndOffset.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndQnode.h"
|
#include "mndQnode.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndFunc.h"
|
#include "mndFunc.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndAcct.h"
|
#include "mndAcct.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndBnode.h"
|
#include "mndBnode.h"
|
||||||
#include "mndCluster.h"
|
#include "mndCluster.h"
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
|
@ -100,7 +100,7 @@ static void *mndThreadFp(void *param) {
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
if (mndGetStop(pMnode)) break;
|
if (mndGetStop(pMnode)) break;
|
||||||
|
|
||||||
if (lastTime % (tsTransPullupInterval * 10) == 1) {
|
if (lastTime % (tsTtlPushInterval * 10) == 1) {
|
||||||
mndTtlTimer(pMnode);
|
mndTtlTimer(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,7 +239,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
|
||||||
if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-grant", mndInitGrant, mndCleanupGrant) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-privilege", mndInitPrivilege, mndCleanupPrivilege) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndOffset.h"
|
#include "mndOffset.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
|
|
@ -14,64 +14,13 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
#include "mndDb.h"
|
||||||
|
|
||||||
static int32_t mndProcessAuthReq(SRpcMsg *pReq);
|
int32_t mndInitPrivilege(SMnode *pMnode) { return 0; }
|
||||||
|
|
||||||
int32_t mndInitAuth(SMnode *pMnode) {
|
void mndCleanupPrivilege(SMnode *pMnode) {}
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_AUTH, mndProcessAuthReq);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void mndCleanupAuth(SMnode *pMnode) {}
|
|
||||||
|
|
||||||
static int32_t mndRetriveAuth(SMnode *pMnode, SAuthRsp *pRsp) {
|
|
||||||
SUserObj *pUser = mndAcquireUser(pMnode, pRsp->user);
|
|
||||||
if (pUser == NULL) {
|
|
||||||
*pRsp->secret = 0;
|
|
||||||
mError("user:%s, failed to auth user since %s", pRsp->user, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRsp->spi = 1;
|
|
||||||
pRsp->encrypt = 0;
|
|
||||||
*pRsp->ckey = 0;
|
|
||||||
|
|
||||||
memcpy(pRsp->secret, pUser->pass, TSDB_PASSWORD_LEN);
|
|
||||||
mndReleaseUser(pMnode, pUser);
|
|
||||||
|
|
||||||
mDebug("user:%s, auth info is returned", pRsp->user);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessAuthReq(SRpcMsg *pReq) {
|
|
||||||
SAuthReq authReq = {0};
|
|
||||||
if (tDeserializeSAuthReq(pReq->pCont, pReq->contLen, &authReq) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SAuthReq authRsp = {0};
|
|
||||||
memcpy(authRsp.user, authReq.user, TSDB_USER_LEN);
|
|
||||||
|
|
||||||
int32_t code = mndRetriveAuth(pReq->info.node, &authRsp);
|
|
||||||
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->info.conn.user, authRsp.spi, authRsp.encrypt,
|
|
||||||
authRsp.user);
|
|
||||||
|
|
||||||
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp);
|
|
||||||
void *pRsp = rpcMallocCont(contLen);
|
|
||||||
if (pRsp == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSerializeSAuthReq(pRsp, contLen, &authRsp);
|
|
||||||
|
|
||||||
pReq->info.rsp = pRsp;
|
|
||||||
pReq->info.rspLen = contLen;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndCheckOperPrivilege(SMnode *pMnode, const char *user, EOperType operType) {
|
int32_t mndCheckOperPrivilege(SMnode *pMnode, const char *user, EOperType operType) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -185,15 +134,7 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
|
||||||
if (pUser->sysInfo) goto _OVER;
|
if (pUser->sysInfo) goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (operType == MND_OPER_ALTER_DB) {
|
if (operType == MND_OPER_ALTER_DB || operType == MND_OPER_DROP_DB || operType == MND_OPER_COMPACT_DB) {
|
||||||
if (strcmp(pUser->user, pDb->createUser) == 0 && pUser->sysInfo) goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (operType == MND_OPER_DROP_DB) {
|
|
||||||
if (strcmp(pUser->user, pDb->createUser) == 0 && pUser->sysInfo) goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (operType == MND_OPER_COMPACT_DB) {
|
|
||||||
if (strcmp(pUser->user, pDb->createUser) == 0 && pUser->sysInfo) goto _OVER;
|
if (strcmp(pUser->user, pDb->createUser) == 0 && pUser->sysInfo) goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,3 +161,12 @@ _OVER:
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *name) {
|
||||||
|
SDbObj *pDb = mndAcquireDb(pMnode, name);
|
||||||
|
if (pDb == NULL) return -1;
|
||||||
|
|
||||||
|
int32_t code = mndCheckDbPrivilege(pMnode, user, operType, pDb);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
return code;
|
||||||
|
}
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndProfile.h"
|
#include "mndProfile.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndQnode.h"
|
#include "mndQnode.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "systable.h"
|
#include "systable.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
|
|
||||||
#define SHOW_STEP_SIZE 100
|
#define SHOW_STEP_SIZE 100
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndSma.h"
|
#include "mndSma.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndInfoSchema.h"
|
#include "mndInfoSchema.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndSnode.h"
|
#include "mndSnode.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndInfoSchema.h"
|
#include "mndInfoSchema.h"
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "mndStream.h"
|
#include "mndStream.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
@ -437,10 +437,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckDbPrivilege(pMnode, user, MND_OPER_WRITE_DB, pDb) != 0) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfStbs = -1;
|
int32_t numOfStbs = -1;
|
||||||
if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
|
if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -542,19 +538,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO check read auth for source and write auth for target
|
|
||||||
#if 0
|
|
||||||
pDb = mndAcquireDb(pMnode, createStreamReq.sourceDB);
|
|
||||||
if (pDb == NULL) {
|
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// build stream obj from request
|
// build stream obj from request
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
|
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
|
||||||
|
@ -592,6 +575,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, streamObj.sourceDb) != 0) {
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, streamObj.targetDb) != 0) {
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
// execute creation
|
// execute creation
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
@ -641,13 +634,9 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
||||||
// todo check auth
|
return -1;
|
||||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
|
||||||
if (pUser == NULL) {
|
|
||||||
goto DROP_STREAM_OVER;
|
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "mndTopic.h"
|
#include "mndTopic.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
|
@ -480,11 +480,6 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
|
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -571,6 +566,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||||
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
mndTransSetDbName(pTrans, pTopic->db, NULL);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
|
@ -579,7 +578,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||||
|
|
||||||
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
#include "mndAuth.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
|
|
@ -116,7 +116,8 @@ typedef void *tsdbReaderT;
|
||||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||||
|
|
||||||
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
|
int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList);
|
||||||
|
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId,
|
||||||
uint64_t taskId);
|
uint64_t taskId);
|
||||||
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
|
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
|
||||||
void *pMemRef);
|
void *pMemRef);
|
||||||
|
@ -195,7 +196,6 @@ struct SVnodeCfg {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TSKEY lastKey;
|
TSKEY lastKey;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
uint64_t groupId;
|
|
||||||
} STableKeyInfo;
|
} STableKeyInfo;
|
||||||
|
|
||||||
struct SMetaEntry {
|
struct SMetaEntry {
|
||||||
|
|
|
@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub
|
||||||
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
|
||||||
SSubmitBlkRsp* pRsp);
|
SSubmitBlkRsp* pRsp);
|
||||||
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
|
||||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
|
||||||
uint64_t taskId);
|
uint64_t taskId);
|
||||||
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
||||||
void* pMemRef);
|
void* pMemRef);
|
||||||
|
|
|
@ -381,6 +381,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) {
|
||||||
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
|
for (int i = 0; i < taosArrayGetSize(tbUids); ++i) {
|
||||||
tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i);
|
tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i);
|
||||||
metaDropTableByUid(pMeta, *uid, NULL);
|
metaDropTableByUid(pMeta, *uid, NULL);
|
||||||
|
metaDebug("ttl drop table:%"PRId64, *uid);
|
||||||
}
|
}
|
||||||
metaULock(pMeta);
|
metaULock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -443,7 +444,6 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
|
||||||
// drop schema.db (todo)
|
// drop schema.db (todo)
|
||||||
}
|
}
|
||||||
|
|
||||||
metaError("ttl drop table:%s", e.name);
|
|
||||||
tDecoderClear(&dc);
|
tDecoderClear(&dc);
|
||||||
tdbFree(pData);
|
tdbFree(pData);
|
||||||
|
|
||||||
|
@ -976,7 +976,9 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
||||||
SDecoder dc = {0};
|
SDecoder dc = {0};
|
||||||
|
|
||||||
// get super table
|
// get super table
|
||||||
tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData);
|
if(tdbTbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData) != 0){
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
|
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
|
||||||
tbDbKey.version = *(int64_t *)pData;
|
tbDbKey.version = *(int64_t *)pData;
|
||||||
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
|
tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
|
||||||
|
|
|
@ -403,7 +403,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
.reader = pHandle->execHandle.pExecReader[i],
|
.reader = pHandle->execHandle.pExecReader[i],
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
.vnode = pTq->pVnode,
|
.vnode = pTq->pVnode,
|
||||||
.initTsdbReader = 1,
|
// .initTsdbReader = 1,
|
||||||
};
|
};
|
||||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||||
|
@ -479,7 +479,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
.reader = pStreamReader,
|
.reader = pStreamReader,
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
.vnode = pTq->pVnode,
|
.vnode = pTq->pVnode,
|
||||||
.initTsdbReader = 1,
|
// .initTsdbReader = 1,
|
||||||
};
|
};
|
||||||
/*pTask->exec.inputHandle = pStreamReader;*/
|
/*pTask->exec.inputHandle = pStreamReader;*/
|
||||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||||
|
|
|
@ -223,9 +223,8 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
|
||||||
return rows;
|
return rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) {
|
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, SArray* pTableList) {
|
||||||
size_t tableSize = taosArrayGetSize(pTableList->pTableList);
|
size_t tableSize = taosArrayGetSize(pTableList);
|
||||||
assert(tableSize >= 1);
|
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
|
SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
|
||||||
|
@ -235,7 +234,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
||||||
|
|
||||||
// todo apply the lastkey of table check to avoid to load header file
|
// todo apply the lastkey of table check to avoid to load header file
|
||||||
for (int32_t j = 0; j < tableSize; ++j) {
|
for (int32_t j = 0; j < tableSize; ++j) {
|
||||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j);
|
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList, j);
|
||||||
|
|
||||||
STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
|
STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
|
||||||
info.suid = pTsdbReadHandle->suid;
|
info.suid = pTsdbReadHandle->suid;
|
||||||
|
@ -254,8 +253,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
|
||||||
pTsdbReadHandle->idStr);
|
pTsdbReadHandle->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO group table according to the tag value.
|
|
||||||
taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
|
|
||||||
return pTableCheckInfo;
|
return pTableCheckInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -497,8 +494,21 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
|
int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){
|
||||||
|
STsdbReadHandle* pTsdbReadHandle = reader;
|
||||||
|
if(pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo);
|
||||||
|
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList);
|
||||||
|
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
|
||||||
|
return TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
return TDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
|
||||||
uint64_t taskId) {
|
uint64_t taskId) {
|
||||||
|
if(taosArrayGetSize(tableList) == 0){
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
||||||
if (pTsdbReadHandle == NULL) {
|
if (pTsdbReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -543,7 +553,7 @@ tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLis
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
|
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
|
||||||
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList),
|
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList),
|
||||||
pTsdbReadHandle->idStr);
|
pTsdbReadHandle->idStr);
|
||||||
|
|
||||||
return (tsdbReaderT)pTsdbReadHandle;
|
return (tsdbReaderT)pTsdbReadHandle;
|
||||||
|
@ -639,7 +649,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList, qId, taskId);
|
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbReaderOpen(pVnode, pCond, pList->pTableList, qId, taskId);
|
||||||
if (pTsdbReadHandle == NULL) {
|
if (pTsdbReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -2842,7 +2852,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id, .groupId = 0};
|
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id};
|
||||||
taosArrayPush(list, &info);
|
taosArrayPush(list, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3644,17 +3654,6 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
|
|
||||||
if (((STableCheckInfo*)key1)->tableId < ((STableCheckInfo*)key2)->tableId) {
|
|
||||||
return -1;
|
|
||||||
} else if (((STableCheckInfo*)key1)->tableId > ((STableCheckInfo*)key2)->tableId) {
|
|
||||||
return 1;
|
|
||||||
} else {
|
|
||||||
ASSERT(false);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
|
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
|
||||||
if (pColumnInfoData == NULL) {
|
if (pColumnInfoData == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -273,6 +273,10 @@ typedef struct STableScanInfo {
|
||||||
|
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
int32_t curTWinIdx;
|
int32_t curTWinIdx;
|
||||||
|
|
||||||
|
int32_t currentGroupId;
|
||||||
|
uint64_t queryId;
|
||||||
|
uint64_t taskId;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STagScanInfo {
|
typedef struct STagScanInfo {
|
||||||
|
@ -706,7 +710,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId);
|
||||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
|
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
|
||||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
|
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
@ -749,8 +753,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
|
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
|
||||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
|
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId);
|
||||||
|
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
|
@ -845,7 +849,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
|
|
||||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||||
|
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey);
|
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -287,6 +287,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
|
||||||
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
|
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
|
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||||
|
if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
uint64_t tableUid = pScanNode->uid;
|
uint64_t tableUid = pScanNode->uid;
|
||||||
|
|
||||||
|
@ -314,7 +315,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
for (int i = 0; i < taosArrayGetSize(res); i++) {
|
||||||
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
|
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};
|
||||||
taosArrayPush(pListInfo->pTableList, &info);
|
taosArrayPush(pListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(res);
|
taosArrayDestroy(res);
|
||||||
|
@ -335,9 +336,14 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}else { // Create one table group.
|
}else { // Create one table group.
|
||||||
STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
|
STableKeyInfo info = {.lastKey = 0, .uid = tableUid};
|
||||||
taosArrayPush(pListInfo->pTableList, &info);
|
taosArrayPush(pListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
|
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
if(pListInfo->pGroupList == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
//put into list as default group, remove it if grouping sorting is required later
|
||||||
|
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3862,9 +3862,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
|
||||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
|
|
||||||
|
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
|
|
||||||
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
|
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
|
||||||
|
@ -3895,8 +3892,67 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) {
|
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum){
|
||||||
if (groupKey == NULL) {
|
taosArrayClear(pTableListInfo->pGroupList);
|
||||||
|
SArray *sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
|
||||||
|
if(sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
||||||
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
|
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
|
||||||
|
|
||||||
|
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
|
||||||
|
if (index == -1){
|
||||||
|
void *p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
|
||||||
|
SArray *tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||||
|
if(tGroup == NULL) {
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
if(taosArrayPush(tGroup, info) == NULL){
|
||||||
|
qError("taos push info array error");
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
if(p == NULL){
|
||||||
|
if(taosArrayPush(sortSupport, groupId) != NULL){
|
||||||
|
qError("taos push support array error");
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
if(taosArrayPush(pTableListInfo->pGroupList, &tGroup) != NULL){
|
||||||
|
qError("taos push group array error");
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
|
||||||
|
if(taosArrayInsert(sortSupport, pos, groupId) == NULL){
|
||||||
|
qError("taos insert support array error");
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
if(taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL){
|
||||||
|
qError("taos insert group array error");
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}else{
|
||||||
|
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
|
||||||
|
if(taosArrayPush(tGroup, info) == NULL){
|
||||||
|
qError("taos push uid array error");
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
taosArrayDestroy(sortSupport);
|
||||||
|
return TDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
|
||||||
|
if (group == NULL) {
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3906,13 +3962,14 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
}
|
}
|
||||||
int32_t keyLen = 0;
|
int32_t keyLen = 0;
|
||||||
void* keyBuf = NULL;
|
void* keyBuf = NULL;
|
||||||
int32_t numOfGroupCols = taosArrayGetSize(groupKey);
|
|
||||||
for (int32_t j = 0; j < numOfGroupCols; ++j) {
|
SNode* node;
|
||||||
SColumn* pCol = taosArrayGet(groupKey, j);
|
FOREACH(node, group) {
|
||||||
keyLen += pCol->bytes; // actual data + null_flag
|
SExprNode *pExpr = (SExprNode *)node;
|
||||||
|
keyLen += pExpr->resType.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
|
int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
|
||||||
keyLen += nullFlagSize;
|
keyLen += nullFlagSize;
|
||||||
|
|
||||||
keyBuf = taosMemoryCalloc(1, keyLen);
|
keyBuf = taosMemoryCalloc(1, keyLen);
|
||||||
|
@ -3920,59 +3977,68 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t groupNum = 0;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
|
||||||
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pHandle->meta, 0);
|
metaReaderInit(&mr, pHandle->meta, 0);
|
||||||
metaGetTableEntryByUid(&mr, info->uid);
|
metaGetTableEntryByUid(&mr, info->uid);
|
||||||
|
|
||||||
char* isNull = (char*)keyBuf;
|
SNodeList *groupNew = nodesCloneList(group);
|
||||||
char* pStart = (char*)keyBuf + sizeof(int8_t) * numOfGroupCols;
|
|
||||||
for (int32_t j = 0; j < numOfGroupCols; ++j) {
|
|
||||||
SColumn* pCol = taosArrayGet(groupKey, j);
|
|
||||||
|
|
||||||
if (strcmp(pCol->name, "tbname") == 0) {
|
nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
|
||||||
isNull[i] = 0;
|
char* isNull = (char*)keyBuf;
|
||||||
memcpy(pStart, mr.me.name, strlen(mr.me.name));
|
char* pStart = (char*)keyBuf + nullFlagSize;
|
||||||
pStart += strlen(mr.me.name);
|
|
||||||
|
SNode* pNode;
|
||||||
|
int32_t index = 0;
|
||||||
|
FOREACH(pNode, groupNew){
|
||||||
|
SNode* pNew = NULL;
|
||||||
|
int32_t code = scalarCalculateConstants(pNode, &pNew);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
REPLACE_NODE(pNew);
|
||||||
} else {
|
} else {
|
||||||
STagVal tagVal = {0};
|
taosMemoryFree(keyBuf);
|
||||||
tagVal.cid = pCol->colId;
|
nodesClearList(groupNew);
|
||||||
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal);
|
return code;
|
||||||
if (p == NULL) {
|
}
|
||||||
isNull[j] = 1;
|
|
||||||
continue;
|
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
|
||||||
}
|
SValueNode *pValue = (SValueNode *)pNew;
|
||||||
isNull[i] = 0;
|
|
||||||
if (pCol->type == TSDB_DATA_TYPE_JSON) {
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL) {
|
||||||
// int32_t dataLen = getJsonValueLen(pkey->pData);
|
isNull[index++] = 1;
|
||||||
// memcpy(pStart, (pkey->pData), dataLen);
|
continue;
|
||||||
// pStart += dataLen;
|
} else {
|
||||||
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
|
isNull[index++] = 0;
|
||||||
memcpy(pStart, tagVal.pData, tagVal.nData);
|
char* data = nodesGetValueFromNode(pValue);
|
||||||
pStart += tagVal.nData;
|
if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON){
|
||||||
ASSERT(tagVal.nData <= pCol->bytes);
|
int32_t len = getJsonValueLen(data);
|
||||||
|
memcpy(pStart, data, len);
|
||||||
|
pStart += len;
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
|
||||||
|
memcpy(pStart, data, varDataTLen(data));
|
||||||
|
pStart += varDataTLen(data);
|
||||||
} else {
|
} else {
|
||||||
memcpy(pStart, &(tagVal.i64), pCol->bytes);
|
memcpy(pStart, data, pValue->node.resType.bytes);
|
||||||
pStart += pCol->bytes;
|
pStart += pValue->node.resType.bytes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
int32_t len = (int32_t)(pStart - (char*)keyBuf);
|
||||||
|
uint64_t groupId = calcGroupId(keyBuf, len);
|
||||||
uint64_t* pGroupId = taosHashGet(pTableListInfo->map, keyBuf, len);
|
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
|
||||||
|
groupNum++;
|
||||||
if (!pGroupId) {
|
|
||||||
uint64_t tmpId = calcGroupId(keyBuf, len);
|
|
||||||
info->groupId = tmpId;
|
|
||||||
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &tmpId, sizeof(uint64_t));
|
|
||||||
} else {
|
|
||||||
info->groupId = *pGroupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
nodesClearList(groupNew);
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
}
|
}
|
||||||
taosMemoryFree(keyBuf);
|
taosMemoryFree(keyBuf);
|
||||||
|
|
||||||
|
if(pTableListInfo->needSortTableByGroupId){
|
||||||
|
return sortTableGroup(pTableListInfo, groupNum);
|
||||||
|
}
|
||||||
|
|
||||||
return TDB_CODE_SUCCESS;
|
return TDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3984,39 +4050,36 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
|
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if(code){
|
||||||
pTaskInfo->code = terrno;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||||
int32_t code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbCleanupReadHandle(pDataReader);
|
|
||||||
pTaskInfo->code = terrno;
|
pTaskInfo->code = terrno;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
|
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
|
||||||
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
|
|
||||||
taosArrayDestroy(groupKeys);
|
|
||||||
if (code) {
|
|
||||||
tsdbCleanupReadHandle(pDataReader);
|
|
||||||
pTaskInfo->code = terrno;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
||||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||||
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
if(code){
|
||||||
SOperatorInfo* pOperator =
|
return NULL;
|
||||||
createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
|
}
|
||||||
|
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
|
||||||
|
if (code) {
|
||||||
|
pTaskInfo->code = terrno;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
|
||||||
|
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -4025,46 +4088,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
|
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
STimeWindowAggSupp twSup = {
|
STimeWindowAggSupp twSup = {
|
||||||
.waterMark = pTableScanNode->watermark,
|
.waterMark = pTableScanNode->watermark,
|
||||||
.calTrigger = pTableScanNode->triggerType,
|
.calTrigger = pTableScanNode->triggerType,
|
||||||
.maxTs = INT64_MIN,
|
.maxTs = INT64_MIN,
|
||||||
};
|
};
|
||||||
tsdbReaderT pDataReader = NULL;
|
|
||||||
|
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
if (pHandle->initTsdbReader) {
|
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
|
||||||
// for stream
|
|
||||||
ASSERT(pHandle->vnode);
|
|
||||||
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
|
|
||||||
} else {
|
|
||||||
// for tq
|
|
||||||
ASSERT(pHandle->meta);
|
|
||||||
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
|
||||||
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
|
|
||||||
// return NULL;
|
|
||||||
} else {
|
|
||||||
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
|
|
||||||
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
|
|
||||||
taosArrayDestroy(groupKeys);
|
|
||||||
if (code) {
|
|
||||||
tsdbCleanupReadHandle(pDataReader);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
|
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||||
|
@ -4093,7 +4127,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
} else { // Create one table group.
|
} else { // Create one table group.
|
||||||
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
|
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid};
|
||||||
taosArrayPush(pTableListInfo->pTableList, &info);
|
taosArrayPush(pTableListInfo->pTableList, &info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4118,7 +4152,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
cond.suid = pBlockNode->suid;
|
cond.suid = pBlockNode->suid;
|
||||||
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||||
}
|
}
|
||||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
|
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, queryId, taskId);
|
||||||
cleanupQueryTableDataCond(&cond);
|
cleanupQueryTableDataCond(&cond);
|
||||||
|
|
||||||
return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
||||||
|
@ -4355,7 +4389,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderT pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
|
tsdbReaderT pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, queryId, taskId);
|
||||||
cleanupQueryTableDataCond(&cond);
|
cleanupQueryTableDataCond(&cond);
|
||||||
|
|
||||||
return pReader;
|
return pReader;
|
||||||
|
@ -4584,6 +4618,13 @@ _complete:
|
||||||
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
|
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
|
||||||
taosArrayDestroy(pTableqinfoList->pTableList);
|
taosArrayDestroy(pTableqinfoList->pTableList);
|
||||||
taosHashCleanup(pTableqinfoList->map);
|
taosHashCleanup(pTableqinfoList->map);
|
||||||
|
if(pTableqinfoList->needSortTableByGroupId){
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++){
|
||||||
|
SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
|
||||||
|
taosArrayDestroy(tmp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pTableqinfoList->pGroupList);
|
||||||
|
|
||||||
pTableqinfoList->pTableList = NULL;
|
pTableqinfoList->pTableList = NULL;
|
||||||
pTableqinfoList->map = NULL;
|
pTableqinfoList->map = NULL;
|
||||||
|
|
|
@ -418,7 +418,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
@ -500,6 +500,48 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
if(pInfo->currentGroupId == -1){
|
||||||
|
pInfo->currentGroupId++;
|
||||||
|
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||||
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
||||||
|
tsdbCleanupReadHandle(pInfo->dataReader);
|
||||||
|
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
|
||||||
|
pInfo->dataReader = pReader;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||||
|
if(result){
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->currentGroupId++;
|
||||||
|
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||||
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
||||||
|
tsdbSetTableList(pInfo->dataReader, tableList);
|
||||||
|
|
||||||
|
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
||||||
|
pInfo->curTWinIdx = 0;
|
||||||
|
pInfo->scanTimes = 0;
|
||||||
|
|
||||||
|
result = doTableScanGroup(pOperator);
|
||||||
|
if(result){
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -525,8 +567,8 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader,
|
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
|
||||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId) {
|
||||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -561,10 +603,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||||
pInfo->dataReader = pDataReader;
|
|
||||||
pInfo->scanFlag = MAIN_SCAN;
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
pInfo->pColMatchInfo = pColList;
|
pInfo->pColMatchInfo = pColList;
|
||||||
pInfo->curTWinIdx = 0;
|
pInfo->curTWinIdx = 0;
|
||||||
|
pInfo->queryId = queryId;
|
||||||
|
pInfo->taskId = taskId;
|
||||||
|
pInfo->currentGroupId = -1;
|
||||||
|
|
||||||
pOperator->name = "TableScanOperator"; // for debug purpose
|
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
|
@ -778,8 +822,9 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
|
||||||
pTableScanInfo->cond.twindows[0] = win;
|
pTableScanInfo->cond.twindows[0] = win;
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
pTableScanInfo->curTWinIdx = 0;
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
|
pTableScanInfo->currentGroupId = -1;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1087,9 +1132,9 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
||||||
return tableIdList;
|
return tableIdList;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
|
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
|
||||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
|
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
|
||||||
STimeWindowAggSupp* pTwSup) {
|
STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) {
|
||||||
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
|
||||||
|
@ -1129,7 +1174,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
|
||||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||||
if (pSTInfo->interval.interval > 0) {
|
if (pSTInfo->interval.interval > 0) {
|
||||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||||
|
@ -1889,11 +1934,12 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pTableList = pTableListInfo;
|
pInfo->pTableList = pTableListInfo;
|
||||||
pInfo->pColMatchInfo = colList;
|
pInfo->pColMatchInfo = colList;
|
||||||
pInfo->pRes = createResDataBlock(pDescNode);
|
pInfo->pRes = createResDataBlock(pDescNode);
|
||||||
pInfo->readHandle = *pReadHandle;
|
pInfo->readHandle = *pReadHandle;
|
||||||
pInfo->curPos = 0;
|
pInfo->curPos = 0;
|
||||||
|
|
||||||
pOperator->name = "TagScanOperator";
|
pOperator->name = "TagScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||||
|
|
||||||
|
@ -1919,10 +1965,7 @@ _error:
|
||||||
|
|
||||||
typedef struct STableMergeScanInfo {
|
typedef struct STableMergeScanInfo {
|
||||||
STableListInfo* tableListInfo;
|
STableListInfo* tableListInfo;
|
||||||
int32_t tableStartIndex;
|
int32_t currentGroupId;
|
||||||
int32_t tableEndIndex;
|
|
||||||
bool hasGroupId;
|
|
||||||
uint64_t groupId;
|
|
||||||
|
|
||||||
SArray* dataReaders; // array of tsdbReaderT*
|
SArray* dataReaders; // array of tsdbReaderT*
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
|
@ -1968,12 +2011,6 @@ typedef struct STableMergeScanInfo {
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
} STableMergeScanInfo;
|
} STableMergeScanInfo;
|
||||||
|
|
||||||
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
|
|
||||||
const STableKeyInfo* info1 = p1;
|
|
||||||
const STableKeyInfo* info2 = p2;
|
|
||||||
return info1->groupId - info2->groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
|
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
|
||||||
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
|
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
|
||||||
|
@ -1985,55 +2022,9 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
|
||||||
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
|
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
|
||||||
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
|
|
||||||
if (groupKeys) {
|
|
||||||
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(groupKeys);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
|
||||||
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
|
|
||||||
uint64_t taskId) {
|
|
||||||
SQueryTableDataCond cond = {0};
|
|
||||||
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
return code;
|
||||||
}
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) {
|
|
||||||
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
|
|
||||||
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
|
||||||
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
|
|
||||||
|
|
||||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId);
|
|
||||||
taosArrayPush(arrayReader, &pReader);
|
|
||||||
|
|
||||||
taosArrayDestroy(subListInfo->pTableList);
|
|
||||||
taosMemoryFree(subListInfo);
|
|
||||||
}
|
|
||||||
cleanupQueryTableDataCond(&cond);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
_error:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
|
|
||||||
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
|
|
||||||
uint64_t taskId) {
|
|
||||||
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
|
|
||||||
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
|
|
||||||
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
|
||||||
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
|
|
||||||
|
|
||||||
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subListInfo, queryId, taskId);
|
|
||||||
taosArrayPush(arrayReader, &pReader);
|
|
||||||
|
|
||||||
taosArrayDestroy(subListInfo->pTableList);
|
|
||||||
taosMemoryFree(subListInfo);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2225,32 +2216,34 @@ SArray* generateSortByTsInfo(int32_t order) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, SArray* tableList, SArray* arrayReader, uint64_t queryId,
|
||||||
|
uint64_t taskId) {
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(tableList); ++i) {
|
||||||
|
SArray* tmp = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||||
|
taosArrayPush(tmp, taosArrayGet(tableList, i));
|
||||||
|
|
||||||
|
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, tmp, queryId, taskId);
|
||||||
|
taosArrayPush(arrayReader, &pReader);
|
||||||
|
|
||||||
|
taosArrayDestroy(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
{
|
SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId);
|
||||||
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
|
|
||||||
int32_t i = pInfo->tableStartIndex + 1;
|
|
||||||
for (; i < tableListSize; ++i) {
|
|
||||||
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
|
|
||||||
if (tableKeyInfo->groupId != pInfo->groupId) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pInfo->tableEndIndex = i - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tableStartIdx = pInfo->tableStartIndex;
|
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableList,
|
||||||
int32_t tableEndIdx = pInfo->tableEndIndex;
|
|
||||||
|
|
||||||
STableListInfo* tableListInfo = pInfo->tableListInfo;
|
|
||||||
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
|
|
||||||
pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
|
pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
|
||||||
|
|
||||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||||
// the additional one is reserved for merge result
|
// the additional one is reserved for merge result
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
int32_t tableLen = taosArrayGetSize(tableList);
|
||||||
|
pInfo->sortBufSize = pInfo->bufPageSize * ((tableLen==0?1:tableLen) + 1);
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
||||||
|
@ -2337,38 +2330,43 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
|
|
||||||
if (!pInfo->hasGroupId) {
|
|
||||||
pInfo->hasGroupId = true;
|
|
||||||
|
|
||||||
if (tableListSize == 0) {
|
if (pInfo->currentGroupId == -1) {
|
||||||
|
pInfo->currentGroupId++;
|
||||||
|
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pInfo->tableStartIndex = 0;
|
|
||||||
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
|
|
||||||
startGroupTableMergeScan(pOperator);
|
startGroupTableMergeScan(pOperator);
|
||||||
}
|
}
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
||||||
while (pInfo->tableStartIndex < tableListSize) {
|
if (pBlock != NULL) {
|
||||||
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
|
||||||
if (pBlock != NULL) {
|
if(groupId) pBlock->info.groupId = *groupId;
|
||||||
pBlock->info.groupId = pInfo->groupId;
|
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
return pBlock;
|
return pBlock;
|
||||||
} else {
|
|
||||||
stopGroupTableMergeScan(pOperator);
|
|
||||||
if (pInfo->tableEndIndex >= tableListSize - 1) {
|
|
||||||
doSetOperatorCompleted(pOperator);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
|
||||||
pInfo->groupId =
|
|
||||||
((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
|
|
||||||
startGroupTableMergeScan(pOperator);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stopGroupTableMergeScan(pOperator);
|
||||||
|
pInfo->currentGroupId++;
|
||||||
|
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
startGroupTableMergeScan(pOperator);
|
||||||
|
|
||||||
|
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
||||||
|
if (pBlock != NULL) {
|
||||||
|
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
|
||||||
|
if(groupId) pBlock->info.groupId = *groupId;
|
||||||
|
|
||||||
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
doSetOperatorCompleted(pOperator);
|
||||||
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2445,6 +2443,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
||||||
pInfo->queryId = queryId;
|
pInfo->queryId = queryId;
|
||||||
pInfo->taskId = taskId;
|
pInfo->taskId = taskId;
|
||||||
|
pInfo->currentGroupId = -1;
|
||||||
|
|
||||||
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
|
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
|
||||||
|
|
||||||
|
|
|
@ -1220,6 +1220,7 @@ int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) {
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
case TSDB_DATA_TYPE_VARCHAR:
|
case TSDB_DATA_TYPE_VARCHAR:
|
||||||
case TSDB_DATA_TYPE_VARBINARY:
|
case TSDB_DATA_TYPE_VARBINARY:
|
||||||
|
case TSDB_DATA_TYPE_JSON:
|
||||||
pNode->datum.p = (char*)value;
|
pNode->datum.p = (char*)value;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -1061,6 +1061,7 @@ static EDealRes partTagsOptRebuildTbanmeImpl(SNode** pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
strcpy(pFunc->functionName, "tbname");
|
strcpy(pFunc->functionName, "tbname");
|
||||||
pFunc->funcType = FUNCTION_TYPE_TBNAME;
|
pFunc->funcType = FUNCTION_TYPE_TBNAME;
|
||||||
|
pFunc->node.resType = ((SColumnNode*)*pNode)->node.resType;
|
||||||
nodesDestroyNode(*pNode);
|
nodesDestroyNode(*pNode);
|
||||||
*pNode = (SNode*)pFunc;
|
*pNode = (SNode*)pFunc;
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
|
@ -1188,7 +1189,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
|
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
|
||||||
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
|
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
|
||||||
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
|
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
|
||||||
// {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
|
{.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
|
||||||
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}
|
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
|
@ -13,9 +13,6 @@ print user sysinfo0 login
|
||||||
sql close
|
sql close
|
||||||
sql connect sysinfo0
|
sql connect sysinfo0
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop
|
|
||||||
return
|
|
||||||
|
|
||||||
print =============== check oper
|
print =============== check oper
|
||||||
sql_error create user u1 pass 'u1'
|
sql_error create user u1 pass 'u1'
|
||||||
sql_error drop user sysinfo1
|
sql_error drop user sysinfo1
|
||||||
|
|
|
@ -412,52 +412,59 @@ class TDTestCase:
|
||||||
tdSql.checkColNameList(res, cname_list)
|
tdSql.checkColNameList(res, cname_list)
|
||||||
#
|
#
|
||||||
# test group by & order by json tag
|
# test group by & order by json tag
|
||||||
|
tdSql.query("select ts,jtag->'tag1' from jsons1 partition by jtag->'tag1' order by jtag->'tag1' desc")
|
||||||
|
tdSql.checkRows(11)
|
||||||
|
tdSql.checkData(0, 1, '"femail"')
|
||||||
|
tdSql.checkData(2, 1, '"收到货"')
|
||||||
|
tdSql.checkData(7, 1, "false")
|
||||||
|
|
||||||
|
|
||||||
# tdSql.error("select count(*) from jsons1 group by jtag")
|
# tdSql.error("select count(*) from jsons1 group by jtag")
|
||||||
# tdSql.error("select count(*) from jsons1 partition by jtag")
|
# tdSql.error("select count(*) from jsons1 partition by jtag")
|
||||||
# tdSql.error("select count(*) from jsons1 group by jtag order by jtag")
|
# tdSql.error("select count(*) from jsons1 group by jtag order by jtag")
|
||||||
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag->'tag2'")
|
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag->'tag2'")
|
||||||
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag")
|
tdSql.error("select count(*) from jsons1 group by jtag->'tag1' order by jtag")
|
||||||
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' desc")
|
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' desc")
|
||||||
tdSql.checkRows(8)
|
# tdSql.checkRows(8)
|
||||||
tdSql.checkData(0, 0, 2)
|
# tdSql.checkData(0, 0, 2)
|
||||||
tdSql.checkData(0, 1, '"femail"')
|
# tdSql.checkData(0, 1, '"femail"')
|
||||||
tdSql.checkData(1, 0, 2)
|
# tdSql.checkData(1, 0, 2)
|
||||||
tdSql.checkData(1, 1, '"收到货"')
|
# tdSql.checkData(1, 1, '"收到货"')
|
||||||
tdSql.checkData(2, 0, 1)
|
# tdSql.checkData(2, 0, 1)
|
||||||
tdSql.checkData(2, 1, "11.000000000")
|
# tdSql.checkData(2, 1, "11.000000000")
|
||||||
tdSql.checkData(5, 0, 1)
|
# tdSql.checkData(5, 0, 1)
|
||||||
tdSql.checkData(5, 1, "false")
|
# tdSql.checkData(5, 1, "false")
|
||||||
tdSql.checkData(6, 0, 1)
|
# tdSql.checkData(6, 0, 1)
|
||||||
tdSql.checkData(6, 1, "null")
|
# tdSql.checkData(6, 1, "null")
|
||||||
tdSql.checkData(7, 0, 2)
|
# tdSql.checkData(7, 0, 2)
|
||||||
tdSql.checkData(7, 1, None)
|
# tdSql.checkData(7, 1, None)
|
||||||
|
|
||||||
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' asc")
|
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1' asc")
|
||||||
tdSql.checkRows(8)
|
# tdSql.checkRows(8)
|
||||||
tdSql.checkData(0, 0, 2)
|
# tdSql.checkData(0, 0, 2)
|
||||||
tdSql.checkData(0, 1, None)
|
# tdSql.checkData(0, 1, None)
|
||||||
tdSql.checkData(2, 0, 1)
|
# tdSql.checkData(2, 0, 1)
|
||||||
tdSql.checkData(2, 1, "false")
|
# tdSql.checkData(2, 1, "false")
|
||||||
tdSql.checkData(5, 0, 1)
|
# tdSql.checkData(5, 0, 1)
|
||||||
tdSql.checkData(5, 1, "11.000000000")
|
# tdSql.checkData(5, 1, "11.000000000")
|
||||||
tdSql.checkData(7, 0, 2)
|
# tdSql.checkData(7, 0, 2)
|
||||||
tdSql.checkData(7, 1, '"femail"')
|
# tdSql.checkData(7, 1, '"femail"')
|
||||||
#
|
#
|
||||||
# test stddev with group by json tag
|
# test stddev with group by json tag
|
||||||
tdSql.query("select stddev(dataint),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
# tdSql.query("select stddev(dataint),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
||||||
tdSql.checkRows(8)
|
# tdSql.checkRows(8)
|
||||||
tdSql.checkData(0, 0, 10)
|
# tdSql.checkData(0, 0, 10)
|
||||||
tdSql.checkData(0, 1, None)
|
# tdSql.checkData(0, 1, None)
|
||||||
tdSql.checkData(4, 0, 0)
|
# tdSql.checkData(4, 0, 0)
|
||||||
tdSql.checkData(4, 1, "5.000000000")
|
# tdSql.checkData(4, 1, "5.000000000")
|
||||||
tdSql.checkData(7, 0, 11)
|
# tdSql.checkData(7, 0, 11)
|
||||||
tdSql.checkData(7, 1, '"femail"')
|
# tdSql.checkData(7, 1, '"femail"')
|
||||||
|
#
|
||||||
res = tdSql.getColNameList("select stddev(dataint),jsons1.jtag->'tag1' from jsons1 group by jsons1.jtag->'tag1' order by jtag->'tag1'")
|
# res = tdSql.getColNameList("select stddev(dataint),jsons1.jtag->'tag1' from jsons1 group by jsons1.jtag->'tag1' order by jtag->'tag1'")
|
||||||
cname_list = []
|
# cname_list = []
|
||||||
cname_list.append("stddev(dataint)")
|
# cname_list.append("stddev(dataint)")
|
||||||
cname_list.append("jsons1.jtag->'tag1'")
|
# cname_list.append("jsons1.jtag->'tag1'")
|
||||||
tdSql.checkColNameList(res, cname_list)
|
# tdSql.checkColNameList(res, cname_list)
|
||||||
|
|
||||||
# test top/bottom with group by json tag
|
# test top/bottom with group by json tag
|
||||||
# tdSql.query("select top(dataint,2),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
# tdSql.query("select top(dataint,2),jtag->'tag1' from jsons1 group by jtag->'tag1' order by jtag->'tag1'")
|
||||||
|
@ -470,8 +477,8 @@ class TDTestCase:
|
||||||
# tdSql.checkData(10, 1, '"femail"')
|
# tdSql.checkData(10, 1, '"femail"')
|
||||||
|
|
||||||
# test having
|
# test having
|
||||||
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
# tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
||||||
tdSql.checkRows(3)
|
# tdSql.checkRows(3)
|
||||||
|
|
||||||
# subquery with json tag
|
# subquery with json tag
|
||||||
tdSql.query("select * from (select jtag, dataint from jsons1) order by dataint")
|
tdSql.query("select * from (select jtag, dataint from jsons1) order by dataint")
|
||||||
|
|
Loading…
Reference in New Issue