Merge remote-tracking branch 'origin/3.0' into feature/qnode
This commit is contained in:
commit
7debddd23f
|
@ -14,3 +14,6 @@
|
|||
path = tests
|
||||
url = https://github.com/taosdata/tests
|
||||
branch = 3.0
|
||||
[submodule "examples/rust"]
|
||||
path = examples/rust
|
||||
url = https://github.com/songtianyi/tdengine-rust-bindings.git
|
||||
|
|
|
@ -28,7 +28,7 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Subproject commit 1c8924dc668e6aa848214c2fc54e3ace3f5bf8df
|
|
@ -100,7 +100,7 @@ typedef enum _mgmt_table {
|
|||
TSDB_MGMT_TABLE_STREAMS,
|
||||
TSDB_MGMT_TABLE_VARIABLES,
|
||||
TSDB_MGMT_TABLE_CONNS,
|
||||
TSDB_MGMT_TABLE_SCORES,
|
||||
TSDB_MGMT_TABLE_TRANS,
|
||||
TSDB_MGMT_TABLE_GRANTS,
|
||||
TSDB_MGMT_TABLE_VNODES,
|
||||
TSDB_MGMT_TABLE_CLUSTER,
|
||||
|
@ -1733,7 +1733,7 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int64_t consumerId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
} SMqSetCVgRsp;
|
||||
|
||||
typedef struct {
|
||||
|
@ -1741,9 +1741,42 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int64_t consumerId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
} SMqMVRebRsp;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int64_t offset;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
} SMqOffset;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SMqOffset* offsets;
|
||||
} SMqCMResetOffsetReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t reserved;
|
||||
} SMqCMResetOffsetRsp;
|
||||
|
||||
typedef struct {
|
||||
int32_t num;
|
||||
SMqOffset* offsets;
|
||||
} SMqMVResetOffsetReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t reserved;
|
||||
} SMqMVResetOffsetRsp;
|
||||
|
||||
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
|
||||
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
|
||||
int32_t tEncodeSMqCMResetOffsetReq(SCoder* encoder, const SMqCMResetOffsetReq* pReq);
|
||||
int32_t tDecodeSMqCMResetOffsetReq(SCoder* decoder, SMqCMResetOffsetReq* pReq);
|
||||
|
||||
int32_t tEncodeSMqMVResetOffsetReq(SCoder* encoder, const SMqMVResetOffsetReq* pReq);
|
||||
int32_t tDecodeSMqMVResetOffsetReq(SCoder* decoder, SMqMVResetOffsetReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
uint32_t nCols;
|
||||
SSchema* pSchema;
|
||||
|
|
|
@ -142,6 +142,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_RESET_OFFSET, "mnode-reset-offset", SMqCMResetOffsetReq, SMqCMResetOffsetRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
|
||||
|
|
|
@ -100,7 +100,7 @@ SNode* nodesMakeNode(ENodeType type);
|
|||
void nodesDestroyNode(SNode* pNode);
|
||||
|
||||
SNodeList* nodesMakeList();
|
||||
SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode);
|
||||
int32_t nodesListAppend(SNodeList* pList, SNode* pNode);
|
||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
|
||||
SNode* nodesListGetNode(SNodeList* pList, int32_t index);
|
||||
void nodesDestroyList(SNodeList* pList);
|
||||
|
|
|
@ -50,6 +50,7 @@ typedef enum EColumnType {
|
|||
|
||||
typedef struct SColumnNode {
|
||||
SExprNode node; // QUERY_NODE_COLUMN
|
||||
uint64_t tableId;
|
||||
int16_t colId;
|
||||
EColumnType colType; // column or tag
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
|
@ -59,10 +60,11 @@ typedef struct SColumnNode {
|
|||
SNode* pProjectRef;
|
||||
} SColumnNode;
|
||||
|
||||
typedef struct SColumnRef {
|
||||
typedef struct SColumnRefNode {
|
||||
ENodeType type;
|
||||
int32_t tupleId;
|
||||
int32_t slotId;
|
||||
} SColumnRef;
|
||||
} SColumnRefNode;
|
||||
|
||||
typedef struct SValueNode {
|
||||
SExprNode node; // QUERY_NODE_VALUE
|
||||
|
@ -269,6 +271,25 @@ typedef struct SSetOperator {
|
|||
SNode* pLimit;
|
||||
} SSetOperator;
|
||||
|
||||
typedef enum ESqlClause {
|
||||
SQL_CLAUSE_FROM = 1,
|
||||
SQL_CLAUSE_WHERE,
|
||||
SQL_CLAUSE_PARTITION_BY,
|
||||
SQL_CLAUSE_WINDOW,
|
||||
SQL_CLAUSE_GROUP_BY,
|
||||
SQL_CLAUSE_HAVING,
|
||||
SQL_CLAUSE_SELECT,
|
||||
SQL_CLAUSE_ORDER_BY
|
||||
} ESqlClause;
|
||||
|
||||
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
|
||||
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext);
|
||||
|
||||
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, uint64_t tableId, bool realCol, SNodeList** pCols);
|
||||
|
||||
typedef bool (*FFuncClassifier)(int32_t funcId);
|
||||
int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs);
|
||||
|
||||
bool nodesIsExprNode(const SNode* pNode);
|
||||
|
||||
bool nodesIsArithmeticOp(const SOperatorNode* pOp);
|
||||
|
|
|
@ -113,6 +113,7 @@ typedef struct STableMetaOutput {
|
|||
typedef struct SDataBuf {
|
||||
void *pData;
|
||||
uint32_t len;
|
||||
void *handle;
|
||||
} SDataBuf;
|
||||
|
||||
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
|
|
|
@ -459,7 +459,6 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260A) //Not a GROUP BY expression
|
||||
#define TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260B) //Not SELECTed expression
|
||||
#define TSDB_CODE_PAR_NOT_SINGLE_GROUP TAOS_DEF_ERROR_CODE(0, 0x260C) //Not a single-group group function
|
||||
#define TSDB_CODE_PAR_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x260D) //Out of memory
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -216,6 +216,10 @@ do { \
|
|||
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
||||
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
||||
|
||||
#define TSDB_TRANS_STAGE_LEN 12
|
||||
#define TSDB_TRANS_DESC_LEN 16
|
||||
#define TSDB_TRANS_ERROR_LEN 128
|
||||
|
||||
#define TSDB_STEP_NAME_LEN 32
|
||||
#define TSDB_STEP_DESC_LEN 128
|
||||
|
||||
|
|
|
@ -395,8 +395,10 @@ static void* hbThreadFunc(void* param) {
|
|||
hbClearReqInfo(pAppHbMgr);
|
||||
break;
|
||||
}
|
||||
|
||||
tSerializeSClientHbBatchReq(buf, tlen, pReq);
|
||||
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
|
||||
SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
|
||||
if (pInfo == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
tFreeClientHbBatchReq(pReq, false);
|
||||
|
|
|
@ -30,17 +30,11 @@ static bool stringLengthCheck(const char* str, size_t maxsize) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool validateUserName(const char* user) {
|
||||
return stringLengthCheck(user, TSDB_USER_LEN - 1);
|
||||
}
|
||||
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
|
||||
|
||||
static bool validatePassword(const char* passwd) {
|
||||
return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
|
||||
}
|
||||
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); }
|
||||
|
||||
static bool validateDbName(const char* db) {
|
||||
return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
|
||||
}
|
||||
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
|
||||
|
||||
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
|
||||
char key[512] = {0};
|
||||
|
@ -48,10 +42,12 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
|
|||
return strdup(key);
|
||||
}
|
||||
|
||||
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
|
||||
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||
SAppInstInfo* pAppInfo);
|
||||
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
|
||||
|
||||
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
|
||||
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port) {
|
||||
if (taos_init() != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -174,7 +170,7 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
|
|||
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
||||
SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
|
||||
pRequest->type = pDcl->msgType;
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen, .handle = NULL};
|
||||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
|
||||
|
@ -322,7 +318,8 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
|
|||
return 0;
|
||||
}
|
||||
|
||||
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
|
||||
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||
SAppInstInfo* pAppInfo) {
|
||||
STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
|
||||
if (NULL == pTscObj) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
@ -343,14 +340,16 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t
|
|||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
|
||||
const char* errorMsg =
|
||||
(pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
|
||||
printf("failed to connect to server, reason: %s\n\n", errorMsg);
|
||||
|
||||
destroyRequest(pRequest);
|
||||
taos_close(pTscObj);
|
||||
pTscObj = NULL;
|
||||
} else {
|
||||
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
|
||||
tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
|
||||
pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
|
||||
destroyRequest(pRequest);
|
||||
}
|
||||
|
||||
|
@ -365,11 +364,13 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
|
|||
}
|
||||
|
||||
pMsgSendInfo->msgType = TDMT_MND_CONNECT;
|
||||
|
||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
||||
pMsgSendInfo->requestId = pRequest->requestId;
|
||||
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
|
||||
pMsgSendInfo->param = pRequest;
|
||||
|
||||
|
||||
SConnectReq connectReq = {0};
|
||||
STscObj* pObj = pRequest->pTscObj;
|
||||
|
||||
|
@ -432,7 +433,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|||
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||
}
|
||||
|
||||
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
|
||||
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};
|
||||
|
||||
if (pMsg->contLen > 0) {
|
||||
buf.pData = calloc(1, pMsg->contLen);
|
||||
|
@ -463,7 +464,8 @@ TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, cons
|
|||
return taos_connect_internal(ip, user, NULL, auth, db, port);
|
||||
}
|
||||
|
||||
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
|
||||
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
|
||||
const char* db, int dbLen, uint16_t port) {
|
||||
char ipStr[TSDB_EP_LEN] = {0};
|
||||
char dbStr[TSDB_DB_NAME_LEN] = {0};
|
||||
char userStr[TSDB_USER_LEN] = {0};
|
||||
|
@ -497,8 +499,8 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
}
|
||||
|
||||
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
|
||||
tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows,
|
||||
pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
||||
|
||||
if (pResultInfo->numOfRows == 0) {
|
||||
return NULL;
|
||||
|
|
|
@ -97,9 +97,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
|
|||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq);
|
||||
void* pReq = malloc(contLen);
|
||||
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
|
||||
|
||||
pMsgSendInfo->msgInfo.pData = pReq;
|
||||
pMsgSendInfo->msgInfo.len = contLen;
|
||||
pMsgSendInfo->msgInfo.handle = NULL;
|
||||
} else {
|
||||
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
|
||||
if (pFetchMsg == NULL) {
|
||||
|
@ -111,6 +111,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
|
|||
|
||||
pMsgSendInfo->msgInfo.pData = pFetchMsg;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq);
|
||||
pMsgSendInfo->msgInfo.handle = NULL;
|
||||
}
|
||||
} else {
|
||||
assert(pRequest != NULL);
|
||||
|
|
|
@ -35,9 +35,7 @@ struct tmq_list_t {
|
|||
};
|
||||
|
||||
struct tmq_topic_vgroup_t {
|
||||
char* topic;
|
||||
int32_t vgId;
|
||||
int64_t offset;
|
||||
SMqOffset offset;
|
||||
};
|
||||
|
||||
struct tmq_topic_vgroup_list_t {
|
||||
|
@ -123,6 +121,12 @@ typedef struct {
|
|||
tsem_t rspSem;
|
||||
} SMqCommitCbParam;
|
||||
|
||||
typedef struct {
|
||||
tmq_t* tmq;
|
||||
tsem_t rspSem;
|
||||
tmq_resp_err_t rspErr;
|
||||
} SMqResetOffsetParam;
|
||||
|
||||
tmq_conf_t* tmq_conf_new() {
|
||||
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
|
||||
conf->auto_commit = false;
|
||||
|
@ -173,12 +177,6 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||
// build msg
|
||||
// send to mnode
|
||||
return TMQ_RESP_ERR__SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
|
||||
pParam->rspErr = code;
|
||||
|
@ -196,6 +194,13 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tmqResetOffsetCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SMqResetOffsetParam* pParam = (SMqResetOffsetParam*)param;
|
||||
pParam->rspErr = code;
|
||||
tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||
tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
|
||||
if (pTmq == NULL) {
|
||||
|
@ -218,6 +223,55 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
|||
return pTmq;
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
|
||||
SRequestObj* pRequest = NULL;
|
||||
// build msg
|
||||
// send to mnode
|
||||
SMqCMResetOffsetReq req;
|
||||
req.num = offsets->cnt;
|
||||
req.offsets = (SMqOffset*)offsets->elems;
|
||||
|
||||
SCoder encoder;
|
||||
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
|
||||
tEncodeSMqCMResetOffsetReq(&encoder, &req);
|
||||
int32_t tlen = encoder.pos;
|
||||
void* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
tCoderClear(&encoder);
|
||||
return -1;
|
||||
}
|
||||
tCoderClear(&encoder);
|
||||
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
|
||||
tEncodeSMqCMResetOffsetReq(&encoder, &req);
|
||||
tCoderClear(&encoder);
|
||||
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_RESET_OFFSET);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc request");
|
||||
}
|
||||
|
||||
SMqResetOffsetParam param = {0};
|
||||
tsem_init(¶m.rspSem, 0, 0);
|
||||
param.tmq = tmq;
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->param = ¶m;
|
||||
sendInfo->fp = tmqResetOffsetCb;
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(¶m.rspSem);
|
||||
tsem_destroy(¶m.rspSem);
|
||||
|
||||
return TMQ_RESP_ERR__SUCCESS;
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||
SRequestObj* pRequest = NULL;
|
||||
int32_t sz = topic_list->cnt;
|
||||
|
@ -244,6 +298,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
|||
|
||||
char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN);
|
||||
if (topicFname == NULL) {
|
||||
goto _return;
|
||||
}
|
||||
tNameExtractFullName(&name, topicFname);
|
||||
tscDebug("subscribe topic: %s", topicFname);
|
||||
|
@ -251,9 +306,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
|||
.nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL};
|
||||
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
|
||||
taosArrayPush(tmq->clientTopics, &topic);
|
||||
/*SMqClientTopic topic = {*/
|
||||
/*.*/
|
||||
/*};*/
|
||||
taosArrayPush(req.topicNames, &topicFname);
|
||||
free(dbName);
|
||||
}
|
||||
|
@ -270,13 +322,13 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
|||
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc sqlObj");
|
||||
tscError("failed to malloc request");
|
||||
}
|
||||
|
||||
SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq};
|
||||
tsem_init(¶m.rspSem, 0, 0);
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->param = ¶m;
|
||||
|
@ -401,7 +453,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
tSerializeMCreateTopicReq(buf, tlen, &req);
|
||||
/*printf("formatted: %s\n", dagStr);*/
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
|
||||
pRequest->type = TDMT_MND_CREATE_TOPIC;
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
|
@ -727,8 +779,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
param->pVg = pVg;
|
||||
tsem_init(¶m->rspSem, 0, 0);
|
||||
|
||||
|
||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL};
|
||||
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->requestObjRefId = 0;
|
||||
|
|
|
@ -2306,3 +2306,57 @@ int32_t tDeserializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
|
|||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqOffset(SCoder *encoder, const SMqOffset *pOffset) {
|
||||
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
|
||||
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
|
||||
if (tEncodeCStr(encoder, pOffset->topicName) < 0) return -1;
|
||||
if (tEncodeCStr(encoder, pOffset->cgroup) < 0) return -1;
|
||||
return encoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSMqOffset(SCoder *decoder, SMqOffset *pOffset) {
|
||||
if (tDecodeI32(decoder, &pOffset->vgId) < 0) return -1;
|
||||
if (tDecodeI64(decoder, &pOffset->offset) < 0) return -1;
|
||||
if (tDecodeCStrTo(decoder, pOffset->topicName) < 0) return -1;
|
||||
if (tDecodeCStrTo(decoder, pOffset->cgroup) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqCMResetOffsetReq(SCoder *encoder, const SMqCMResetOffsetReq *pReq) {
|
||||
if (tStartEncode(encoder) < 0) return -1;
|
||||
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
|
||||
}
|
||||
tEndEncode(encoder);
|
||||
return encoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSMqCMResetOffsetReq(SCoder *decoder, SMqCMResetOffsetReq *pReq) {
|
||||
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
|
||||
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
|
||||
if (pReq->offsets == NULL) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqMVResetOffsetReq(SCoder *encoder, const SMqMVResetOffsetReq *pReq) {
|
||||
if (tEncodeI32(encoder, pReq->num) < 0) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tEncodeSMqOffset(encoder, &pReq->offsets[i]);
|
||||
}
|
||||
return encoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSMqMVResetOffsetReq(SCoder *decoder, SMqMVResetOffsetReq *pReq) {
|
||||
if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
|
||||
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder);
|
||||
if (pReq->offsets == NULL) return -1;
|
||||
for (int32_t i = 0; i < pReq->num; i++) {
|
||||
tDecodeSMqOffset(decoder, &pReq->offsets[i]);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -104,6 +104,10 @@ typedef enum {
|
|||
TRN_STAGE_FINISHED = 8
|
||||
} ETrnStage;
|
||||
|
||||
typedef enum {
|
||||
TRN_TYPE_CREATE_DB = 0,
|
||||
} ETrnType;
|
||||
|
||||
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
|
||||
|
||||
typedef enum {
|
||||
|
@ -135,6 +139,12 @@ typedef struct {
|
|||
SArray* commitLogs;
|
||||
SArray* redoActions;
|
||||
SArray* undoActions;
|
||||
int64_t createdTime;
|
||||
int64_t lastExecTime;
|
||||
int32_t transType;
|
||||
uint64_t dbUid;
|
||||
char dbname[TSDB_DB_NAME_LEN];
|
||||
char lastError[TSDB_TRANS_DESC_LEN];
|
||||
} STrans;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -914,6 +914,8 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
|||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
||||
|
|
|
@ -295,8 +295,8 @@ char *mndShowStr(int32_t showType) {
|
|||
return "show configs";
|
||||
case TSDB_MGMT_TABLE_CONNS:
|
||||
return "show connections";
|
||||
case TSDB_MGMT_TABLE_SCORES:
|
||||
return "show scores";
|
||||
case TSDB_MGMT_TABLE_TRANS:
|
||||
return "show trans";
|
||||
case TSDB_MGMT_TABLE_GRANTS:
|
||||
return "show grants";
|
||||
case TSDB_MGMT_TABLE_VNODES:
|
||||
|
|
|
@ -420,7 +420,10 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
if (pVgroup->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
|
@ -455,7 +458,10 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
if (pVgroup->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
|
@ -942,7 +948,10 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
if (pVgroup->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
|
@ -1116,7 +1125,10 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
if (pVgroup->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
|
|
|
@ -134,8 +134,6 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
|
|||
|
||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
|
||||
ASSERT(pConsumerEp->oldConsumerId != -1);
|
||||
int32_t vgId = pConsumerEp->vgId;
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
|
||||
void *buf;
|
||||
int32_t tlen;
|
||||
|
@ -143,6 +141,9 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t vgId = pConsumerEp->vgId;
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
action.pCont = buf;
|
||||
|
@ -180,15 +181,15 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum
|
|||
}
|
||||
|
||||
static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
|
||||
int32_t vgId = pConsumerEp->vgId;
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
|
||||
void *buf;
|
||||
int32_t tlen;
|
||||
if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t vgId = pConsumerEp->vgId;
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
action.pCont = buf;
|
||||
|
@ -345,6 +346,14 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
}
|
||||
if (status == MQ_CONSUMER_STATUS__MODIFY) {
|
||||
int32_t removeSz = taosArrayGetSize(pConsumer->recentRemovedTopics);
|
||||
for (int32_t i = 0; i < removeSz; i++) {
|
||||
char *topicName = taosArrayGet(pConsumer->recentRemovedTopics, i);
|
||||
free(topicName);
|
||||
}
|
||||
taosArrayClear(pConsumer->recentRemovedTopics);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
|
||||
|
@ -714,7 +723,10 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
|
|||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pTopic->dbUid) continue;
|
||||
if (pVgroup->dbUid != pTopic->dbUid) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
pSub->vgNum++;
|
||||
plan->execNode.nodeId = pVgroup->vgId;
|
||||
|
@ -748,7 +760,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
|
|||
const SMqConsumerEp *pConsumerEp) {
|
||||
ASSERT(pConsumerEp->oldConsumerId == -1);
|
||||
int32_t vgId = pConsumerEp->vgId;
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
|
||||
SMqSetCVgReq req = {
|
||||
.vgId = vgId,
|
||||
|
@ -776,6 +787,8 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
|
|||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tEncodeSMqSetCVgReq(&abuf, &req);
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
action.pCont = buf;
|
||||
|
@ -1013,6 +1026,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
char *oldTopicNameDup = strdup(oldTopicName);
|
||||
taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
|
||||
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
|
||||
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
|
||||
} else if (newTopicName != NULL) {
|
||||
|
|
|
@ -15,7 +15,10 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndTrans.h"
|
||||
#include "mndAuth.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndSync.h"
|
||||
#include "mndUser.h"
|
||||
|
||||
#define MND_TRANS_VER_NUMBER 1
|
||||
#define MND_TRANS_ARRAY_SIZE 8
|
||||
|
@ -51,7 +54,11 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
|
|||
|
||||
static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
|
||||
static void mndTransSendRpcRsp(STrans *pTrans);
|
||||
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessTransReq(SMnodeMsg *pMsg);
|
||||
|
||||
static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||
static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter);
|
||||
|
||||
int32_t mndInitTrans(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_TRANS,
|
||||
|
@ -62,7 +69,11 @@ int32_t mndInitTrans(SMnode *pMnode) {
|
|||
.updateFp = (SdbUpdateFp)mndTransActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndTransActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq);
|
||||
|
||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans);
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
|
@ -80,17 +91,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
|
||||
for (int32_t i = 0; i < redoLogNum; ++i) {
|
||||
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
|
||||
rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
|
||||
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < undoLogNum; ++i) {
|
||||
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
|
||||
rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
|
||||
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < commitLogNum; ++i) {
|
||||
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
|
||||
rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
|
||||
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < redoActionNum; ++i) {
|
||||
|
@ -296,9 +307,43 @@ TRANS_DECODE_OVER:
|
|||
return pRow;
|
||||
}
|
||||
|
||||
static const char *mndTransStr(ETrnStage stage) {
|
||||
switch (stage) {
|
||||
case TRN_STAGE_PREPARE:
|
||||
return "prepare";
|
||||
case TRN_STAGE_REDO_LOG:
|
||||
return "redoLog";
|
||||
case TRN_STAGE_REDO_ACTION:
|
||||
return "redoAction";
|
||||
case TRN_STAGE_COMMIT:
|
||||
return "commit";
|
||||
case TRN_STAGE_COMMIT_LOG:
|
||||
return "commitLog";
|
||||
case TRN_STAGE_UNDO_ACTION:
|
||||
return "undoAction";
|
||||
case TRN_STAGE_UNDO_LOG:
|
||||
return "undoLog";
|
||||
case TRN_STAGE_ROLLBACK:
|
||||
return "rollback";
|
||||
case TRN_STAGE_FINISHED:
|
||||
return "finished";
|
||||
default:
|
||||
return "invalid";
|
||||
}
|
||||
}
|
||||
|
||||
static const char *mndTransType(ETrnType type) {
|
||||
switch (type) {
|
||||
case TRN_TYPE_CREATE_DB:
|
||||
return "create-db";
|
||||
default:
|
||||
return "invalid";
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
||||
pTrans->stage = TRN_STAGE_PREPARE;
|
||||
mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans);
|
||||
// pTrans->stage = TRN_STAGE_PREPARE;
|
||||
mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -309,28 +354,31 @@ static void mndTransDropData(STrans *pTrans) {
|
|||
mndTransDropActions(pTrans->redoActions);
|
||||
mndTransDropActions(pTrans->undoActions);
|
||||
if (pTrans->rpcRsp != NULL) {
|
||||
rpcFreeCont(pTrans->rpcRsp);
|
||||
free(pTrans->rpcRsp);
|
||||
pTrans->rpcRsp = NULL;
|
||||
pTrans->rpcRspLen = 0;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
|
||||
mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans);
|
||||
mTrace("trans:%d, perform delete action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
|
||||
mndTransDropData(pTrans);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
|
||||
if (pNew->stage == TRN_STAGE_COMMIT) pNew->stage = TRN_STAGE_COMMIT_LOG;
|
||||
if (pNew->stage == TRN_STAGE_COMMIT) {
|
||||
pNew->stage = TRN_STAGE_COMMIT_LOG;
|
||||
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
|
||||
}
|
||||
|
||||
mTrace("trans:%d, perform update action, old row:%p stage:%d, new row:%p stage:%d", pOld->id, pOld, pOld->stage, pNew,
|
||||
pNew->stage);
|
||||
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
|
||||
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
|
||||
pOld->stage = pNew->stage;
|
||||
return 0;
|
||||
}
|
||||
|
||||
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
|
||||
static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId);
|
||||
if (pTrans == NULL) {
|
||||
|
@ -339,7 +387,7 @@ STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
|
|||
return pTrans;
|
||||
}
|
||||
|
||||
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
|
||||
static void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbRelease(pSdb, pTrans);
|
||||
}
|
||||
|
@ -375,8 +423,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
static void mndTransDropLogs(SArray *pArray) {
|
||||
if (pArray == NULL) return;
|
||||
for (int32_t i = 0; i < pArray->size; ++i) {
|
||||
int32_t size = taosArrayGetSize(pArray);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
|
||||
sdbFreeRaw(pRaw);
|
||||
}
|
||||
|
@ -385,10 +433,10 @@ static void mndTransDropLogs(SArray *pArray) {
|
|||
}
|
||||
|
||||
static void mndTransDropActions(SArray *pArray) {
|
||||
if (pArray == NULL) return;
|
||||
for (int32_t i = 0; i < pArray->size; ++i) {
|
||||
int32_t size = taosArrayGetSize(pArray);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STransAction *pAction = taosArrayGet(pArray, i);
|
||||
free(pAction->pCont);
|
||||
tfree(pAction->pCont);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pArray);
|
||||
|
@ -941,8 +989,8 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
|||
mndTransSendRpcRsp(pTrans);
|
||||
}
|
||||
|
||||
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
|
||||
mndTransPullup(pMsg->pMnode);
|
||||
static int32_t mndProcessTransReq(SMnodeMsg *pReq) {
|
||||
mndTransPullup(pReq->pMnode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -960,3 +1008,122 @@ void mndTransPullup(SMnode *pMnode) {
|
|||
|
||||
sdbWriteFile(pMnode->pSdb);
|
||||
}
|
||||
|
||||
static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
int32_t cols = 0;
|
||||
SSchema *pSchema = pMeta->pSchemas;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "id");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "create_time");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "stage");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "db");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = (TSDB_TRANS_DESC_LEN - 1) + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "type");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "last_exec_time");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "last_error");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = cols;
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int32_t i = 1; i < cols; ++i) {
|
||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
}
|
||||
|
||||
pShow->numOfRows = sdbGetSize(pSdb, SDB_TRANS);
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
strcpy(pMeta->tbName, mndShowStr(pShow->type));
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
STrans *pTrans = NULL;
|
||||
int32_t cols = 0;
|
||||
char *pWrite;
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pTrans->id;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int64_t *)pWrite = pTrans->createdTime;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_TO_VARSTR(pWrite, mndTransStr(pTrans->stage));
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_TO_VARSTR(pWrite, pTrans->dbname);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_TO_VARSTR(pWrite, mndTransType(pTrans->transType));
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int64_t *)pWrite = pTrans->lastExecTime;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_TO_VARSTR(pWrite, pTrans->lastError);
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pTrans);
|
||||
}
|
||||
|
||||
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "sdbInt.h"
|
||||
|
||||
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow);
|
||||
|
||||
const char *sdbTableName(ESdbType type) {
|
||||
switch (type) {
|
||||
case SDB_TRANS:
|
||||
|
@ -221,6 +223,8 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
|||
|
||||
pSdb->tableVer[pOldRow->type]++;
|
||||
sdbFreeRow(pSdb, pRow);
|
||||
|
||||
sdbCheck(pSdb, pOldRow);
|
||||
// sdbRelease(pSdb, pOldRow->pObj);
|
||||
return 0;
|
||||
}
|
||||
|
@ -305,6 +309,19 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
|
|||
return pRet;
|
||||
}
|
||||
|
||||
static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) {
|
||||
SRWLatch *pLock = &pSdb->locks[pRow->type];
|
||||
taosRLockLatch(pLock);
|
||||
|
||||
int32_t ref = atomic_load_32(&pRow->refCount);
|
||||
sdbPrintOper(pSdb, pRow, "checkRow");
|
||||
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
|
||||
sdbFreeRow(pSdb, pRow);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(pLock);
|
||||
}
|
||||
|
||||
void sdbRelease(SSdb *pSdb, void *pObj) {
|
||||
if (pObj == NULL) return;
|
||||
|
||||
|
@ -332,6 +349,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
|
|||
SRWLatch *pLock = &pSdb->locks[type];
|
||||
taosRLockLatch(pLock);
|
||||
|
||||
#if 0
|
||||
if (pIter != NULL) {
|
||||
SSdbRow *pLastRow = *(SSdbRow **)pIter;
|
||||
int32_t ref = atomic_load_32(&pLastRow->refCount);
|
||||
|
@ -339,6 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
|
|||
sdbFreeRow(pSdb, pLastRow);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
SSdbRow **ppRow = taosHashIterate(hash, pIter);
|
||||
while (ppRow != NULL) {
|
||||
|
|
|
@ -192,12 +192,12 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
|
|||
if (pTopic->pReadhandle == NULL) {
|
||||
ASSERT(false);
|
||||
}
|
||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||
pTopic->buffer.output[i].status = 0;
|
||||
for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
|
||||
pTopic->buffer.output[j].status = 0;
|
||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
||||
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta};
|
||||
pTopic->buffer.output[i].pReadHandle = pReadHandle;
|
||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
|
||||
pTopic->buffer.output[j].pReadHandle = pReadHandle;
|
||||
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -264,3 +264,60 @@ void nodesRewriteNodePostOrder(SNode** pNode, FNodeRewriter rewriter, void* pCon
|
|||
void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext) {
|
||||
(void)rewriteList(pList, TRAVERSAL_POSTORDER, rewriter, pContext);
|
||||
}
|
||||
|
||||
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext) {
|
||||
if (NULL == pSelect) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch (clause) {
|
||||
case SQL_CLAUSE_FROM:
|
||||
nodesWalkNode(pSelect->pFromTable, walker, pContext);
|
||||
nodesWalkNode(pSelect->pWhere, walker, pContext);
|
||||
case SQL_CLAUSE_WHERE:
|
||||
nodesWalkList(pSelect->pPartitionByList, walker, pContext);
|
||||
case SQL_CLAUSE_PARTITION_BY:
|
||||
nodesWalkNode(pSelect->pWindow, walker, pContext);
|
||||
case SQL_CLAUSE_WINDOW:
|
||||
nodesWalkList(pSelect->pGroupByList, walker, pContext);
|
||||
case SQL_CLAUSE_GROUP_BY:
|
||||
nodesWalkNode(pSelect->pHaving, walker, pContext);
|
||||
case SQL_CLAUSE_HAVING:
|
||||
nodesWalkList(pSelect->pProjectionList, walker, pContext);
|
||||
case SQL_CLAUSE_SELECT:
|
||||
nodesWalkList(pSelect->pOrderByList, walker, pContext);
|
||||
case SQL_CLAUSE_ORDER_BY:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewriter rewriter, void* pContext) {
|
||||
if (NULL == pSelect) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch (clause) {
|
||||
case SQL_CLAUSE_FROM:
|
||||
nodesRewriteNode(&(pSelect->pFromTable), rewriter, pContext);
|
||||
nodesRewriteNode(&(pSelect->pWhere), rewriter, pContext);
|
||||
case SQL_CLAUSE_WHERE:
|
||||
nodesRewriteList(pSelect->pPartitionByList, rewriter, pContext);
|
||||
case SQL_CLAUSE_PARTITION_BY:
|
||||
nodesRewriteNode(&(pSelect->pWindow), rewriter, pContext);
|
||||
case SQL_CLAUSE_WINDOW:
|
||||
nodesRewriteList(pSelect->pGroupByList, rewriter, pContext);
|
||||
case SQL_CLAUSE_GROUP_BY:
|
||||
nodesRewriteNode(&(pSelect->pHaving), rewriter, pContext);
|
||||
case SQL_CLAUSE_HAVING:
|
||||
nodesRewriteList(pSelect->pProjectionList, rewriter, pContext);
|
||||
case SQL_CLAUSE_SELECT:
|
||||
nodesRewriteList(pSelect->pOrderByList, rewriter, pContext);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -15,8 +15,10 @@
|
|||
|
||||
#include "querynodes.h"
|
||||
#include "nodesShowStmts.h"
|
||||
#include "taos.h"
|
||||
#include "taoserror.h"
|
||||
#include "taos.h"
|
||||
#include "thash.h"
|
||||
|
||||
static SNode* makeNode(ENodeType type, size_t size) {
|
||||
SNode* p = calloc(1, size);
|
||||
|
@ -99,14 +101,14 @@ SNodeList* nodesMakeList() {
|
|||
return p;
|
||||
}
|
||||
|
||||
SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode) {
|
||||
int32_t nodesListAppend(SNodeList* pList, SNode* pNode) {
|
||||
if (NULL == pList || NULL == pNode) {
|
||||
return NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SListCell* p = calloc(1, sizeof(SListCell));
|
||||
if (NULL == p) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return pList;
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
p->pNode = pNode;
|
||||
if (NULL == pList->pHead) {
|
||||
|
@ -117,7 +119,7 @@ SNodeList* nodesListAppend(SNodeList* pList, SNode* pNode) {
|
|||
}
|
||||
pList->pTail = p;
|
||||
++(pList->length);
|
||||
return pList;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell) {
|
||||
|
@ -239,3 +241,99 @@ bool nodesIsTimeorderQuery(const SNode* pQuery) {
|
|||
bool nodesIsTimelineQuery(const SNode* pQuery) {
|
||||
return false;
|
||||
}
|
||||
|
||||
typedef struct SCollectColumnsCxt {
|
||||
int32_t errCode;
|
||||
uint64_t tableId;
|
||||
bool realCol;
|
||||
SNodeList* pCols;
|
||||
SHashObj* pColIdHash;
|
||||
} SCollectColumnsCxt;
|
||||
|
||||
static EDealRes doCollect(SCollectColumnsCxt* pCxt, int32_t id, SNode* pNode) {
|
||||
if (NULL == taosHashGet(pCxt->pColIdHash, &id, sizeof(id))) {
|
||||
pCxt->errCode = taosHashPut(pCxt->pColIdHash, &id, sizeof(id), NULL, 0);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pCxt->errCode = nodesListAppend(pCxt->pCols, pNode);
|
||||
}
|
||||
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static EDealRes collectColumns(SNode* pNode, void* pContext) {
|
||||
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
|
||||
|
||||
if (pCxt->realCol && QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
int32_t colId = pCol->colId;
|
||||
if (pCxt->tableId == pCol->tableId && colId > 0) {
|
||||
return doCollect(pCxt, colId, pNode);
|
||||
}
|
||||
} else if (!pCxt->realCol && QUERY_NODE_COLUMN_REF == nodeType(pNode)) {
|
||||
return doCollect(pCxt, ((SColumnRefNode*)pNode)->slotId, pNode);
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, uint64_t tableId, bool realCol, SNodeList** pCols) {
|
||||
if (NULL == pSelect || NULL == pCols) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCollectColumnsCxt cxt = {
|
||||
.errCode = TSDB_CODE_SUCCESS,
|
||||
.realCol = realCol,
|
||||
.pCols = nodesMakeList(),
|
||||
.pColIdHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK)
|
||||
};
|
||||
if (NULL == cxt.pCols || NULL == cxt.pColIdHash) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
nodesWalkSelectStmt(pSelect, clause, collectColumns, &cxt);
|
||||
taosHashCleanup(cxt.pColIdHash);
|
||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||
nodesDestroyList(cxt.pCols);
|
||||
return cxt.errCode;
|
||||
}
|
||||
*pCols = cxt.pCols;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SCollectFuncsCxt {
|
||||
int32_t errCode;
|
||||
FFuncClassifier classifier;
|
||||
SNodeList* pFuncs;
|
||||
} SCollectFuncsCxt;
|
||||
|
||||
static EDealRes collectFuncs(SNode* pNode, void* pContext) {
|
||||
SCollectFuncsCxt* pCxt = (SCollectFuncsCxt*)pContext;
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pNode) && pCxt->classifier(((SFunctionNode*)pNode)->funcId)) {
|
||||
pCxt->errCode = nodesListAppend(pCxt->pFuncs, pNode);
|
||||
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNodeList** pFuncs) {
|
||||
if (NULL == pSelect || NULL == pFuncs) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCollectFuncsCxt cxt = {
|
||||
.errCode = TSDB_CODE_SUCCESS,
|
||||
.classifier = classifier,
|
||||
.pFuncs = nodesMakeList()
|
||||
};
|
||||
if (NULL == cxt.pFuncs) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
nodesWalkSelectStmt(pSelect, SQL_CLAUSE_GROUP_BY, collectFuncs, &cxt);
|
||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||
nodesDestroyList(cxt.pFuncs);
|
||||
return cxt.errCode;
|
||||
}
|
||||
*pFuncs = cxt.pFuncs;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ cmd ::= SHOW QUERIES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_QUERIES, 0, 0);
|
|||
cmd ::= SHOW CONNECTIONS.{ setShowOptions(pInfo, TSDB_MGMT_TABLE_CONNS, 0, 0);}
|
||||
cmd ::= SHOW STREAMS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_STREAMS, 0, 0); }
|
||||
cmd ::= SHOW VARIABLES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VARIABLES, 0, 0); }
|
||||
cmd ::= SHOW SCORES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_SCORES, 0, 0); }
|
||||
cmd ::= SHOW SCORES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_TRANS, 0, 0); }
|
||||
cmd ::= SHOW GRANTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0); }
|
||||
|
||||
cmd ::= SHOW VNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_VNODES, 0, 0); }
|
||||
|
|
|
@ -96,11 +96,17 @@ SToken getTokenFromRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
|
|||
SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode) {
|
||||
SNodeList* list = nodesMakeList();
|
||||
CHECK_OUT_OF_MEM(list);
|
||||
return nodesListAppend(list, pNode);
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(list, pNode)) {
|
||||
pCxt->valid = false;
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode) {
|
||||
return nodesListAppend(pList, pNode);
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pList, pNode)) {
|
||||
pCxt->valid = false;
|
||||
}
|
||||
return pList;
|
||||
}
|
||||
|
||||
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName) {
|
||||
|
|
|
@ -241,17 +241,6 @@ abort_parse:
|
|||
return cxt.valid ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
typedef enum ESqlClause {
|
||||
SQL_CLAUSE_FROM = 1,
|
||||
SQL_CLAUSE_WHERE,
|
||||
SQL_CLAUSE_PARTITION_BY,
|
||||
SQL_CLAUSE_WINDOW,
|
||||
SQL_CLAUSE_GROUP_BY,
|
||||
SQL_CLAUSE_HAVING,
|
||||
SQL_CLAUSE_SELECT,
|
||||
SQL_CLAUSE_ORDER_BY
|
||||
} ESqlClause;
|
||||
|
||||
static bool afterGroupBy(ESqlClause clause) {
|
||||
return clause > SQL_CLAUSE_GROUP_BY;
|
||||
}
|
||||
|
@ -298,7 +287,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "Not SELECTed expression";
|
||||
case TSDB_CODE_PAR_NOT_SINGLE_GROUP:
|
||||
return "Not a single-group group function";
|
||||
case TSDB_CODE_PAR_OUT_OF_MEMORY:
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
return "Unknown error";
|
||||
|
@ -376,7 +365,7 @@ static void setColumnInfoBySchema(const STableNode* pTable, const SSchema* pColS
|
|||
|
||||
static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SColumnNode* pCol) {
|
||||
pCol->pProjectRef = (SNode*)pExpr;
|
||||
pExpr->pAssociationList = nodesListAppend(pExpr->pAssociationList, (SNode*)pCol);
|
||||
nodesListAppend(pExpr->pAssociationList, (SNode*)pCol);
|
||||
if (NULL != pTable) {
|
||||
strcpy(pCol->tableAlias, pTable->tableAlias);
|
||||
}
|
||||
|
@ -391,7 +380,7 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode
|
|||
for (int32_t i = 0; i < nums; ++i) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY);
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
setColumnInfoBySchema(pTable, pMeta->schema + i, pCol);
|
||||
nodesListAppend(pList, (SNode*)pCol);
|
||||
|
@ -402,7 +391,7 @@ static int32_t createColumnNodeByTable(STranslateContext* pCxt, const STableNode
|
|||
FOREACH(pNode, pProjectList) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY);
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
setColumnInfoByExpr(pTable, (SExprNode*)pNode, pCol);
|
||||
nodesListAppend(pList, (SNode*)pCol);
|
||||
|
@ -572,7 +561,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
|||
int32_t n = strlen(pVal->literal);
|
||||
pVal->datum.p = calloc(1, n);
|
||||
if (NULL == pVal->datum.p) {
|
||||
generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY);
|
||||
generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
trimStringCopy(pVal->literal, n, pVal->datum.p);
|
||||
|
@ -582,7 +571,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
|||
int32_t n = strlen(pVal->literal);
|
||||
char* tmp = calloc(1, n);
|
||||
if (NULL == tmp) {
|
||||
generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY);
|
||||
generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
int32_t len = trimStringCopy(pVal->literal, n, tmp);
|
||||
|
@ -830,7 +819,7 @@ static int32_t translateStar(STranslateContext* pCxt, SSelectStmt* pSelect, bool
|
|||
size_t nums = taosArrayGetSize(pTables);
|
||||
pSelect->pProjectionList = nodesMakeList();
|
||||
if (NULL == pSelect->pProjectionList) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY);
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
for (size_t i = 0; i < nums; ++i) {
|
||||
STableNode* pTable = taosArrayGetP(pTables, i);
|
||||
|
@ -897,7 +886,7 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
|
|||
} else {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY);
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
setColumnInfoByExpr(NULL, (SExprNode*)nodesListGetNode(pProjectionList, pos - 1), pCol);
|
||||
((SOrderByExprNode*)pNode)->pExpr = (SNode*)pCol;
|
||||
|
@ -1036,7 +1025,7 @@ int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList);
|
||||
pQuery->pResSchema = calloc(pQuery->numOfResCols, sizeof(SSchema));
|
||||
if (NULL == pQuery->pResSchema) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_OUT_OF_MEMORY);
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
SNode* pNode;
|
||||
int32_t index = 0;
|
||||
|
|
|
@ -2262,7 +2262,7 @@ static void yy_reduce(
|
|||
{ setShowOptions(pInfo, TSDB_MGMT_TABLE_VARIABLES, 0, 0); }
|
||||
break;
|
||||
case 13: /* cmd ::= SHOW SCORES */
|
||||
{ setShowOptions(pInfo, TSDB_MGMT_TABLE_SCORES, 0, 0); }
|
||||
{ setShowOptions(pInfo, TSDB_MGMT_TABLE_TRANS, 0, 0); }
|
||||
break;
|
||||
case 14: /* cmd ::= SHOW GRANTS */
|
||||
{ setShowOptions(pInfo, TSDB_MGMT_TABLE_GRANTS, 0, 0); }
|
||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
|||
|
||||
typedef struct SLogicNode {
|
||||
ENodeType type;
|
||||
int32_t id;
|
||||
SNodeList* pTargets;
|
||||
SNode* pConditions;
|
||||
SNodeList* pChildren;
|
||||
|
|
|
@ -16,71 +16,32 @@
|
|||
#include "plannerImpl.h"
|
||||
#include "functionMgt.h"
|
||||
|
||||
static SLogicNode* createQueryLogicNode(SNode* pStmt);
|
||||
#define CHECK_ALLOC(p, res) \
|
||||
do { \
|
||||
if (NULL == p) { \
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; \
|
||||
return res; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
typedef struct SCollectColumnsCxt {
|
||||
SNodeList* pCols;
|
||||
SHashObj* pColIdHash;
|
||||
} SCollectColumnsCxt;
|
||||
#define CHECK_CODE(exec, res) \
|
||||
do { \
|
||||
int32_t code = exec; \
|
||||
if (TSDB_CODE_SUCCESS != code) { \
|
||||
pCxt->errCode = code; \
|
||||
return res; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
static EDealRes doCollectColumns(SNode* pNode, void* pContext) {
|
||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext;
|
||||
int16_t colId = ((SColumnNode*)pNode)->colId;
|
||||
if (colId > 0) {
|
||||
if (NULL == taosHashGet(pCxt->pColIdHash, &colId, sizeof(colId))) {
|
||||
taosHashPut(pCxt->pColIdHash, &colId, sizeof(colId), NULL, 0);
|
||||
nodesListAppend(pCxt->pCols, pNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static SNodeList* collectColumns(SSelectStmt* pSelect) {
|
||||
SCollectColumnsCxt cxt = { .pCols = nodesMakeList(), .pColIdHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK) };
|
||||
if (NULL == cxt.pCols || NULL == cxt.pColIdHash) {
|
||||
return NULL;
|
||||
}
|
||||
nodesWalkNode(pSelect->pFromTable, doCollectColumns, &cxt);
|
||||
nodesWalkNode(pSelect->pWhere, doCollectColumns, &cxt);
|
||||
nodesWalkList(pSelect->pPartitionByList, doCollectColumns, &cxt);
|
||||
nodesWalkNode(pSelect->pWindow, doCollectColumns, &cxt);
|
||||
nodesWalkList(pSelect->pGroupByList, doCollectColumns, &cxt);
|
||||
nodesWalkNode(pSelect->pHaving, doCollectColumns, &cxt);
|
||||
nodesWalkList(pSelect->pProjectionList, doCollectColumns, &cxt);
|
||||
nodesWalkList(pSelect->pOrderByList, doCollectColumns, &cxt);
|
||||
taosHashCleanup(cxt.pColIdHash);
|
||||
return cxt.pCols;
|
||||
}
|
||||
|
||||
typedef struct SCollectAggFuncsCxt {
|
||||
SNodeList* pAggFuncs;
|
||||
} SCollectAggFuncsCxt;
|
||||
|
||||
static EDealRes doCollectAggFuncs(SNode* pNode, void* pContext) {
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId)) {
|
||||
SCollectAggFuncsCxt* pCxt = (SCollectAggFuncsCxt*)pContext;
|
||||
nodesListAppend(pCxt->pAggFuncs, pNode);
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static SNodeList* collectAggFuncs(SSelectStmt* pSelect) {
|
||||
SCollectAggFuncsCxt cxt = { .pAggFuncs = nodesMakeList() };
|
||||
if (NULL == cxt.pAggFuncs) {
|
||||
return NULL;
|
||||
}
|
||||
nodesWalkNode(pSelect->pHaving, doCollectAggFuncs, &cxt);
|
||||
nodesWalkList(pSelect->pProjectionList, doCollectAggFuncs, &cxt);
|
||||
if (!pSelect->isDistinct) {
|
||||
nodesWalkList(pSelect->pOrderByList, doCollectAggFuncs, &cxt);
|
||||
}
|
||||
return cxt.pAggFuncs;
|
||||
}
|
||||
typedef struct SPlanContext {
|
||||
int32_t errCode;
|
||||
int32_t planNodeId;
|
||||
SNodeList* pResource;
|
||||
} SPlanContext;
|
||||
|
||||
typedef struct SRewriteExprCxt {
|
||||
int32_t errCode;
|
||||
int32_t planNodeId;
|
||||
SNodeList* pTargets;
|
||||
} SRewriteExprCxt;
|
||||
|
||||
|
@ -90,9 +51,15 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
|
|||
int32_t index = 0;
|
||||
FOREACH(pTarget, pCxt->pTargets) {
|
||||
if (nodesEqualNode(pTarget, *pNode)) {
|
||||
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
|
||||
if (NULL == pCol) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
pCol->tupleId = pCxt->planNodeId;
|
||||
pCol->slotId = index;
|
||||
nodesDestroyNode(*pNode);
|
||||
*pNode = nodesMakeNode(QUERY_NODE_COLUMN_REF);
|
||||
((SColumnRef*)*pNode)->slotId = index;
|
||||
*pNode = (SNode*)pCol;
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
++index;
|
||||
|
@ -100,59 +67,93 @@ static EDealRes doRewriteExpr(SNode** pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t rewriteExpr(SNodeList* pTargets, SSelectStmt* pSelect) {
|
||||
SRewriteExprCxt cxt = { .pTargets = pTargets };
|
||||
nodesRewriteNode(&(pSelect->pFromTable), doRewriteExpr, &cxt);
|
||||
nodesRewriteNode(&(pSelect->pWhere), doRewriteExpr, &cxt);
|
||||
nodesRewriteList(pSelect->pPartitionByList, doRewriteExpr, &cxt);
|
||||
nodesRewriteNode(&(pSelect->pWindow), doRewriteExpr, &cxt);
|
||||
nodesRewriteList(pSelect->pGroupByList, doRewriteExpr, &cxt);
|
||||
nodesRewriteNode(&(pSelect->pHaving), doRewriteExpr, &cxt);
|
||||
nodesRewriteList(pSelect->pProjectionList, doRewriteExpr, &cxt);
|
||||
nodesRewriteList(pSelect->pOrderByList, doRewriteExpr, &cxt);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
static int32_t rewriteExpr(int32_t planNodeId, SNodeList* pTargets, SSelectStmt* pSelect, ESqlClause clause) {
|
||||
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .planNodeId = planNodeId, .pTargets = pTargets };
|
||||
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static SLogicNode* pushLogicNode(SPlanContext* pCxt, SLogicNode* pRoot, SLogicNode* pNode) {
|
||||
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
static SLogicNode* pushLogicNode(SLogicNode* pRoot, SLogicNode* pNode) {
|
||||
if (NULL == pRoot) {
|
||||
return pNode;
|
||||
}
|
||||
|
||||
if (NULL == pNode) {
|
||||
return pRoot;
|
||||
}
|
||||
pRoot->pParent = pNode;
|
||||
|
||||
if (NULL == pNode->pChildren) {
|
||||
pNode->pChildren = nodesMakeList();
|
||||
if (NULL == pNode->pChildren) {
|
||||
goto error;
|
||||
}
|
||||
nodesListAppend(pNode->pChildren, (SNode*)pRoot);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pNode->pChildren, (SNode*)pRoot)) {
|
||||
goto error;
|
||||
}
|
||||
pRoot->pParent = pNode;
|
||||
return pNode;
|
||||
error:
|
||||
nodesDestroyNode((SNode*)pNode);
|
||||
return pRoot;
|
||||
}
|
||||
|
||||
static SNodeList* createScanTargets(SNodeList* pCols) {
|
||||
|
||||
static SNodeList* createScanTargets(int32_t planNodeId, int32_t numOfScanCols) {
|
||||
SNodeList* pTargets = nodesMakeList();
|
||||
if (NULL == pTargets) {
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < numOfScanCols; ++i) {
|
||||
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
|
||||
if (NULL == pCol || TSDB_CODE_SUCCESS != nodesListAppend(pTargets, (SNode*)pCol)) {
|
||||
nodesDestroyList(pTargets);
|
||||
return NULL;
|
||||
}
|
||||
pCol->tupleId = planNodeId;
|
||||
pCol->slotId = i;
|
||||
}
|
||||
return pTargets;
|
||||
}
|
||||
|
||||
static SLogicNode* createScanLogicNode(SSelectStmt* pSelect, SRealTableNode* pRealTable) {
|
||||
static SLogicNode* createScanLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
|
||||
SNodeList* pCols = collectColumns(pSelect);
|
||||
pScan->pScanCols = nodesCloneList(pCols);
|
||||
//
|
||||
rewriteExpr(pScan->pScanCols, pSelect);
|
||||
pScan->node.pTargets = createScanTargets(pCols);
|
||||
CHECK_ALLOC(pScan, NULL);
|
||||
pScan->node.id = pCxt->planNodeId++;
|
||||
|
||||
pScan->pMeta = pRealTable->pMeta;
|
||||
|
||||
// set columns to scan
|
||||
SNodeList* pCols = NULL;
|
||||
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pScan->pMeta->uid, true, &pCols), (SLogicNode*)pScan);
|
||||
pScan->pScanCols = nodesCloneList(pCols);
|
||||
CHECK_ALLOC(pScan->pScanCols, (SLogicNode*)pScan);
|
||||
|
||||
// pScanCols of SScanLogicNode is equivalent to pTargets of other logic nodes
|
||||
CHECK_CODE(rewriteExpr(pScan->node.id, pScan->pScanCols, pSelect, SQL_CLAUSE_FROM), (SLogicNode*)pScan);
|
||||
|
||||
// set output
|
||||
pScan->node.pTargets = createScanTargets(pScan->node.id, LIST_LENGTH(pScan->pScanCols));
|
||||
CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan);
|
||||
|
||||
return (SLogicNode*)pScan;
|
||||
}
|
||||
|
||||
static SLogicNode* createSubqueryLogicNode(SSelectStmt* pSelect, STempTableNode* pTable) {
|
||||
return createQueryLogicNode(pTable->pSubquery);
|
||||
static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt);
|
||||
|
||||
static SLogicNode* createSubqueryLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable) {
|
||||
return createQueryLogicNode(pCxt, pTable->pSubquery);
|
||||
}
|
||||
|
||||
static SLogicNode* createLogicNodeByTable(SSelectStmt* pSelect, SNode* pTable) {
|
||||
static SLogicNode* createLogicNodeByTable(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable) {
|
||||
switch (nodeType(pTable)) {
|
||||
case QUERY_NODE_REAL_TABLE:
|
||||
return createScanLogicNode(pSelect, (SRealTableNode*)pTable);
|
||||
return createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pTable);
|
||||
case QUERY_NODE_TEMP_TABLE:
|
||||
return createSubqueryLogicNode(pSelect, (STempTableNode*)pTable);
|
||||
return createSubqueryLogicNode(pCxt, pSelect, (STempTableNode*)pTable);
|
||||
case QUERY_NODE_JOIN_TABLE:
|
||||
default:
|
||||
break;
|
||||
|
@ -160,45 +161,86 @@ static SLogicNode* createLogicNodeByTable(SSelectStmt* pSelect, SNode* pTable) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SLogicNode* createFilterLogicNode(SNode* pWhere) {
|
||||
static SLogicNode* createFilterLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SNode* pWhere) {
|
||||
if (NULL == pWhere) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SFilterLogicNode* pFilter = (SFilterLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FILTER);
|
||||
CHECK_ALLOC(pFilter, NULL);
|
||||
pFilter->node.id = pCxt->planNodeId++;
|
||||
|
||||
// set filter conditions
|
||||
pFilter->node.pConditions = nodesCloneNode(pWhere);
|
||||
CHECK_ALLOC(pFilter->node.pConditions, (SLogicNode*)pFilter);
|
||||
|
||||
// set the output and rewrite the expression in subsequent clauses with the output
|
||||
SNodeList* pCols = NULL;
|
||||
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_WHERE, 0, false, &pCols), (SLogicNode*)pFilter);
|
||||
pFilter->node.pTargets = nodesCloneList(pCols);
|
||||
CHECK_ALLOC(pFilter->node.pTargets, (SLogicNode*)pFilter);
|
||||
CHECK_CODE(rewriteExpr(pFilter->node.id, pFilter->node.pTargets, pSelect, SQL_CLAUSE_WHERE), (SLogicNode*)pFilter);
|
||||
|
||||
return (SLogicNode*)pFilter;
|
||||
}
|
||||
|
||||
static SLogicNode* createAggLogicNode(SSelectStmt* pSelect, SNodeList* pGroupByList, SNode* pHaving) {
|
||||
SNodeList* pAggFuncs = collectAggFuncs(pSelect);
|
||||
static SLogicNode* createAggLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect, SNodeList* pGroupByList, SNode* pHaving) {
|
||||
SNodeList* pAggFuncs = NULL;
|
||||
CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pAggFuncs), NULL);
|
||||
if (NULL == pAggFuncs && NULL == pGroupByList) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
|
||||
CHECK_ALLOC(pAgg, NULL);
|
||||
pAgg->node.id = pCxt->planNodeId++;
|
||||
|
||||
// set grouyp keys, agg funcs and having conditions
|
||||
pAgg->pGroupKeys = nodesCloneList(pGroupByList);
|
||||
CHECK_ALLOC(pAgg->pGroupKeys, (SLogicNode*)pAgg);
|
||||
pAgg->pAggFuncs = nodesCloneList(pAggFuncs);
|
||||
CHECK_ALLOC(pAgg->pAggFuncs, (SLogicNode*)pAgg);
|
||||
pAgg->node.pConditions = nodesCloneNode(pHaving);
|
||||
CHECK_ALLOC(pAgg->node.pConditions, (SLogicNode*)pAgg);
|
||||
|
||||
// set the output and rewrite the expression in subsequent clauses with the output
|
||||
SNodeList* pCols = NULL;
|
||||
CHECK_CODE(nodesCollectColumns(pSelect, SQL_CLAUSE_HAVING, 0, false, &pCols), (SLogicNode*)pAgg);
|
||||
pAgg->node.pTargets = nodesCloneList(pCols);
|
||||
CHECK_ALLOC(pAgg->node.pTargets, (SLogicNode*)pAgg);
|
||||
CHECK_CODE(rewriteExpr(pAgg->node.id, pAgg->node.pTargets, pSelect, SQL_CLAUSE_HAVING), (SLogicNode*)pAgg);
|
||||
|
||||
return (SLogicNode*)pAgg;
|
||||
}
|
||||
|
||||
static SLogicNode* createSelectLogicNode(SSelectStmt* pSelect) {
|
||||
SLogicNode* pRoot = createLogicNodeByTable(pSelect, pSelect->pFromTable);
|
||||
pRoot = pushLogicNode(pRoot, createFilterLogicNode(pSelect->pWhere));
|
||||
pRoot = pushLogicNode(pRoot, createAggLogicNode(pSelect, pSelect->pGroupByList, pSelect->pHaving));
|
||||
// pRoot = pushLogicNode(pRoot, createProjectLogicNode(pSelect, pSelect->pProjectionList));
|
||||
static SLogicNode* createSelectLogicNode(SPlanContext* pCxt, SSelectStmt* pSelect) {
|
||||
SLogicNode* pRoot = createLogicNodeByTable(pCxt, pSelect, pSelect->pFromTable);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pRoot = pushLogicNode(pCxt, pRoot, createFilterLogicNode(pCxt, pSelect, pSelect->pWhere));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect, pSelect->pGroupByList, pSelect->pHaving));
|
||||
}
|
||||
// pRoot = pushLogicNode(pCxt, pRoot, createProjectLogicNode(pSelect, pSelect->pProjectionList));
|
||||
return pRoot;
|
||||
}
|
||||
|
||||
static SLogicNode* createQueryLogicNode(SNode* pStmt) {
|
||||
static SLogicNode* createQueryLogicNode(SPlanContext* pCxt, SNode* pStmt) {
|
||||
switch (nodeType(pStmt)) {
|
||||
case QUERY_NODE_SELECT_STMT:
|
||||
return createSelectLogicNode((SSelectStmt*)pStmt);
|
||||
return createSelectLogicNode(pCxt, (SSelectStmt*)pStmt);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t createLogicPlan(SNode* pNode, SLogicNode** pLogicNode) {
|
||||
*pLogicNode = createQueryLogicNode(pNode);
|
||||
SPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .planNodeId = 0 };
|
||||
SLogicNode* pRoot = createQueryLogicNode(&cxt, pNode);
|
||||
if (TSDB_CODE_SUCCESS != cxt.errCode) {
|
||||
nodesDestroyNode((SNode*)pRoot);
|
||||
return cxt.errCode;
|
||||
}
|
||||
*pLogicNode = pRoot;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -137,7 +137,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp
|
|||
.pCont = pMsg,
|
||||
.contLen = pInfo->msgInfo.len,
|
||||
.ahandle = (void*) pInfo,
|
||||
.handle = NULL,
|
||||
.handle = pInfo->msgInfo.handle,
|
||||
.code = 0
|
||||
};
|
||||
|
||||
|
|
|
@ -95,6 +95,7 @@ typedef struct SSchTask {
|
|||
int32_t childReady; // child task ready number
|
||||
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
|
||||
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
|
||||
void* handle; // task send handle
|
||||
} SSchTask;
|
||||
|
||||
typedef struct SSchJobAttr {
|
||||
|
|
|
@ -18,6 +18,10 @@
|
|||
#include "query.h"
|
||||
#include "catalog.h"
|
||||
|
||||
typedef struct SSchTrans {
|
||||
void *transInst;
|
||||
void *transHandle;
|
||||
}SSchTrans;
|
||||
static SSchedulerMgmt schMgmt = {0};
|
||||
|
||||
uint64_t schGenTaskId(void) {
|
||||
|
@ -932,6 +936,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
|
|||
pTask = *task;
|
||||
SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode));
|
||||
|
||||
pTask->handle = pMsg->handle;
|
||||
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
|
||||
|
||||
_return:
|
||||
|
@ -1000,6 +1005,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
|||
|
||||
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
|
||||
int32_t code = 0;
|
||||
|
||||
SSchTrans *trans = (SSchTrans *)transport;
|
||||
|
||||
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
|
||||
|
@ -1018,14 +1026,16 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
|
|||
param->queryId = qId;
|
||||
param->taskId = tId;
|
||||
|
||||
|
||||
pMsgSendInfo->param = param;
|
||||
pMsgSendInfo->msgInfo.pData = msg;
|
||||
pMsgSendInfo->msgInfo.len = msgSize;
|
||||
pMsgSendInfo->msgInfo.handle = trans->transHandle;
|
||||
pMsgSendInfo->msgType = msgType;
|
||||
pMsgSendInfo->fp = fp;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo);
|
||||
code = asyncSendMsgToServer(trans->transInst, epSet, &transporterId, pMsgSendInfo);
|
||||
if (code) {
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
@ -1149,7 +1159,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
|
||||
atomic_store_32(&pTask->lastMsgType, msgType);
|
||||
|
||||
SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
|
||||
SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle};
|
||||
SCH_ERR_JRET(schAsyncSendMsg(&trans, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
|
||||
|
||||
if (isCandidateAddr) {
|
||||
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
|
||||
|
|
|
@ -133,15 +133,18 @@ static void clientHandleResp(SCliConn* conn) {
|
|||
rpcMsg.msgType = pHead->msgType;
|
||||
rpcMsg.ahandle = pCtx->ahandle;
|
||||
|
||||
if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP) {
|
||||
if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP ||
|
||||
rpcMsg.msgType == TDMT_VND_RES_READY) {
|
||||
rpcMsg.handle = conn;
|
||||
conn->persist = 1;
|
||||
tDebug("client conn %p persist by app", conn);
|
||||
}
|
||||
|
||||
tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType),
|
||||
inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr),
|
||||
ntohs(conn->locaddr.sin_port));
|
||||
|
||||
conn->secured = pHead->secured;
|
||||
if (conn->push != NULL && conn->ctnRdCnt != 0) {
|
||||
(*conn->push->callback)(conn->push->arg, &rpcMsg);
|
||||
conn->push = NULL;
|
||||
|
@ -156,7 +159,6 @@ static void clientHandleResp(SCliConn* conn) {
|
|||
}
|
||||
}
|
||||
conn->ctnRdCnt += 1;
|
||||
conn->secured = pHead->secured;
|
||||
|
||||
// buf's mem alread translated to rpcMsg.pCont
|
||||
transClearBuffer(&conn->readBuf);
|
||||
|
@ -166,16 +168,14 @@ static void clientHandleResp(SCliConn* conn) {
|
|||
SCliThrdObj* pThrd = conn->hostThrd;
|
||||
|
||||
// user owns conn->persist = 1
|
||||
if (conn->push == NULL || conn->persist == 0) {
|
||||
if (conn->push == NULL && conn->persist == 0) {
|
||||
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
|
||||
|
||||
}
|
||||
destroyCmsg(conn->data);
|
||||
conn->data = NULL;
|
||||
}
|
||||
|
||||
// start thread's timer of conn pool if not active
|
||||
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
|
||||
uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||
// uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
|
||||
}
|
||||
}
|
||||
static void clientHandleExcept(SCliConn* pConn) {
|
||||
|
@ -330,6 +330,9 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b
|
|||
}
|
||||
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||
// impl later
|
||||
if (handle->data == NULL) {
|
||||
return;
|
||||
}
|
||||
SCliConn* conn = handle->data;
|
||||
SConnBuffer* pBuf = &conn->readBuf;
|
||||
if (nread > 0) {
|
||||
|
|
Loading…
Reference in New Issue